CompletableFuture 案例

private static ThreadFactory namedTreadFactory = new ThreadFactoryBuilder().setNameFormat("lottery-pool-%d").build();

private static ExecutorService doLotteryPool = new ThreadPoolExecutor(1, 5,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingDeque<>(2),
    namedTreadFactory,
    new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) {


    Long start = System.currentTimeMillis();
    // 结果集
    List<String> list = new ArrayList<>();


    List<Integer> taskList = Arrays.asList(2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3);
    // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
    List<CompletableFuture<String>> futures = taskList.stream()
        .map(integer ->

            CompletableFuture.supplyAsync(() ->
                    calc(integer),
                doLotteryPool)
                //当计算完成的时候请执行某个function
                .thenApply(h ->
                    thenApply(h)
                )
                //计算结果完成时的处理
                .whenComplete((s, e) -> {
                    System.out.println("任务" + s + "完成!result=" + s + ",异常 e=" + e + "," + new Date());
                    list.add(s);
                })
                .exceptionally(e -> {
                    //e.printStackTrace();
                    System.out.println("出现异常"+e);
                    //返回一个默认值
                    return "0";
                })
        ).collect(Collectors.toList());

    // 封装后无返回值,必须自己whenComplete()获取
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
    System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));

    // 输出结果;

    //获取值
    System.out.println("----------等待结果返回-------------");
    String res = futures.stream()
        .map(f -> {
            try {
                return f.thenApply(String::valueOf).get();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "";
        }).collect(Collectors.joining(" , ", "", ""));
    System.out.println("res = " + res);
}

private static String thenApply(Integer h) {
    System.out.println("当计算完成的时候请执行某个function");
    return Integer.toString(h);
}

public static Integer calc(Integer i) {
    try {
        if (i == 1) {
            Thread.sleep(3000);//任务1耗时3秒
        } else if (i == 5) {
            Thread.sleep(5000);//任务5耗时5秒
        } else {
            Thread.sleep(1000);//其它任务耗时1秒
        }
        System.out.println("task线程:" + Thread.currentThread().getName()
            + "任务i=" + i + ",完成!+" + new Date());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if(i==3){
        int num=1/0;
    }

    return i;
}

CompletableFuture 详解

 一、Future模式

Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future接口可以构建异步应用,是多线程开发中常见的设计模式。

当我们需要调用一个函数方法时。如果这个函数执行很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。

因此,我们可以让被调用者立即返回,让他在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获取需要的数据。

image.png

 

用法如下:

public static void main(String[] args) {

    ExecutorService executor = Executors.newCachedThreadPool();

    Future<String> future=executor.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            String result=new Random().nextLong()+"";
            Thread.sleep(2000);
            return result;
        }
    });

    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

}

Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却是不方便,只能通过阻塞或轮询的方式得到任务结果。阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源。而且还不能及时得到计算结果.

Future 接口的局限性

 了解了Future的使用,这里就要谈谈Future的局限性。Future很难直接表述多个Future 结果之间的依赖性,开发中,我们经常需要达成以下目的:

将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果)

等待 Future 集合中的所有任务都完成。

仅等待 Future 集合中最快结束的任务完成,并返回它的结果。

CompletableFuture的作用

CompletableFuture是Java8的一个新加的类,它在原来的Future类上,结合Java8的函数式编程,扩展了一系列强大的功能.

CompletableFuture 的方法比较多;

Either

Either 表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。

方法名

描述

acceptEither(CompletionStage<?   extends T> other, Consumer<? super T> action)

当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。

acceptEitherAsync(CompletionStage<?   extends T> other, Consumer<? super T> action)

当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用ForkJoinPool

acceptEitherAsync(CompletionStage<?   extends T> other, Consumer<? super T> action, Executor executor)

当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用指定的线程池


例子:

public static void main(String[] args) {

    Random random = new Random();

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {

        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "from future1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {

        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "from future2";
    });

    CompletableFuture<Void> future = future1.acceptEither(future2,
        str -> System.out.println("The future is " + str));

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

}

执行结果:The future is from future1 或者 The future is from future2。
因为future1和future2,执行的顺序是随机的。

applyToEither 方法:

applyToEither 跟 acceptEither 类似。

方法名

描述

applyToEither(CompletionStage<?   extends T> other, Function<? super T,U> fn)

当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。

applyToEitherAsync(CompletionStage<?   extends T> other, Function<? super T,U> fn)

当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。使用ForkJoinPool

applyToEitherAsync(CompletionStage<?   extends T> other, Function<? super T,U> fn, Executor executor)

当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。使用指定的线程池

例子省略。。。。。

allOf 方法

方法名

描述

allOf(CompletableFuture<?>…   cfs)

在所有Future对象完成后结束,并返回一个future。

 

allOf()方法所返回的CompletableFuture,并不能组合前面多个CompletableFuture的计算结果。于是我们借助Java 8的Stream来组合多个future的结果。

 

CompletableFuture<String>   future1 = CompletableFuture.supplyAsync(() -> "tony");

 

        CompletableFuture<String>   future2 = CompletableFuture.supplyAsync(() -> "cafei");

 

        CompletableFuture<String>   future3 = CompletableFuture.supplyAsync(() -> "aaron");

 

        CompletableFuture.allOf(future1,   future2, future3)

                .thenApply(v ->

                Stream.of(future1, future2,   future3)

                        .map(CompletableFuture::join)

                        .collect(Collectors.joining("   ")))

                .thenAccept(System.out::print);

 

执行结果:

 tony cafei aaron

3.7.2 anyOf

方法名

描述

anyOf(CompletableFuture<?>…   cfs)

在任何一个Future对象结束后结束,并返回一个future。

Random rand = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future2";
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "from future3";
        });
 
        CompletableFuture<Object> future =  CompletableFuture.anyOf(future1,future2,future3);
 
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

使用anyOf()时,只要某一个future完成,就结束了。所以执行结果可能是"from future1"、"from future2"、"from future3"中的任意一个。

anyOf 和 acceptEither、applyToEither的区别在于,后两者只能使用在两个future中,而anyOf可以使用在多个future中。

 

异步方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
 
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
 
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
 
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

supplyAsync 返回Future有结果,而runAsync返回CompletableFuture<Void>。

Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池。

CompletableFuture<Void>   cf = CompletableFuture.runAsync(() -> {
      System.out.println(Thread.currentThread().isDaemon());
      try {
          TimeUnit.SECONDS.sleep(3);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  });
 
  System.out.println("done=" + cf.isDone());
  TimeUnit.SECONDS.sleep(4);
  System.out.println("done=" + cf.isDone());

 

在这段代码中,runAsync 是异步执行的 ,通过 Thread.currentThread().isDaemon() 打印的结果就可以知道是Daemon线程异步执行的。

 

同步执行方法:

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public CompletableFuture<Void> thenRun(Runnable action);
 
 

CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> {
    System.out.println(Thread.currentThread().isDaemon() + "_"+ s);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return s.toUpperCase();
});

System.out.println("done=" + cf.isDone());
TimeUnit.SECONDS.sleep(4);
System.out.println("done=" + cf.isDone());

// 一直等待成功,然后返回结果
System.out.println(cf.join());

thenApply

 

当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。thenApplyAsync默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

thenApply相当于回调函数(callback)

 image.png

image.png 

thenAccept与thenRun

image.png

 

可以看到,thenAccept和thenRun都是无返回值的。如果说thenApply是不停的输入输出的进行生产,那么thenAccept和thenRun就是在进行消耗。它们是整个计算的最后两个阶段。

同样是执行指定的动作,同样是消耗,二者也有区别:

thenAccept接收上一阶段的输出作为本阶段的输入,同步执行;  

thenRun根本不关心前一阶段的输出,根本不不关心前一阶段的计算结果,因为它不需要输入参数

 

CompletableFuture异常处理


exceptionally

方法名

描述

exceptionally(Function<Throwable,?   extends T> fn)

只有当CompletableFuture抛出异常的时候,才会触发这个exceptionally的计算,调用function计算值。

        CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });

执行结果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException

对上面的代码稍微做了一下修改,修复了空指针的异常。

      CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
//                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .exceptionally(t -> {
                    System.out.println("Unexpected error:" + t);
                    return null;
                });

执行结果:

11

whenComplete

whenComplete 在上一篇文章其实已经介绍过了,在这里跟exceptionally的作用差不多,可以捕获任意阶段的异常。如果没有异常的话,就执行action。

 CompletableFuture.supplyAsync(() -> "hello world")
                .thenApply(s -> {
                    s = null;
                    int length = s.length();
                    return length;
                }).thenAccept(i -> System.out.println(i))
                .whenComplete((result, throwable) -> {
 
                    if (throwable != null) {
                       System.out.println("Unexpected error:"+throwable);
                    } else {
                        System.out.println(result);
                    }
 
                });

执行结果:

Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException

跟whenComplete相似的方法是handle,handle的用法在上一篇文章中也已经介绍过。

 

方法名

描述

thenCompose(Function<?   super T, ? extends CompletionStage<U>> fn)

在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。

thenComposeAsync(Function<?   super T, ? extends CompletionStage<U>> fn)

在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用ForkJoinPool。

thenComposeAsync(Function<?   super T, ? extends CompletionStage<U>> fn,Executor executor)

在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用指定的线程池。

thenCompose可以用于组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
 
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

执行结果:

Hello World

下面的例子展示了多次调用thenCompose()

 CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "100"))
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s)));
 
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

执行结果:

100100.0

 

组合

方法名

描述

thenCombine(CompletionStage<?   extends U> other, BiFunction<? super T,? super U,? extends V> fn)

当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。

thenCombineAsync(CompletionStage<?   extends U> other, BiFunction<? super T,? super U,? extends V> fn)

当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。

thenCombineAsync(CompletionStage<?   extends U> other, BiFunction<? super T,? super U,? extends V> fn,   Executor executor)

当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。

现在有CompletableFuture<T>、CompletableFuture<U>和一个函数(T,U)->V,thenCompose就是将CompletableFuture<T>和CompletableFuture<U>变为CompletableFuture<V>。

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
 
        CompletableFuture<Double> future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i));
 
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

执行结果:

100100.0

使用thenCombine()之后future1、future2之间是并行执行的,最后再将结果汇总。这一点跟thenCompose()不同。

thenAcceptBoth跟thenCombine类似,但是返回CompletableFuture<Void>类型。

方法名

描述

thenAcceptBoth(CompletionStage<?   extends U> other, BiConsumer<? super T,? super U> action)

当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。

thenAcceptBothAsync(CompletionStage<?   extends U> other, BiConsumer<? super T,? super U> action)

当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。

thenAcceptBothAsync(CompletionStage<?   extends U> other, BiConsumer<? super T,? super U> action, Executor   executor)

当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。

 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
 
        CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));
 
        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

执行结果:

100100.0

3.5 计算结果完成时的处理

当CompletableFuture完成计算结果后,我们可能需要对结果进行一些处理。

3.5.1 执行特定的Action

方法名

描述

whenComplete(BiConsumer<?   super T,? super Throwable> action)

当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。

whenCompleteAsync(BiConsumer<?   super T,? super Throwable> action)

当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用ForkJoinPool。

whenCompleteAsync(BiConsumer<?   super T,? super Throwable> action, Executor executor)

当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用指定的线程池。

        CompletableFuture.supplyAsync(() -> "Hello")

                .thenApply(s->s+" World")

                .thenApply(s->s+ "\nThis is CompletableFuture demo")

                .thenApply(String::toLowerCase)

                .whenComplete((result, throwable) -> System.out.println(result));

执行结果:

hello world

this is completablefuture demo

3.5.2 执行完Action可以做转换

方法名

描述

handle(BiFunction<?   super T, Throwable, ? extends U> fn)

当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn

handleAsync(BiFunction<?   super T, Throwable, ? extends U> fn)

当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用ForkJoinPool。

handleAsync(BiFunction<?   super T, Throwable, ? extends U> fn, Executor executor)

当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用指定的线程池。

        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")

                .thenApply(s->s+"100")

                .handle((s, t) -> s != null ? Double.parseDouble(s) : 0);

 

        try {

            System.out.println(future.get());

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

        }

执行结果:

100100.0

在这里,handle()的参数是BiFunction,apply()方法返回R,相当于转换的操作。

@FunctionalInterface

public interface BiFunction<T, U, R> {

 

    /**

     * Applies this function to the given arguments.

     *

     * @param t the first function argument

     * @param u the second function argument

     * @return the function result

     */

    R apply(T t, U u);

 

    /**

     * Returns a composed function that first applies this function to

     * its input, and then applies the {@code after} function to the result.

     * If evaluation of either function throws an exception, it is relayed to

     * the caller of the composed function.

     *

     * @param <V> the type of output of the {@code after} function, and of the

     *           composed function

     * @param after the function to apply after this function is applied

     * @return a composed function that first applies this function and then

     * applies the {@code after} function

     * @throws NullPointerException if after is null

     */

    default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {

        Objects.requireNonNull(after);

        return (T t, U u) -> after.apply(apply(t, u));

    }

}

而whenComplete()的参数是BiConsumer,accept()方法返回void。

@FunctionalInterface

public interface BiConsumer<T, U> {

 

    /**

     * Performs this operation on the given arguments.

     *

     * @param t the first input argument

     * @param u the second input argument

     */

    void accept(T t, U u);

 

    /**

     * Returns a composed {@code BiConsumer} that performs, in sequence, this

     * operation followed by the {@code after} operation. If performing either

     * operation throws an exception, it is relayed to the caller of the

     * composed operation.  If performing this operation throws an exception,

     * the {@code after} operation will not be performed.

     *

     * @param after the operation to perform after this operation

     * @return a composed {@code BiConsumer} that performs in sequence this

     * operation followed by the {@code after} operation

     * @throws NullPointerException if {@code after} is null

     */

    default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {

        Objects.requireNonNull(after);

 

        return (l, r) -> {

            accept(l, r);

            after.accept(l, r);

        };

    }

}

所以,handle()相当于whenComplete()+转换。

3.5.3 纯消费(执行Action)

方法名

描述

thenAccept(Consumer<?   super T> action)

当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值

thenAcceptAsync(Consumer<?   super T> action)

当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值,使用ForkJoinPool。

thenAcceptAsync(Consumer<?   super T> action, Executor executor)

当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值

thenAccept()是只会对计算结果进行消费而不会返回任何结果的方法。

        CompletableFuture.supplyAsync(() -> "Hello")

                .thenApply(s->s+" World")

                .thenApply(s->s+ "\nThis is CompletableFuture demo")

                .thenApply(String::toLowerCase)

                .thenAccept(System.out::print);

执行结果:

hello world

this is completablefuture demo

 

 

https://www.imooc.com/article/21654

基于千万数据的在线表结构修改

   在系统正常运作一定时间后,随着市场、产品汪的需求不断变更,比较大的一些表结构面临不得不增加字段的方式来扩充满足业务需求;并且是不能停止服务器。在 MySQL 在体量上了千万、数据的时候,Alter Table 的操作,可以让你等一天,而且在高峰期执行这种 SQL ,会造成表被锁,增加字段的操作需要很长的时间 ,让你的数据库也承担着压力。 

方案: pt-online-schema-change 工具

       https://segmentfault.com/a/1190000014924677

方案二:土方法 rename;

1.       创建目的表结构的空表,A_new (新加的字段也有了);

2.       在A表上创建触发器,包括增、删、改触发器;这样在copy的过程中,有其他的操作的话可以同步进行处理;

3.       通过insert…select…limit N 语句分片拷贝数据到目的表 (这个可以通过一个定时器Job)去做这些事情。

4.       Copy完成后,将A_new表rename到A表。

触发器例子:

增加触发触发器;

DELIMITER   $$

 

USE   `demo_1`$$

 

DROP   TRIGGER /*!50032 IF EXISTS */ `user_insert`$$

 

CREATE

    /*!50017 DEFINER = 'root'@'localhost' */

    TRIGGER `user_insert` AFTER INSERT ON   `t_student_0`

    FOR EACH ROW

   BEGIN

          — 插入目标表

               INSERT INTO

                 `demo_1`.`t_student_1`

               VALUES

                 (new.id, new.student_id,new.name,new.age);

   END;

$$

DELIMITER ;

 

 

修改触发触发器:

DELIMITER $$

 

USE `demo_1`$$

 

DROP TRIGGER /*!50032 IF EXISTS */   `user_update`$$

 

CREATE

      /*!50017 DEFINER = 'root'@'localhost' */

      TRIGGER `user_update` AFTER UPDATE ON `t_student_0`

    FOR   EACH ROW

   BEGIN

       UPDATE `demo_1`.`t_student_1` SET   student_id=new.student_id,NAME=new.name, age=new.age WHERE id=old.id;

   END;

$$

 

DELIMITER ;

 

删除触发触发器:

 

DELIMITER   $$

 

USE   `demo_1`$$

 

DROP   TRIGGER /*!50032 IF EXISTS */ `user_delete`$$

 

CREATE

    /*!50017 DEFINER = 'root'@'localhost' */

    TRIGGER `user_delete` AFTER DELETE ON   `t_student_0`

    FOR EACH ROW

   BEGIN

     DELETE FROM `demo_1`.`t_student_1` WHERE   id=old.id;

   END;

$$

 

DELIMITER   ;

 

基于binlog 增量的数据解析服务

什么是binlog

binlog是mysql的一种二进制日志文件,用来记录数据的变化。mysql使用binlog进行主从复制,如图:

 image.png

客户端向master的mysql sever写入数据

  1. 当数据发生变化时,master将变更的数据记录写入到二进制文件中,即binlog。

  2. slave订阅了master的binlog,所以会通过一个I/O THREAD与master的DUMP THREAD进行通信,同步binlog

  3. I/O      THREAD读取到binlog后会吸入到relay log中,准备重放。

  4. slave会通过SQL THREAD读取relay log,重放数据的改动并执行相应的改动。

这里有几点需要注意:

  1. 主从复制不是强一致性,只能保证最终一致

  2. master配合binlog复制会影响性能,所以尽量不要在master上挂太多的slave,如果对时间要求不高,可以在slave上挂slave

2.binlog的业务应用

  上面介绍了mysql中应用binlog的场景,而我们的业务可以伪装成master的slave节点,感知数据的变化,这就给了我们很多的业务运用空间。

2.1 数据异构

 经常有这样一个场景:

  原来业务是一个很单一的系统,所以表也在一起。随着业务的发展,系统开始拆分,总有一些表是各个业务都关注的表,但是对相关的字段的运用场景不同,所以这样一份元数据怎样更好的为各个系统服务就成了问题。当然,多写或者读写分离可以从物理节点上减少对数据服务器的压力,但是对业务并没有做到足够的支持,因为这些表都是一样的。因此我们可以通过binlog进行数据异构。

image.png

如图所示,订单系统生成订单后,通过binlog可以解析生成用户维度的订单信息供用户中心查询、商户维度订单表供运营管理,以及搜索系统的搜索数据,提供全文搜索功能。

这样,我们就通过原始的订单数据异构到三个系统中,提供了丰富的数据访问功能。不仅从节点上降低了数据服务器的压力,数据表现形式也更贴近自己的服务,减少不必要的字段冗余。

2.2 缓存数据的补充

对于高并发的系统,数据库往往是系统性能的瓶颈,毕竟IO响应速度是远远小于电子的运算速度的。因此,很多查询类服务都会在CPU与数据库之间加上一层缓存。即现从缓存获取,命中后直接返回,否则从DB中获取并存入缓存后返回。而如果原始数据变化了但缓存尚未超时,则缓存中的数据就是过时的数据了。当数据有变更的时候主动修改缓存数据。

image.png

当客户端更改了数据之后,中间件系统通过binlog获得数据变更,并同步到缓存中。这样就保证了缓存中数据有效性,减少了对数据库的调用,从而提高整体性能。

2.3 基于数据的任务分发

有这样一个场景:

  很多系统依赖同一块重要数据,当这些数据发生变化的时候,需要调用其他相关系统的通知接口同步数据变化,或者mq消息告知变化并等待其主动同步。这两种情况都对原始系统造成了侵入,原始系统改一块数据,并不想做这么多其他的事情。所以这时候可以通过binlog进行任务分发。

image.png

当原始业务系统修改数据后,不需要进行其他的业务关联。由调度系统读取binlog进行相应的任务分发、消息发送以及同步其他业务状态。这样可以将其他业务与原始业务系统解耦,并从数据的角度将所有管理功能放在了同一个调度系统中,责任清晰。

2.4 可以用于数据平滑迁移:

   https://www.w3cschool.cn/architectroad/architectroad-data-smooth-migration.html

2.5 数据抽取

https://blog.csdn.net/u010670689/article/details/81066807

mysql bin-log相关比较好的开源项目

https://blog.csdn.net/everlasting_188/article/details/53304530

 

binlog的业务实践:

1.  首先需要开启mysql bin_log功能;

启动成功之后,我们可以登陆查看我们的配置是否起作用

show variables like '%log_bin%'

本次解析只考虑insert、update、delete三种事件类型

 

采用OpenReplicator解析MySQL binlog

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。

Open Replicator项目地址:https://github.com/whitesock/open-replicator

 

binlog事件分析结构图

image.png

在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{

  "eventId": 1,

  "databaseName": "canal_test",

  "tableName": "`company`",

  "eventType": 2,

  "timestamp": 1477033198000,

  "timestampReceipt": 1477033248780,

  "binlogName": "mysql-bin.000006",

  "position": 353,

  "nextPostion": 468,

  "serverId": 2,

  "before": null,

  "after": null,

  "isDdl": true,

  "sql": "DROP TABLE `company` /* generated by server */"

}

 

DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

 

{

  "eventId": 0,

  "databaseName": "canal_test",

  "tableName": "person",

  "eventType": 24,

  "timestamp": 1477030734000,

  "timestampReceipt": 1477032161988,

  "binlogName": "mysql-bin.000006",

  "position": 242,

  "nextPostion": 326,

  "serverId": 2,

  "before": {

    "id": "3",

    "sex": "f",

    "address": "shanghai",

    "age": "23",

    "name": "zzh3"

  },

  "after": {

    "id": "3",

    "sex": "m",

    "address": "shanghai",

    "age": "23",

    "name": "zzh3"

  },

  "isDdl": false,

  "sql": null

}

 

相关的类文件如下: 

OpenReplicatorTest:
public class OpenReplicatorTest {

   
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
   
private static final String host = "192.168.56.101";
   
private static final int port = 3306;
   
private static final String user = "root";
   
private static final String password = "123456";


   
public static void main(String[] args) throws IOException {


        OpenReplicator or =
new OpenReplicator ();
        or.setUser(
user);
        or.setPassword(
password);
        or.setHost(
host);
        or.setPort(
port);
        MysqlConnection.setConnection(
host, port, user, password);

       
//      or.setServerId(MysqlConnection.getServerId());
        //
配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId

       
BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
        or.setBinlogFileName(bms.getBinlogName());
        or.setBinlogPosition(
4);
        or.setBinlogEventListener(
new NotificationListener());
       
try {
            or.start();
        }
catch (Exception e) {
           
logger.error(e.getMessage(),e);
        }
        Thread thread =
new Thread(new PrintLogEvent());
        thread.start();
    }

   
public static class PrintLogEvent implements Runnable{
       
@Override
       
public void run() {
           
while(true){
               
if(CDCEventManager.queue.isEmpty() == false)
                {
                    LogEvent ce = CDCEventManager.
queue.pollFirst();
                    String prettyStr1 = JSON.toJSONString(ce);
                    System.
out.println(prettyStr1);
                }
               
else{
                   
try {
                        TimeUnit.
SECONDS.sleep(1);
                    }
catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }


}
 
 
LogEvent:
public class LogEvent implements Serializable {

   
/**
     *
只针对delete、insert、update事件
     */
   
private static final long serialVersionUID = 5503152746318421290L;

   
private long eventId = 0;//事件唯一标识
   
private String databaseName = null;
   
private String tableName = null;
   
private int eventType = 0;//事件类型
   
private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]
   
private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]
   
private long position = 0;
   
private long nextPostion = 0;
   
private long serverId = 0;
   
private Map<String, String> before = null;
   
private Map<String, String> after = null;
   
private Boolean isDdl = null;
   
private String sql = null;

   
private static AtomicLong uuid = new AtomicLong(0);

   
public LogEvent() {}

   
public LogEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName) {
       
this.init(are);
       
this.databaseName = databaseName;
       
this.tableName = tableName;
    }

   
private void init(final BinlogEventV4 be) {
       
this.eventId = uuid.getAndAdd(1);
        BinlogEventV4Header header = be.getHeader();
       
this.timestamp = header.getTimestamp();
       
this.eventType = header.getEventType();
       
this.serverId = header.getServerId();
       
this.timestampReceipt = header.getTimestampOfReceipt();
       
this.position = header.getPosition();
       
this.nextPostion = header.getNextPosition();
    }

   
@Override
   
public String toString() {
        StringBuilder builder =
new StringBuilder();
        builder.append(
"{ eventId:").append(eventId);
        builder.append(
",databaseName:").append(databaseName);
        builder.append(
",tableName:").append(tableName);
        builder.append(
",eventType:").append(eventType);
        builder.append(
",timestamp:").append(timestamp);
        builder.append(
",timestampReceipt:").append(timestampReceipt);
        builder.append(
",position:").append(position);
        builder.append(
",nextPostion:").append(nextPostion);
        builder.append(
",serverId:").append(serverId);
        builder.append(
",isDdl:").append(isDdl);
        builder.append(
",sql:").append(sql);
        builder.append(
",before:").append(before);
        builder.append(
",after:").append(after).append("}");

       
return builder.toString();
    }

   
public long getEventId() {
       
return eventId;
    }

    
public void setEventId(long eventId) {
       
this.eventId = eventId;
    }

   
public String getDatabaseName() {
       
return databaseName;
    }

   
public void setDatabaseName(String databaseName) {
       
this.databaseName = databaseName;
    }

   
public String getTableName() {
       
return tableName;
    }

   
public void setTableName(String tableName) {
       
this.tableName = tableName;
    }

   
public int getEventType() {
       
return eventType;
    }

   
public void setEventType(int eventType) {
       
this.eventType = eventType;
    }

   
public long getTimestamp() {
       
return timestamp;
    }

   
public void setTimestamp(long timestamp) {
       
this.timestamp = timestamp;
    }

   
public long getTimestampReceipt() {
       
return timestampReceipt;
    }

   
public void setTimestampReceipt(long timestampReceipt) {
       
this.timestampReceipt = timestampReceipt;
    }

   
public long getPosition() {
       
return position;
    }

   
public void setPosition(long position) {
       
this.position = position;
    }

   
public long getNextPostion() {
       
return nextPostion;
    }

   
public void setNextPostion(long nextPostion) {
       
this.nextPostion = nextPostion;
    }

   
public long getServerId() {
        
return serverId;
    }

   
public void setServerId(long serverId) {
       
this.serverId = serverId;
    }

   
public Map<String, String> getBefore() {
       
return before;
    }

   
public void setBefore(Map<String, String> before) {
       
this.before = before;
    }

   
public Map<String, String> getAfter() {
       
return after;
    }

   
public void setAfter(Map<String, String> after) {
       
this.after = after;
    }

   
public Boolean getDdl() {
       
return isDdl;
    }

   
public void setDdl(Boolean ddl) {
       
isDdl = ddl;
    }

   
public String getSql() {
       
return sql;
    }

   
public void setSql(String sql) {
       
this.sql = sql;
    }


}

public class CDCEventManager {

   
public static final ConcurrentLinkedDeque<LogEvent> queue = new ConcurrentLinkedDeque<LogEvent>();

}

MysqlConnection:
public class MysqlConnection {

   
private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

   
private static Connection conn;

   
private static String host;
   
private static int port;
   
private static String user;
   
private static String password;

   
public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg) {
       
try {
           
if (conn == null || conn.isClosed()) {
                Class.forName(
"com.mysql.jdbc.Driver");

               
host = hostArg;
               
port = portArg;
               
user = userArg;
               
password = passwordArg;

               
conn = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port + "/", user, password);
               
logger.info("connected to mysql:{} : {}", user, password);
            }
        }
catch (ClassNotFoundException e) {
           
logger.error(e.getMessage(), e);
        }
catch (SQLException e) {
           
logger.error(e.getMessage(), e);
        }
    }

   
public static Connection getConnection() {
       
try {
           
if (conn == null || conn.isClosed()) {
                setConnection(
host, port, user, password);
            }
        }
catch (SQLException e) {
           
logger.error(e.getMessage(), e);
        }
       
return conn;
    }

   
/**
     *
获取Column信息
     *
     * @return
    
*/
   
public static Map<String, List<ColumnInfo>> getColumns() {
        Map<String, List<ColumnInfo>> cols =
new HashMap<>();
        Connection conn = getConnection();

       
try {
            DatabaseMetaData metaData = conn.getMetaData();
            ResultSet r = metaData.getCatalogs();
            String tableType[] = {
"TABLE"};
           
while (r.next()) {
                String databaseName = r.getString(
"TABLE_CAT");
                ResultSet result = metaData.getTables(databaseName,
null, null, tableType);
               
while (result.next()) {
                    String tableName = result.getString(
"TABLE_NAME");
                   
//                  System.out.println(result.getInt("TABLE_ID"));
                   
String key = databaseName + "." + tableName;
                    ResultSet colSet = metaData.getColumns(databaseName,
null, tableName, null);
                    cols.put(key,
new ArrayList<ColumnInfo>());
                   
while (colSet.next()) {
                        ColumnInfo columnInfo =
new ColumnInfo(colSet.getString("COLUMN_NAME"),
                            colSet.getString(
"TYPE_NAME"));
                        cols.get(key).add(columnInfo);
                    }

                }
            }
        }
catch (SQLException e) {
           
logger.error(e.getMessage(), e);
        }
       
return cols;
    }

   
/**
     *
参考 mysql> show binary logs +------------------+-----------+ | Log_name         | File_size |
     * +------------------+-----------+ | mysql-bin.000001 |       126 | | mysql-bin.000002 |       126 | |
     * mysql-bin.000003 |      6819 | | mysql-bin.000004 |      1868 | +------------------+-----------+
     */
   
public static List<BinlogInfo> getBinlogInfo() {
        List<BinlogInfo> binlogList =
new ArrayList<>();

        Connection conn =
null;
        Statement statement =
null;
        ResultSet resultSet =
null;

       
try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery(
"show binary logs");
            
while (resultSet.next()) {
                BinlogInfo binlogInfo =
new BinlogInfo(resultSet.getString("Log_name"), resultSet.getLong("File_size"));
                binlogList.add(binlogInfo);
            }
        }
catch (Exception e) {
           
logger.error(e.getMessage(), e);
        }
finally {
           
try {
               
if (resultSet != null) { resultSet.close(); }
               
if (statement != null) { statement.close(); }
               
if (conn != null) { conn.close(); }
            }
catch (SQLException e) {
               
logger.error(e.getMessage(), e);
            }
        }

       
return binlogList;
    }

   
/**
     *
参考: mysql> show master status; +------------------+----------+--------------+------------------+ | File |
     * Position | Binlog_Do_DB | Binlog_Ignore_DB | +------------------+----------+--------------+------------------+ |
     * mysql-bin.000004 |     1868 |              |                  |
     * +------------------+----------+--------------+------------------+
     *
     * @return
    
*/
   
public static BinlogMasterStatus getBinlogMasterStatus() {
        BinlogMasterStatus binlogMasterStatus =
new BinlogMasterStatus();
        Connection conn =
null;
        Statement statement =
null;
        ResultSet resultSet =
null;

       
try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery(
"show master status");
           
while (resultSet.next()) {
                binlogMasterStatus.setBinlogName(resultSet.getString(
"File"));
                binlogMasterStatus.setPosition(resultSet.getLong(
"Position"));
            }
        }
catch (Exception e) {
           
logger.error(e.getMessage(), e);
        }
finally {
            
try {
               
if (resultSet != null) { resultSet.close(); }
               
if (statement != null) { statement.close(); }
               
if (conn != null) { conn.close(); }
            }
catch (SQLException e) {
               
logger.error(e.getMessage(), e);
            }
        }

       
return binlogMasterStatus;
    }

   
/**
     *
获取open-replicator所连接的mysql服务器的serverid信息
     *
     * @return
    
*/
   
public static int getServerId() {
       
int serverId = 6789;
        Connection conn =
null;
        Statement statement =
null;
        ResultSet resultSet =
null;
       
try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery(
"show variables like 'server_id'");
           
while (resultSet.next()) {
                serverId = resultSet.getInt(
"Value");
            }
        }
catch (Exception e) {
           
logger.error(e.getMessage(), e);
        }
finally {
           
try {
                
if (resultSet != null) { resultSet.close(); }
               
if (statement != null) { statement.close(); }
               
if (conn != null) { conn.close(); }
            }
catch (SQLException e) {
               
logger.error(e.getMessage(), e);
            }
        }
       
return serverId;
    }

}
 
 
NotificationListener:
public class NotificationListener  implements BinlogEventListener {

   
private static Logger logger = LoggerFactory.getLogger(NotificationListener.class);

   
@Override
   
public void onEvents(BinlogEventV4 event) {
       
if (event == null) {
           
logger.error("binlog event is null");
           
return;
        }
       
int eventType = event.getHeader().getEventType();
        System.
out.println("eventType---->"+ MySqlEventTypeIdToString.getInstance().get(eventType));
       
switch (eventType) {
           
case MySQLConstants.FORMAT_DESCRIPTION_EVENT: {
               
logger.trace("FORMAT_DESCRIPTION_EVENT");
               
break;
            }
           
case MySQLConstants.TABLE_MAP_EVENT:
               
//每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId
           
{
                TableMapEvent tme = (TableMapEvent)event;
                TableInfoKeeper.saveTableIdMap(tme);
               
logger.trace("TABLE_MAP_EVENT:tableId:{}", tme.getTableId());
               
break;
            }
           
case MySQLConstants.DELETE_ROWS_EVENT: {
                DeleteRowsEvent dre = (DeleteRowsEvent)event;
                
long tableId = dre.getTableId();
               
logger.trace("DELETE_ROW_EVENT:tableId:{}", tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = dre.getRows();
               
for (Row row : rows) {
                    List<Column> before = row.getColumns();
                    Map<String, String> beforeMap = getMap(before, databaseName, tableName);
                   
if (beforeMap != null && beforeMap.size() > 0) {
                        LogEvent cdcEvent =
new LogEvent(dre, databaseName, tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setBefore(beforeMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}", cdcEvent);
                    }
                }
               
break;

            }
           
case MySQLConstants.UPDATE_ROWS_EVENT:
            {
                UpdateRowsEvent upe = (UpdateRowsEvent)event;
               
long tableId = upe.getTableId();
               
logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Pair<Row>> rows = upe.getRows();
               
for(Pair<Row> p:rows){
                    List<Column> colsBefore = p.getBefore().getColumns();
                    List<Column> colsAfter = p.getAfter().getColumns();
                    Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
                    Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
                   
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(upe,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setBefore(beforeMap);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case MySQLConstants.UPDATE_ROWS_EVENT_V2:
            {
                UpdateRowsEventV2 upe = (UpdateRowsEventV2)event;
               
long tableId = upe.getTableId();
               
logger.info("UPDATE_ROWS_EVENT_V2:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Pair<Row>> rows = upe.getRows();
                
for(Pair<Row> p:rows){
                    List<Column> colsBefore = p.getBefore().getColumns();
                    List<Column> colsAfter = p.getAfter().getColumns();
                    Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
                    Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
                   
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(upe,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setBefore(beforeMap);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case MySQLConstants.WRITE_ROWS_EVENT :
                {
                    WriteRowsEventV2 wre = (WriteRowsEventV2)event;
               
long tableId = wre.getTableId();
               
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = wre.getRows();
               
for(Row row: rows){
                    List<Column> after = row.getColumns();
                    Map<String,String> afterMap = getMap(after,databaseName,tableName);
                   
if(afterMap!=null && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(wre,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case   MySQLConstants.WRITE_ROWS_EVENT_V2:
            {
                WriteRowsEventV2 wre = (WriteRowsEventV2)event;
               
long tableId = wre.getTableId();
               
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = wre.getRows();
               
for(Row row: rows){
                    List<Column> after = row.getColumns();
                    Map<String,String> afterMap = getMap(after,databaseName,tableName);
                   
if(afterMap!=null && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(wre,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case MySQLConstants.QUERY_EVENT:
            {
                QueryEvent qe = (QueryEvent)event;
                TableInfo tableInfo = createTableInfo(qe);
               
if(tableInfo == null){
                   
break;
                }

                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();
               
logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);

                LogEvent cdcEvent =
new LogEvent(qe,databaseName,tableName);
                cdcEvent.setDdl(
true);
                cdcEvent.setSql(qe.getSql().toString());
                CDCEventManager.
queue.addLast(cdcEvent);
               
logger.info("cdcEvent:{}",cdcEvent);
               
break;
            }
           
case MySQLConstants.XID_EVENT:{
                XidEvent xe = (XidEvent)event;
               
logger.trace("XID_EVENT: xid:{}",xe.getXid());
               
break;
            }
           
default:
            {
               
logger.trace("DEFAULT:{}",eventType);
               
break;
            }

        }
    }


   
/**
     * ROW_EVENT
中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,
     * 然后跟取回的List<Column>进行映射。
     *
     * @param
cols
    
* @param databaseName
    
* @param tableName
    
* @return
    
*/
   
private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
        Map<String,String> map =
new HashMap<>();
       
if(cols == null || cols.size()==0){
           
return null;
        }
        String fullName = databaseName+
"."+tableName;
        List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
       
if(columnInfoList == null)
           
return null;
       
if(columnInfoList.size() != cols.size()){
            TableInfoKeeper.refreshColumnsMap();
           
if(columnInfoList.size() != cols.size())
            {
               
logger.warn("columnInfoList.size is not equal to cols.");
               
return null;
            }
        }

       
for(int i=0;i<columnInfoList.size(); i++){
           
if(cols.get(i).getValue()==null)
                map.put(columnInfoList.get(i).getName(),
"");
           
else
               
map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
        }

       
return map;
    }

   
/**
     *
从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,
     * 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中
     *
     * @param
qe
    
* @return
    
*/
   
private TableInfo createTableInfo(QueryEvent qe){
        String sql = qe.getSql().toString().toLowerCase();
       
if("begin".equals(sql)){
           
return null;
        }
        TableInfo ti =
new TableInfo();
        String databaseName = qe.getDatabaseName().toString();
        String tableName =
null;
       
if(checkFlag(sql,"table")){
            tableName = getTableName(sql,
"table");
        }
else if(checkFlag(sql,"truncate")){
            tableName = getTableName(sql,
"truncate");
        }
else{
            
logger.warn("can not find table name from sql:{}",sql);
           
return null;
        }
        ti.setDatabaseName(databaseName);
        ti.setTableName(tableName);
        ti.setFullName(databaseName+
"."+tableName);

       
return ti;
    }

   
private boolean checkFlag(String sql, String flag){
        String[] ss = sql.split(
" ");
       
for(String s:ss){
           
if(s.equals(flag)){
               
return true;
            }
        }
       
return false;
    }

   
private String getTableName(String sql, String flag){
        String[] ss = sql.split(
"\\.");
        String tName =
null;
       
if (ss.length > 1) {
            String[] strs = ss[
1].split(" ");
            tName = strs[
0];
        }
else {
            String[] strs = sql.split(
" ");
           
boolean start = false;
           
for (String s : strs) {
               
if (s.indexOf(flag) >= 0) {
                    start =
true;
                   
continue;
                }
               
if (start && !s.isEmpty()) {
                    tName = s;
                   
break;
                }
            }
        }
        tName.replaceAll(
"`", "").replaceAll(";", "");

       
//del "("[create table person(....]
       
int index = tName.indexOf('(');
       
if(index>0){
            tName = tName.substring(
0, index);
        }

       
return tName;
    }

}
 
 
TableInfo:
public class TableInfo {

   
private String databaseName;
   
private String tableName;
   
private String fullName;
   
// 省略Getter和Setter

   
public String getDatabaseName() {
       
return databaseName;
    }

   
public void setDatabaseName(String databaseName) {
       
this.databaseName = databaseName;
    }

   
public String getTableName() {
       
return tableName;
    }

   
public void setTableName(String tableName) {
       
this.tableName = tableName;
    }

   
public String getFullName() {
       
return fullName;
    }

   
public void setFullName(String fullName) {
       
this.fullName = fullName;
    }

   
@Override
   
public boolean equals(Object o) {
       
if (this == o) { return true; }
       
if (o == null || getClass() != o.getClass()) { return false; }
        TableInfo tableInfo = (TableInfo)o;
       
return Objects.equals(databaseName, tableInfo.databaseName) &&
            Objects.equals(
tableName, tableInfo.tableName) &&
            Objects.equals(
fullName, tableInfo.fullName);
    }

   
@Override
   
public int hashCode() {
       
return Objects.hash(databaseName, tableName, fullName);
    }
}
 
 
TableInfoKeeper:
public class TableInfoKeeper {

   
private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

   
private static Map<Long, TableInfo> tabledIdMap = new ConcurrentHashMap<>();
   
private static Map<String, List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();

   
static {
       
columnsMap = MysqlConnection.getColumns();
    }

   
public static void saveTableIdMap(TableMapEvent tme) {
       
long tableId = tme.getTableId();
       
tabledIdMap.remove(tableId);

        TableInfo table =
new TableInfo();
        table.setDatabaseName(tme.getDatabaseName().toString());
        table.setTableName(tme.getTableName().toString());
        table.setFullName(tme.getDatabaseName() +
"." + tme.getTableName());

        
tabledIdMap.put(tableId, table);
    }

   
public static synchronized void refreshColumnsMap() {
        Map<String, List<ColumnInfo>> map = MysqlConnection.getColumns();
       
if (map.size() > 0) {
           
//          logger.warn("refresh and clear cols.");
           
columnsMap = map;
           
//          logger.warn("refresh and switch cols:{}",map);
       
} else {
           
logger.error("refresh columnsMap error.");
        }
    }

   
public static TableInfo getTableInfo(long tableId) {
       
return tabledIdMap.get(tableId);
    }

   
public static List<ColumnInfo> getColumns(String fullName) {
       
return columnsMap.get(fullName);
    }

}

MySQL主从配置-Docker

MySQL单机启动mysql
docker run --name mysqlserver -e MYSQL_ROOT_PASSWORD=123456 -d -i -p 3306:3306  mysql:5.7

进入终端:

docker exec -it  2a7a85124400  /bin/bash
mysql -h 127.0.0.1 -u root -p

宿主机目录结构:

image.png

主从配置:

Master和Slaver 配置文件

 

Master: my.cnf   

[mysqld]
 
server_id = 1
 
log-bin= mysql-bin
 
read-only=0
 
binlog-do-db=order_demo

 
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema
 
character-set-server=utf8
 
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/

 

 Slaver: my.cnf

[mysqld]
 
server_id = 2
 
log-bin= mysql-bin
 
read-only=1
 
replicate-do-db=order_demo
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema
 
character-set-server=utf8
 
 
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/

说明: log-bin :需要启用二进制日志 server_id : 用于标识不同的数据库服务器,而且唯一

binlog-do-db : 需要记录到二进制日志的数据库 binlog-ignore-db : 忽略记录二进制日志的数据库 auto-increment-offset :该服务器自增列的初始值 auto-increment-increment :该服务器自增列增量

replicate-do-db :指定复制的数据库 replicate-ignore-db :不复制的数据库 relay_log :从库的中继日志,主库日志写到中继日志,中继日志再重做到从库 log-slave-updates :该从库是否写入二进制日志,如果需要成为多主则可启用。只读可以不需要

如果为多主的话注意设置 auto-increment-offset 和 auto-increment-increment 如上面为双主的设置: 服务器 152 自增列显示为:1,3,5,7,……(offset=1,increment=2) 服务器 153 自增列显示为:2,4,6,8,……(offset=2,increment=2)

 

1)read_only=1只读模式,不会影响slave同步复制的功能,所以在MySQL slave库中设定了read_only=1后,通过 show slave status\G ,命令查看salve状态,可以看到salve仍然会读取master上的日志,并且在slave库中应用日志,保证主从数据库同步一致;

  2)read_only=1只读模式,可以限定普通用户进行数据修改的操作,但不会限定具有super权限的用户的数据修改操作;在MySQL中设置read_only=1后,普通的应用用户进行insert、update、delete等会产生数据变化的DML操作时,都会报出数据库处于只读模式不能发生数据变化的错误,但具有super权限的用户,例如在本地或远程通过root用户登录到数据库,还是可以进行数据变化的DML操作;

 

2、启动创建主从容器

//创建并启动主从容器;

 

//master

docker run –name mastermysql -d -p 3307:3306 -e MYSQL_ROOT_PASSWORD=123456 -v /opt/docker/mysql/master/data:/var/lib/mysql -v /opt/docker/mysql/master/conf/my.cnf:/etc/mysql/my.cnf  mysql:5.7

 //slave

docker run –name slavermysql -d -p 3308:3306 -e MYSQL_ROOT_PASSWORD=123456 -v /opt/docker/mysql/slaver/data:/var/lib/mysql -v /opt/docker/mysql/slaver/conf/my.cnf:/etc/mysql/my.cnf  mysql:5.7

这里为了方便查看数据,把Docker的端口都与本机进行了映射,对应的本地master/data文件夹和slaver/data文件夹下也能看到同步的数据库文件

 

 

Master和Slaver设置;

//进入master容器

docker exec -it mastermysql bash

//启动mysql命令,刚在创建窗口时我们把密码设置为:anech

mysql -u root -p

//创建一个用户来同步数据

GRANT REPLICATION SLAVE ON *.* to 'backup'@'%' identified by '123456';

//这里表示创建一个slaver同步账号backup,允许访问的IP地址为%,%表示通配符

//例如:192.168.0.%表示192.168.0.0-192.168.0.255的slaver都可以用backup用户登陆到master上

//查看状态,记住File、Position的值,在Slaver中将用到

show master status;

 

 

slaver容器:

//进入slaver容器

docker exec -it slavermysql bash

//启动mysql命令,刚在创建窗口时我们把密码设置为:anech

mysql -u root -p

 

//设置主库链接

change master to master_host='172.17.0.2',master_user='backup',master_password='123456',master_log_file='mysql-bin.000001',master_log_pos=0,master_port=3306;

//启动从库同步

start slave;

 

//查看状态

show slave status\G;

 image.png

表示配置成功;

说明:

master_host:主库地址

master_user:主库创建的同步账号

master_password:主库创建的同步密码

master_log_file:主库产生的日志

master_log_pos:主库日志记录偏移量

master_port:主库使用的端口,默认为3306

 

测试主从是否成功,是否同步!

在master创建数据内容,看slave 是否同步过去,

create database order_demo;

use order_demo;

create table userinfo(username varchar(50),age int);

insert into userinfo values('Tom',18);

select * from userinfo;

 

 

 

 

java  读写分离操作:

 

https://www.cnblogs.com/fengwenzhee/p/7193218.html?utm_source=itdadao&utm_medium=referral

 


 

https://www.cnblogs.com/xiaoit/p/4599914.html

 

 

log4j2配置文件log4j2.xml详解

配置实例:

<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
 <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出-->
 <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->
 <configuration status="WARN" monitorInterval="30">
 <!—配置参数 -->    
<Properties>
         <Property name="log_dir">logs</Property>
         <Property name="PATTERN_LAYOUT">%d{yyyy-MM-dd HH:mm:ss} %-5level %class{36} %L %M - %msg%xEx%n</Property>
     </Properties>
 
     <Appenders>
 
         <!-- Console日志: 线上删除console, 把此段日志配置删除即可-->
         <Console name="STDOUT">
<!--输出日志的格式-->
             <PatternLayout pattern="${PATTERN_LAYOUT}"/>
         </Console>
 
<!--文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,这个也挺有用的,适合临时测试用-->
<File name="log" fileName="log/test.log" append="false">
        <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
</File>
 
         <!-- INFO 级别日志 -->
 
          <!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档-->
 <RollingFile name="INFO" fileName="${log_dir}/info/info.log"
                      filePattern="${log_dir}/info/info-%d{yyyyMMdd}-%i.log.gz">
             <PatternLayout pattern="${PATTERN_LAYOUT}"/>
             <Filters>
                 <!--如果是error级别拒绝-->
                 <ThresholdFilter level="error" onMatch="DENY" onMismatch="NEUTRAL"/>
                 <ThresholdFilter level="warn" onMatch="DENY" onMismatch="NEUTRAL"/>
                 <!--如果是 debug\info 输出-->
             </Filters>
             <Policies>
                 <TimeBasedTriggeringPolicy/>
                 <!--单个文件大小-->
                 <SizeBasedTriggeringPolicy size="500MB"/>
             </Policies>
             <!--保存日志文件个数-->
             <DefaultRolloverStrategy max="10"/>
         </RollingFile>
 
         <!--error级别日志输出-->
         <RollingFile name="ERROR" fileName="${log_dir}/info/error.log"
                      filePattern="${log_dir}/info/error-%d{yyyyMMdd}-%i.log.gz">
             <PatternLayout pattern="${PATTERN_LAYOUT}"/>
             <Filters>
                 <!--如果是 error 输出-->
                 <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/>
             </Filters>
             <Policies>
                 <TimeBasedTriggeringPolicy/>
                 <SizeBasedTriggeringPolicy size="500MB"/>
             </Policies>
             <DefaultRolloverStrategy max="10"/>
         </RollingFile>
 
     </Appenders>
 
     <Loggers>
 
         <!-- Console日志: 线上删除console, 把此段日志配置删除即可-->
         <Root level="debug">
             <AppenderRef ref="STDOUT"/>
         </Root>
 
         <AsyncLogger name="jws.event.rec" level="info"   additivity="false">
             <AppenderRef ref="eventRecRolling"/>
         </AsyncLogger>
 
 
     </Loggers>
 </Configuration>

(1).根节点Configuration有两个属性:status和monitorinterval,有两个子节点:Appenders和Loggers(表明可以定义多个Appender和Logger).

       status 属性,这个属性表示log4j2本身的日志信息打印级别。如果把status改为TRACE再执行测试代码,可以看到控制台中打印了一些log4j加载插件、组装logger等调试信息。

       monitorinterval用于指定log4j自动重新配置的监测间隔时间,单位是s,最小是5s.

日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出。对于Loggers中level的定义同样适用。

上面配置了两种日志打印的方式,打印的等级是info。

 

(2).Appenders节点,常见的有三种子节点:Console、RollingFile、File.

 

Console节点用来定义输出到控制台的Appender.

        name:指定Appender的名字.

        target:SYSTEM_OUT 或 SYSTEM_ERR,一般只设置默认:SYSTEM_OUT.

        PatternLayout:输出格式,不设置默认为:%m%n.

 

File节点用来定义输出到指定位置的文件的Appender.

        name:指定Appender的名字.

        fileName:指定输出日志的目的文件带全路径的文件名.

        PatternLayout:输出格式,不设置默认为:%m%n.

 

 RollingFile节点用来定义超过指定大小自动删除旧的创建新的的Appender.

        name:指定Appender的名字.

        fileName:指定输出日志的目的文件带全路径的文件名.

        PatternLayout:输出格式,不设置默认为:%m%n.

        filePattern:指定新建日志文件的名称格式.

        Policies:指定滚动日志的策略,就是什么时候进行新建日志文件输出日志.

        TimeBasedTriggeringPolicy:Policies子节点,基于时间的滚动策略,interval属性用来指定多久滚动一次,默认是1 hour。modulate=true用来调整时间:比如现在是早上3am,interval是4,那么第一次滚动是在4am,接着是8am,12am…而不是7am.

        SizeBasedTriggeringPolicy:Policies子节点,基于指定文件大小的滚动策略,size属性用来定义每个日志文件的大小.

        DefaultRolloverStrategy:用来指定同一个文件夹下最多有几个日志文件时开始删除最旧的,创建新的(通过max属性)。

 

 (3).Loggers节点,常见的有两种:Root和Logger.

       Root节点用来指定项目的根日志,如果没有单独指定Logger,那么就会默认使用该Root日志输出

         level:日志输出级别,共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF.

         AppenderRef:Root的子节点,用来指定该日志输出到哪个Appender.

       Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等。

         level:日志输出级别,共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF.

         name:用来指定该Logger所适用的类或者类所在的包全路径,继承自Root节点.

         AppenderRef:Logger的子节点,用来指定该日志输出到哪个Appender,如果没有指定,就会默认继承自Root.如果指定了,那么会在指定的这个Appender和Root的Appender中都会输出,此时我们可以设置Logger的additivity="false"只在自定义的Appender中进行输出。

 

(4).关于日志level.

      共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF.

      All:最低等级的,用于打开所有日志记录.

      Trace:是追踪,就是程序推进以下,你就可以写个trace输出,所以trace应该会特别多,不过没关系,我们可以设置最低日志级别不让他输出.

      Debug:指出细粒度信息事件对调试应用程序是非常有帮助的.

      Info:消息在粗粒度级别上突出强调应用程序的运行过程.

      Warn:输出警告及warn以下级别的日志.

      Error:输出错误信息日志.

      Fatal:输出每个严重的错误事件将会导致应用程序的退出的日志.

      OFF:最高等级的,用于关闭所有日志记录.

      程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少

java 日志框架详解-干货

一、 日志的重要性

对于我们开发人员来说,日志记录往往不被重视。在生产环境中,日志是查找问题来源的重要依据。日志可记录程序运行时产生的错误信息、状态信息、调试信息和执行时间信息等多种多样的信息。可以在程序运行出现错误时,快速地定位潜在的问题源。目前常用的日志系统有java.util.logging、commons logging、slf4j、log4j1.x、logback、log4j2.x 等若干种。

二、 Java常用日志框架历史

 1996年早期,欧洲安全电子市场项目组决定编写它自己的程序跟踪API(Tracing API)。经过不断的完善,这个API终于成为一个十分受欢迎的Java日志软件包,即log4j。后来log4j成为Apache基金会项目中的一员。

 期间log4j近乎成了Java社区的日志标准。据说Apache基金会还曾经建议Sun引入log4j到java的标准库中,但Sun拒绝了。

 2002年Java1.4发布,Sun推出了自己的日志库jul(java util logging),其实现基本模仿了log4j的实现。在JUL出来以前,log4j就已经成为一项成熟的技术,使得log4j在选择上占据了一定的优势。

 接着,Apache推出了jakarta commons logging,jcl只是定义了一套日志接口(其内部也提供一个simple log的简单实现),支持运行时动态加载日志组件的实现,也就是说,在你的应用代码里,只需调用commons logging的接口,底层实现可以是log4j,也可以是java util logging。

 后来(2006年),Ceki Gülcü不适应Apache的工作方式,离开了Apache。然后先后创建了slf4j(日志门面接口,类似于commons logging)和logback(slf4j的实现)两个项目,并回瑞典创建了QOS公司,QOS官网上是这样描述logback的:The Generic,Reliable Fast&Flexible Logging Framework(一个通用,可靠,快速且灵活的日志框架)。

 现今,Java日志领域被划分为两大阵营:commons logging阵营和slf4j阵营。

 commons logging在Apache大树的笼罩下,有很大的用户基数。但有证据表明,形式正在发生变化。2013年底有人分析了GitHub上30000个项目,统计出了最流行的100个Libraries,可以看出slf4j的发展趋势更好。如下图1所示。

 

图1

image.png

Apache眼看有被logback反超的势头,于2012年7月重写了log4j 1.x,成立了新的项目log4j2.x。log4j2在各个方面都与logback非常相似。

Java的logger世界

Commons logging
Apache的commons项目,一个很薄的logging抽象层,制定了使用log的相关接口和规范,可以由不同的logging implementations,[
http://commons.apache.org/proper/commons-logging/]

SLF4J
Simple Logging Facade for Java, 也是一个logging抽象层,底层logging框架可以是(e.g. java.util.logging, logback, log4j),是Commons logging的替代物, [
http://www.slf4j.org/]

jcl-over-slf4j
提供Commons-logging向slf4j迁移用的bridge, 可参考 [
http://www.slf4j.org/legacy.html]

slf4j-log4j
前面说了,SLF4J是个facade,log4j是其实现的一种框架,抽象成接口,具体的绿叶可以是Log4j/Log4j2/LockBack,当使用log4j时,需要此Jar包作为桥接,可参考 [
http://www.slf4j.org/legacy.html]

log4j
这个不用多说,大家使用最多的是1.x版本, [
http://logging.apache.org/log4j/1.2/], 不过好像2.x也出来了,据说采用了异步机制,性能有很大提升 [http://logging.apache.org/log4j/2.x/]

logback
原生实现了SLF4J API,所以不需要中间的bridge,号称是log4j终结者,下一代logging框架,性能比log4j有很大提升,推荐使用,况且现在我们已经在使用SLF4j,所以切换过去应该是很方便的事。[
http://logback.qos.ch/]

总的来说:slf4j与commons-logging只是一个日志门面,实际还是要依赖真正的日志库log4j,虽然slf4j和commons-loggins自带了日志库,但是毕竟log4j才是最强大的。

 

至于Logback是由log4j创始人设计的另一个开源日志组件,是用来取代log4j,

取代的理由自行百度;

https://blog.csdn.net/zbajie001/article/details/79596109

jul日志

 这个是SUN公司自带的日志输出框架,本来Log4j有建议过加入SUN JDK框架,但是SUN不要人家,后台就出了这个框架,但是比较XX,所以很少人使用;

@Test
 public void test() throws IOException {
     Logger logger = Logger.getLogger("");
     logger.info("Hola JDK!");
 }

这就是jul日志。

commons-logging日志

common logging本身不是log,你可以把它看做是一个日志的接口而log4j就是日志的实现,它自身只是实现了简单的日志实现类.

使用很简单:

commons-logging的使用非常简单。首先,需要在pom.xml文件中添加依赖:

<dependency>
     <groupId>commons-logging</groupId>
     <artifactId>commons-logging</artifactId>
     <version>1.2</version>
 </dependency>

声明测试代码:

public class commons_loggingDemo {
    Log log= LogFactory.getLog(commons_loggingDemo.class);
    @Test
    public void test() throws IOException {
        log.debug("Debug info.");
        log.info("Info info");
        log.warn("Warn info");
        log.error("Error info");
        log.fatal("Fatal info");
    }
}

接下来,在classpath下定义配置文件:commons-logging.properties:

#指定日志对象:
 org.apache.commons.logging.Log = org.apache.commons.logging.impl.Jdk14Logger
 #指定日志工厂:
 org.apache.commons.logging.LogFactory = org.apache.commons.logging.impl.LogFactoryImpl

如果只单纯的依赖了commons-logging,那么默认使用的日志对象就是Jdk14Logger,默认使用的日志工厂就是LogFactoryImpl

commons-logging + Log4j使用:

去掉commons-logging.properties 配置文件:

因为commons-logging

1) 首先在classpath下寻找自己的配置文件commons-logging.properties,如果找到,则使用其中定义的Log实现类; 

2) 如果找不到commons-logging.properties文件,则在查找是否已定义系统环境变量org.apache.commons.logging.Log,找到则使用其定义的Log实现类; 

3) 否则,查看classpath中是否有Log4j的包,如果发现,则自动使用Log4j作为日志实现类; 

4) 否则,使用JDK自身的日志实现类(JDK1.4以后才有日志实现类); 
5)
否则,使用commons-logging自己提供的一个简单的日志实现类SimpleLog; 

需要在pom.xml文件中添加依赖:

<dependency>
    <
groupId>log4j</groupId>
    <
artifactId>log4j</artifactId>
    <
version>1.2.17</version>
</
dependency>

接下来,在classpath下定义配置文件:log4j.properties

# Logger root
 # \u6ce8\u610f\uff1a\u7ebf\u4e0a\u7cfb\u7edf\uff0c\u9700\u628aconsole\u5220\u9664
 log4j.rootLogger=INFO ,console
 
 log4j.logger.org.springframework=info,console
 
 # \u6253\u5370\u5230Console\u7684\u65e5\u5fd7\uff0c\u6ce8\u610f\uff1a\u7ebf\u4e0a\u7cfb\u7edf\u9700\u8981\u5c06\u8be5\u6bb5\u65e5\u5fd7\u914d\u7f6e\u5220\u9664
 #### First appender writes to console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%-4p,%t,%d{MM-dd HH:mm:ss.SSS},%c{2}.%M:%L - %m%n

Log4j日志框架

 log4j是Apache的一个开放源代码的项目,通过使用log4j,我们可以控制日志信息输送的目的地, 日志的输出格式, 日志信息的级别,可以通过一个配置文件来灵活地进行配置,而不需要修改应用的代码

   如果在我们系统中单独使用log4j的话,我们只需要引入log4j的核心包就可以了

   <dependency>
    <
groupId>log4j</groupId>
    <
artifactId>log4j</artifactId>
    <
version>1.2.17</version>
</
dependency>
public class Log4jTest {
            private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Log4jTest.class);
           
            public static void main(String[] args) {
                       logger.info("hello word");
            }
}

在系统的src目录下添加依赖的配置文件:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n

Log4j 的代码结构:

image.png

  Log4j启动过程:

1.找到对应的配置文件,虽然log4j支持多种配置文件,在平时使用中多数使用log4j.xml格式,该文件位置应该放在classpath下面,在log4j启动时候会去clasPath下面找寻log4j.xml。
2.解析xml,根据配置生成对应名字的logger以及绑定该logger应用的appender。

log4j日志输出关键类图

 image.png 

logger组件:logger的父类category完成基本所有的log功能,其中实现的接口appenderAttachable用于存储与该logger绑定的appender。logger主要用于管理日志level,确定是否需要打出日志,即调用appender。其中每个category中都会有自己父亲的引用,当additivity参数为true的时候(默认为true),则会在自己appender输出信息后,调用父category去输出日志。

appender组件:appender是具体message输出的组件,管理信息输出的位置和格式。
Log4j获取特定名字logger

org.apache.log4j.Logger log4j = or.apache.log4h.Logger.getLogger(Name.class)

logger内部的存储是由Hierarchy来完成,在启动过程中会初始化所有log4j.xml中定义的logger。如果logger中不存在该名字的logger则会新生成一个新的logger,由于该名字的logger不存在配置文件中,所以会根据名字规则寻找其父亲logger,如果找不到则会以rootLogger为父亲。所以如果没有配置该类名对应的logger则会调用其父亲logger来输出日志。

Log4j 的详细配置:

#Access log

log4j.appender.A=org.apache.log4j.DailyRollingFileAppender

log4j.additivity.A = false 

log4j.appender.A.File=${catalina.base}/logs/access.log

log4j.appender.A.layout=org.apache.log4j.PatternLayout

log4j.appender.A.layout.ConversionPattern=%-4p,%t,%d{MM-dd   HH:mm:ss.SSS},%c{2}.%M:%L – %m%n

log4j.appender.A.DatePattern='_' yyyy-MM-dd

log4j.appender.A.append=true

log4j.appender.A.ImmediateFlush=true

log4j.appender.A.Threshold = INFO

https://blog.csdn.net/earthchinagl/article/details/70256527

 

高级配置:

https://www.cnblogs.com/dengjiahai/p/4608946.html

https://www.cnblogs.com/leefreeman/p/3610459.html

 

使用坑:

  在高并发时候会有性能问题:

http://zl378837964.iteye.com/blog/2373591

解决的方式:

1.规避:

1)排查代码是否写入了大量日志,删除非必要日志

 2)简化日志序列,尽量不出现日志嵌套(日志B打印调用了日志D)

2.解决:

1)使用log4j异步写AsyncAppender

2)升级到log4j 2 –> 建议

3)使用logback替换log4j –> 建议

4)补丁:使用可重入锁替换synchronized 

slf4j日志

 一个新的日志框架横空出世了:slf4j , 这个框架由log4j的作者开发并且后台出了一个性能更加好的框架logback;

 

 哪有人会问?为什么还需要slf4j了???

 

   https://www.oschina.net/translate/why-use-sl4j-over-log4j-for-logging

  slf4j是一个日志统一的框架,主要是为了接入不同日志系统建立的统一包装层的框架。其他具体实现日志的系统只需要实现slf4j的一些特定规则则可以接入slf4j使用。

 image.png

上图中可以看出,slf4j对外提供了统一的api,这由slf4j-api.jar包提供。另外如果需要接入log4j,则需要在api和具体框架中加入一个适配层,实现slf4j和log4j接口的适配,这个由包slf4j-log4j12实现。

有几个包需要区别一下:

log4j-slf4j-impl 是 slf4j 和 logj4 2 的 Binding,而 slf4j-log4j12 是 slf4j 和 log4j 1.2 的 Binding,jcl-over-slf4j 是common-logggin 适配 slf4j。

log4j-over-slf4j 是把Log4j适配到slf4j,比如有些系统使用logback+slef4j,有一个第三方jar依赖于log4j打印日志,就需要这个包了

log4j-slf4j-impl 是用于log4j2与slf4j 的桥接用的;

slf4j + Log4j 使用过程:

private org.slf4j.Logger log= LoggerFactory.getLogger(SlfloggingDemo.class);

@Test
public void test() throws IOException {
   
log.debug("Debug info.");
   
log.info("Info info");
   
log.warn("Warn info");
   
log.error("Error info");
}
<dependency>
    <
groupId>log4j</groupId>
    <
artifactId>log4j</artifactId>
    <
version>1.2.17</version>
</
dependency>

<
dependency>
    <
groupId>org.slf4j</groupId>
    <
artifactId>slf4j-log4j12</artifactId>
    <
version>1.7.21</version>
</
dependency>

slf4j 底层实现方式:

1.slf4j的loggerfactory在getlogger()的过程中会检查slf4j-LoggerFactory是否初始化没有,其中performInitialization调用会去适配具体的日志框架。(在slf4j-api层)

 image.png 

2.slf4j-LoggerFactory未初始化,则会调用当前classloader去寻找“org/slf4j/impl/StaticLoggerBinder.class”类,并创建该类。所以需要适配slf4j的日志框架都需要实现该类。(在slf4j-api层)

 image.png

3. 调用对应的staticLoggerBinder会初始化log4j。并生成新的log4j12-log4jloggerFactory。该类在返回logger的时候会将log4j返回的logger在装饰一层即log4jLoggerAdapter,用于适配slf4j的logger接口。(在slf4j-log4j12)

image.png

注意:当有多个日志框架的时候,在找寻“org/slf4j/impl/StaticLoggerBinder.class”会出现多个,这时候加载具体那个StaticLoggerBinder.class则会由jvm来决定,从而加载了对应的日志框架。如果不想出现这个问题,则应该保证classPath下只有一个该类。

有些系统你残留了一些另外的日志框架比如apache commons-logging ( 简称 jcl)的接口, 则当引入这个二方包的时候,由于原本自己并不支持jcl接口,或者想将jcl接口最后输入的日志系统为log4j。则需要引入jcl的桥接工具 jcl-over-slf4j,该包功能代码很少,核心为将jcl的接口调用适配成slf4j接口的调用。这样即可让jcl接口的二方库和自己共用一个日志框架。

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jcl-over-slf4j</artifactId>
    <version>1.7.13</version>
    <scope>runtime</scope>
</dependency>

总结:在使用日志系统中,通常在工作的系统上,主要即为log4j作为具体的日志系统的实现,然后将slf4j作为日志系统的抽象层,这样使得应用和具体的日志系统解耦。

LogBack日志

  LogBack和Log4j都是开源日记工具库,LogBack是Log4j的改良版本,比Log4j拥有更多的特性,同时也带来很大性能提升。LogBack官方建议配合Slf4j使用,这样可以灵活地替换底层日志框架。 为了优化log4j,以及更大性能的提升,Apache基金会已经着手开发了log4j 2.0

LogBack被分为3个组件,logback-core, logback-classic 和 logback-access。 
logback-core
:提供了LogBack的核心功能,是另外两个组件的基础。 
logback-classic
:实现了Slf4j的API,所以当想配合Slf4j使用时,需要将logback-classic加入classpath。 
logback-access
:是为了集成Servlet环境而准备的,可提供HTTP-access的日志接口。

需要在pom.xml文件中添加依赖:

<dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
  </dependency>
  <dependency>
        <groupId>org.logback-extensions</groupId>
        <artifactId>logback-ext-spring</artifactId>
        <version>0.1.2</version>
  </dependency>

接下来,在classpath下定义配置文件:logback.xml

<?xml version="1.0" encoding="UTF-8"?>
 
<!-- logback日志系统基础配置 -->
<!-- 1.此处debug="true"与logger level无关,只与配置的状态信息有关(如配置文件是否规范,某些标签元素属性是否赋值) -->
<!-- 3.此处scan="true"设置后,可以扫描本日志配置文件变动并重加载配置,可设置扫描间隔时间,默认为1分钟扫描一次,单位milliseconds, seconds, minutes 或 hours,如scanPeriod="30 seconds" -->
<!-- 4.此处packagingData="true"可以在日志后看到依赖jar包名和版本,很费性能,不建议开启 -->
 
<configuration debug="false" scan="true" scanPeriod="30 seconds" packagingData="false">
 
 
 
   
<!-- 设置 logger context 名称,一旦设置不可改变,默认为default -->
   
<contextName>sharding-jdbc-demo</contextName>
 
    <
property name="logDir" value="D:/eclipse-workspace/logs" />
 
 
   
<!--用于对控制台进行日志输出-->
   
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
       
<!-- encoders are by default assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder -->
       
<encoder>
            <
pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </
encoder>
    </
appender>
 
    <
appender name="fileInfoLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <
file>${logDir}/info.log</file>
       
<!-- 过滤日志 -->
        <!-- 过滤掉非INFO级别 -->
       
<filter class="ch.qos.logback.classic.filter.LevelFilter">
            <
level>INFO</level>
            <
onMatch>ACCEPT</onMatch>  <!-- 如果命中就禁止这条日志 -->
           
<onMismatch>DENY</onMismatch> <!-- 如果没有命中就使用这条规则 -->
       
</filter>
 
        <
rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
           
<!-- rollover daily -->
           
<fileNamePattern>${logDir}/info-%d{yyyy-MM-dd_HH-mm}.%i.log</fileNamePattern>
            <
maxHistory>5</maxHistory>
            <
timeBasedFileNamingAndTriggeringPolicy
                   
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
               
<!-- or whenever the file size reaches 100MB -->
               
<maxFileSize>10MB</maxFileSize>
            </
timeBasedFileNamingAndTriggeringPolicy>
        </
rollingPolicy>
        <
encoder>
            <
charset>UTF-8</charset>
            <
pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
        </
encoder>
    </
appender> 
 
    <
appender name="fileErrorLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <
file>${logDir}/error.log</file>
       
<!-- 过滤日志 -->
       
<filter class="ch.qos.logback.classic.filter.LevelFilter">
           
<!-- 过滤掉非IERROR级别 -->
           
<level>ERROR</level>
            <
onMatch>ACCEPT</onMatch>  <!-- 如果命中就禁止这条日志 -->
           
<onMismatch>DENY</onMismatch> <!-- 如果没有命中就使用这条规则 -->
       
</filter>
 
        <
rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
           
<!-- rollover daily -->
           
<fileNamePattern>${logDir}/error-%d{yyyy-MM-dd_HH-mm}.%i.log</fileNamePattern>
            <
maxHistory>5</maxHistory>
            <
timeBasedFileNamingAndTriggeringPolicy
                   
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
               
<!-- or whenever the file size reaches 100MB -->
               
<maxFileSize>10MB</maxFileSize>
            </
timeBasedFileNamingAndTriggeringPolicy>
        </
rollingPolicy>
        <
encoder>
            <
charset>UTF-8</charset>
            <
pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
        </
encoder>
    </
appender> 
 
 
    <
appender name="ASYNC_FILEINFO_LOG" class="ch.qos.logback.classic.AsyncAppender">
       
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
       
<discardingThreshold>0</discardingThreshold>
       
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
       
<queueSize>2048</queueSize>
       
<!-- 添加附加的appender,最多只能添加一个 -->
       
<appender-ref ref="fileInfoLog" />
    </
appender>
 
 
   
<!-- 日志级别若没显示定义,则继承最近的父logger(该logger需显示定义level,直到rootLogger)的日志级别-->
    <!-- logger的appender默认具有累加性(默认日志输出到当前logger的appender和所有祖先logger的appender中),可通过配置 “additivity”属性修改默认行为-->
   
<logger name="com.fulihui.sharding.jdbc" level="INFO" additivity="false">
        <
appender-ref ref="ASYNC_FILEINFO_LOG" />
    </
logger>
 
   
<!-- 至多只能配置一个root -->
   
<root level="INFO">
        <
appender-ref ref="STDOUT" />
       
<!--<appender-ref ref="fileInfoLog" />-->
       
<appender-ref ref="fileErrorLog" />
        <
appender-ref ref="ASYNC_FILEINFO_LOG" />
    </
root>
 
  </
configuration>

 

 

https://blog.csdn.net/zzzgd_666/article/details/80458444

 

logback 使用的坑:

 1.packagingData="false" 当此属性设置为true时,logback可以包含它输出的堆栈跟踪行的每一行的打包数据,很影响性能,建议线上不能开启

 2.配置中有个<discardingThreshold>0</discardingThreshold>这个很表示不丢日志,并且底层是通过

//class :  ch.qos.logback.core.AsyncAppenderBase 的 append方法
  BlockingQueue<E> blockingQueue;
  @Override
  protected void append(E eventObject) {
    if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
      return;
    }
    preprocess(eventObject);
    put(eventObject);
  }
  private void put(E eventObject) {
    try {
      blockingQueue.put(eventObject);
    } catch (InterruptedException e) {
    }
  }

从代码中可以看出来虽然是异步的,但是把日志塞进队列中用的是put方法.是会block的,直到队列空出位置来,所以在配置上可以把队列配置大一掉;建议还是升级采用异步方式

采用log4j2 所以就出现 log4j2 这个异步框架了。

Log4j 与 logback的性能对比:

https://my.oschina.net/OutOfMemory/blog/789267

Log4j2.0日志

  Log4j2.0基于LMAX Disruptor的异步日志在多线程环境下性能会远远优于Log4j 1.x和logback(官方数据是10倍以上)。我想日后logback 也会优化成异步模式的,具体看官方公告。。。。

 

  https://www.jianshu.com/p/570b406bddcd

 

  具体搭建:

  需要在pom.xml文件中添加依赖:

 

<!--log4j-2模式-->
 
<dependency>
    <
groupId>org.slf4j</groupId>
    <
artifactId>slf4j-api</artifactId>
    <
version>1.7.25</version>
  </
dependency>
 
 
<!--核心log4j2jar包-->
 
<dependency>
    <
groupId>org.apache.logging.log4j</groupId>
    <
artifactId>log4j-api</artifactId>
    <
version>2.11.1</version>
  </
dependency>
 
  <
dependency>
    <
groupId>org.apache.logging.log4j</groupId>
    <
artifactId>log4j-core</artifactId>
    <
version>2.11.1</version>
  </
dependency>
 
 
<!--用于与slf4j保持桥接-->
 
<dependency>
    <
groupId>org.apache.logging.log4j</groupId>
    <
artifactId>log4j-slf4j-impl</artifactId>
    <
version>2.11.1</version>
  </
dependency>
 
 
<!--需要使用log4j2的AsyncLogger需要包含disruptor-->
 
<dependency>
    <
groupId>com.lmax</groupId>
    <
artifactId>disruptor</artifactId>
    <
version>3.4.2</version>
  </
dependency>
 
 
<!--web工程需要包含log4j-web,非web工程不需要-->
 
<dependency>
    <
groupId>org.apache.logging.log4j</groupId>
    <
artifactId>log4j-web</artifactId>
    <
version>2.4.1</version>
    <
scope>runtime</scope>
  </
dependency>

 

 

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="ERROR">
    <Properties>
        <Property name="baseDir">D:\eclipse-workspace\logs</Property>
        <Property name="filename">D:\eclipse-workspace\logs/info.log</Property>
        <Property name="filenameError">D:\eclipse-workspace\logs/error.log</Property>
    </Properties>

    <Appenders>
        <Console name="STDOUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %l - %msg%n"/>
        </Console>

        <RollingFile name="RollingFile" fileName="${filename}"
                     filePattern="${baseDir}/${date:yyyy-MM}/info-%d{yyyy-MM-dd-HH-mm}.log.gz">
            <PatternLayout pattern="%d %-5level [%t]%l - %msg%n"/>
            <Policies>
                <SizeBasedTriggeringPolicy size="200 MB"/>
                <TimeBasedTriggeringPolicy interval="10" modulate="true"/>
            </Policies>

            <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="ACCEPT"/>

            <!--自动删除超过120天的日志压缩文件-->
            <DefaultRolloverStrategy>
                <Delete basePath="${baseDir}" maxDepth="2">
                    <IfFileName glob="*/info-*.log.gz"/>
                    <IfLastModified age="20d"/>
                </Delete>
            </DefaultRolloverStrategy>

        </RollingFile>

        <!--错误日志入文件-->
        <RollingFile name="RollingFileError" fileName="${filenameError}"
                     filePattern="${baseDir}/${date:yyyy-MM}/error-%d{yyyy-MM-dd-HH}.log">
            <PatternLayout pattern="%d %-5level [%t]%l - %msg%n"/>
            <Policies>
                <SizeBasedTriggeringPolicy size="200 MB"/>
                <TimeBasedTriggeringPolicy interval="24" modulate="true"/>
            </Policies>

            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>

            <!--自动删除超过120天的日志压缩文件-->
            <DefaultRolloverStrategy>
                <Delete basePath="${baseDir}" maxDepth="2">
                    <IfFileName glob="*/error-*.log"/>
                    <IfLastModified age="30d"/>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingFile>


    </Appenders>

    <Loggers>

        <!--采用异步输出日志-->
        <AsyncLogger name="com.fulihui.sharding.jdbc" level="debug"   additivity="false">
            <!--写入info级别-->
            <AppenderRef ref="RollingFile" />
            <!--写入error级别-->
            <AppenderRef ref="RollingFileError" level="error"/>

            <AppenderRef ref="STDOUT"/>

        </AsyncLogger>

        <!--采用异步输出日志-->
        <AsyncRoot level="debug">
            <AppenderRef ref="STDOUT"/>
        </AsyncRoot>

    </Loggers>
</Configuration>


配置详情:

    https://blog.csdn.net/u013269532/article/details/53186526

https://blog.csdn.net/scherrer/article/details/73744392


logback log4j log4j2 性能实测

 https://blog.souche.com/logback-log4j-log4j2shi-ce/ 

快捷键 Eclipse VS Idea

分类

功能点

Eclipse快捷键

IDEA快捷键

搜索

搜索文本

Ctrl + F

Ctrl + F

Ctrl + R 查找替换

Alt + P/A 逐个/全部替换

Alt + F3 查找当前选中词

继续搜索

Ctrl + K 向前

Ctrl + Shift + K 向后

F3

Shift + F3

搜索方法

Ctrl + O

Ctrl + F12

显示类的所有防范 Alt + 7 

搜索类

Ctrl + Shift + T

Ctrl + N

搜索所有文件 Ctrl + Shift + R

搜索文件

Ctrl + Shift + T

Ctrl + Shift + N

这两个都支持简单的正则表达式,还支持直接按大写字母的缩略,例如:

查找JsonTranscoder,只需要输入JT

搜索所有引用处

Ctrl + Alt + H

Alt + F7

搜索所有文本出现的位置

Ctrl + H

Ctrl + Shift + F

编辑

自动代码补全

Alt + /

Ctrl + J

自动代码生成

Alt + Insert

快速修复错误

Ctrl + 1

Alt + Enter

删除当前行

Ctrl + D

Ctrl + X

复制到下一行

Ctrl + D

注释/取消注释

Ctrl + /

Ctrl + /

选中当前字

Ctrl + W

 

补全当前行

Ctrl + Shift + Enter

神器,补全当前行,最常用的场景时补全当前行后的;号,并将光标定位到下一行

调出最近复制的N份内容

Ctrl + Shift + V

查看最近编辑的文件

Ctrl + E

对比最近修改

Alt + Shift + C

格式化代码

Ctrl + Shift + F

Ctrl + Alt + L

整理import

Ctrl + Shift + O

Ctrl + Alt + O

跳转

显示方法层次

Ctrl + Shift + H

显示类、方法说明

F2

Ctrl + Q

跳到方法定义处

Ctrl + B

跳到方法实现处

Ctrl + Alt + B

跳到上/下一方法

Alt + Up/Down

上/下一查看处

Alt + <-

Alt + ->

Ctrl + Alt + Up/Down

跳到指定行

Ctrl + L

Ctrl + G

重构

改名

Alt + Shift + R

Shift + F6

其他常用

Ctrl + F6 修改方法签名

Ctrl + Shift + F6 修改参数的类型

Ctrl + Shift + V引入局部变量

Ctrl + Shift + P 引入参数

Ctrl + Shift + F 引入类变量

Ctrl + Shift + M 引入方法

Ctrl + Shift + C 引入常量

运行

启动调试

Alt + Shift + F9

启动运行

Alt + Shift + F10

单步进入

F5

F7

单步跳过

F6

F8

跳过

F8

F9

执行选中语句

Alt + F8

窗口

调出界面

Ctrl + Alt + S调出Settings界面

Ctrl + Alt + Shift + S调出项目Setting界面

关闭界面

Ctrl + F4 或 ESC

打开窗口

Alt + 窗口编号(例如项目窗口编号是1)

最大化窗口

Ctrl + M

Ctrl + Shift + F12

隐藏窗口

Shift + ESC

关闭当前文件

Ctrl + F4

垂直分屏

Ctrl + | (自定义的)

调整窗口位置

Ctrl + M 将当前光标处显示到屏幕中央

切换窗口

Ctrl + Tab

使用Nginx+Lua实现Web项目的灰度发布

需求:

领导对时间要求紧迫、研发对现有系统摸不透、做到数据的兼容性,基于这样的要求就必须做到系统上线采用灰度的方式,指定忠实用户进行线上测试、选取有特征的群体进行线上测试和基于流量切换的方式进行线上测试等。

常规的部署做法:

常规的部署方式是采用Nginx的 upstream  配置来简单的实现新旧机器的切换, 在开发过程中,开发完成,完成测试阶段,修复bug后都要重启后台服务,测试又在测试,每次重启都要一两分钟,平凡的重启,测试不干了;所以想到就是部署两台服务器;用nginx upstream 模块实现 无感知部署,发现一个bug,修复;直接部署不会打断测试;常用的是一台部署完毕之后部署另外一台机器; 

灰度发布概述:

灰度发布,简单来说,就是根据各种条件,让一部分用户使用旧版本,另一部分用户使用新版本。

灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式。AB test就是一种灰度发布方式,让一部分用户继续用A,一部分用户开始用B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面 来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度。

灰度部署还可以根据设定的规则将请求路由到我们的灰度版本(灰度机器)上来。比如对于API来说,一般有如下几个需求:特定用户(比如测试帐号)、 特定的App(比如测试app或者合作App)、特定的模块、接口(只有某些接口需要灰度,这种一般是API Container的修改,拿一些不是很重要的API做灰度测试)、特定的机器(某些请求IP转发到灰度机)等。

本章只是简单的简述灰度部署的实现思路:

这里我们所做的灰度发布稍作改变:用1-2台机器作为B,B测试成功再部署A。用于WEB系统新代码的测试发布,让一部分(IP)用户访问新版本,一部分用户仍然访问正常版本,原理如图:

 image.png


执行过程:
1
、当用户请求到达前端web(代理)服务器Openresty,内嵌的lua模块解析Nginx配置文件中的lua脚本代码;
2
、Lua获取客户端IP地址,去查询Redis中是否有该键值,如果有返回值执行@clien2,否则执行@client1。
3
、Location @client2把请求转发给预发布服务器,location @client1把请求转发给生产服务器,服务器返回结果,整个过程完成。

Lua 的好处并不至于这个,可以使用LUA语言是实现一些业务上的负载,比如热点分离; 热点数据的自动降级机制;

  Lua教程: https://www.runoob.com/lua/lua-tutorial.html


案例实现:

1.       安装部署OpenResty:

OpenResty由Nginx核心加很多第三方模块组成,默认集成了Lua开发环境,使得Nginx可以作为一个Web Server使用。
     借助于Nginx的事件驱动模型和非阻塞IO,可以实现高性能的Web应用程序。
     而且OpenResty提供了大量组件如Mysql、Redis、Memcached等等,使在Nginx上开发Web应用更方便更简单。

1、部署第一个nginx,作为应用层nginx(192.168.1.104那个机器上)

1、  创建目录/usr/servers,以后我们把所有软件安装在此目录

mkdir -p /usr/servers 

cd /usr/servers/

2、  安装依赖(我的环境是centos,可以使用如下命令安装,其他的可以参考openresty安装步骤)

   yum install -y readline-devel pcre-devel openssl-devel gcc

 3、  下载ngx_openresty-xxx.tar.gz并解压(ngx_openresty-xxx/bundle目录里存nginx核心和很多第三方模块,比如有我们需要的Lua和LuaJIT。)

wget https://openresty.org/download/ngx_openresty-1.9.7.1.tar.gz

tar xvf ngx_openresty-1.9.7.1.tar.gz

 cd ngx_openresty-1.9.7.1

2. 安装LuaJIT

cd bundle/LuaJIT-2.1-20151219/

make clean && make && make install

 ln -sf luajit-2.1.0-alpha /usr/local/bin/luajit

 下载ngx_cache_purge模块,该模块用于清理nginx缓存

root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# wget https://github.com/FRiCKLE/ngx_cache_purge/archive/2.3.tar.gz
root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# tar -xvf 2.3.tar.gz

下载nginx_upstream_check_module模块,该模块用于ustream健康检查

             root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# 

                   wget https://github.com/yaoweibin/nginx_upstream_check_module/archive/v0.3.0.tar.gz

root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# tar -xvf v0.3.0.tar.gz


  安装ngx_openresty

root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# cd .. 
root@user:/usr/servers/ngx_openresty-1.9.7.1# ./configure –prefix=/usr/servers –with-http_realip_module –with-pcre –with-luajit –add-module=./bundle/ngx_cache_purge-2.3/ –add-module=./bundle/nginx_upstream_check_module-0.3.0/ -j2 
root@user:/usr/servers/ngx_openresty-1.9.7.1# make && make install

参数说明:
–with***
安装一些内置/集成的模块
–with-http_realip_module
取用户真实ip模块
-with-pcre Perl
兼容的达式模块
–with-luajit
集成luajit模块
–add-module
添加自定义的第三方模块,如此次的ngx_che_purge

  到/usr/servers目录下用ll命令查看,会发现多出来了如下目录,说明安装成功

root@user:/usr/servers/ngx_openresty-1.9.7.1# cd /usr/servers/ 
root@user:/usr/servers# ll

image.png

说明:
/usr/servers/luajit
:luajit环境,luajit类似于java的jit,即即时编译,lua是一种解释语言,通过luajit可以即时编译lua代码到机器代码,得到很好的性能;
/usr/servers/lualib
:要使用的lua库,里边提供了一些默认的lua库,如redis,json库等,也可以把一些自己开发的或第三方的放在这;
/usr/servers/nginx
:安装的nginx,通过/usr/servers/nginx/sbin/nginx -V 查看nginx版本和安装的模块

启动nginx

root@user:/usr/servers# /usr/servers/nginx/sbin/nginx
检测配置是否正确(需要先切换到root用户):
root@user:/usr/servers# /usr/servers/nginx/sbin/nginx -t
重启nginx:
root@user:/usr/servers# /usr/servers/nginx/sbin/nginx -s reload


LUA环境测试:

1、              为了方便开发我们在/usr/servers/nginx/conf目录下创建一个lua.conf 
root@user:/home/user# cd /usr/servers/nginx/conf
root@user:/usr/servers/nginx/conf# vim lua.conf

server  {

       listen 80;

       server_name _;

       #HelloWorld

       location /lua {

              default_type 'text/html';

              content_by_lua 'ngx.say("hello world")';

       }

}

编辑nginx.conf配置文件 

vim /usr/servers/nginx/conf/nginx.conf 
在http部分添加如下配置 
lua_package_path "/usr/servers/lualib/?.lua;;"; #lua
模块 
lua_package_cpath "/usr/servers/lualib/?.so;;"; #c
模块 
include lua.conf; #
单独lua配置

重启nginx

  /usr/servers/nginx/sbin/nginx -s reload

 访问如http://192.168.1.104/lua(自己的机器根据实际情况换ip),可以看到如下内容 

hello world
说明配置成功。

灰度部署测试:

采用redis 方式;比如 192.168.0.101这个IP采用的是访问服务器的项目资源,其他IP是访问旧项目的资源;进行测试完毕并且完成之后,可以切换正式环境;

image.png

默认情况下lua_code_cache  是开启的,即缓存lua代码,即每次lua代码变更必须reload nginx才生效,如果在开发阶段可以通过lua_code_cache  off;关闭缓存,这样调试时每次修改lua代码不需要reload nginx;但是正式环境一定记得开启缓存

开启后reload nginx会看到如下报警
nginx: [alert] lua_code_cache is off; this will hurt performance ******;


配置文件:

Nginx.conf:

#user  nobody;
worker_processes  1;
 
#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;
 
#pid        logs/nginx.pid;
 
 
events {
    worker_connections  1024;
}
 
 
http {
    include       mime.types;
    default_type  application/octet-stream;
       
       lua_package_path "/usr/servers/lualib/?.lua;;"; #lua 模块 
       lua_package_cpath "/usr/servers/lualib/?.so;;"; #c模块 
       
 
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
 
    access_log  logs/access.log  main;
 
    sendfile        on;
    #tcp_nopush     on;
 
    #keepalive_timeout  0;
    keepalive_timeout  65;
 
    #gzip  on;
       
       proxy_buffering             off;
    proxy_set_header            Host $host;
    proxy_set_header            X-real-ip $remote_addr;
    proxy_set_header            X-Forwarded-For $proxy_add_x_forwarded_for;
       
       
       upstream productServer {
         server 127.0.0.1:8080 weight=1 max_fails=3 fail_timeout=100s; #模拟生产服务器
       }
 
       upstream preServer {
         server 127.0.0.1:8081 weight=1 max_fails=3 fail_timeout=100s;  #模拟预发布服务器
       }
 
 
    server {
        listen       80;
        server_name  localhost;
 
        #charset koi8-r;
 
        #access_log  logs/host.access.log  main;
 
        location / {
                default_type 'text/html';  
          lua_code_cache off;  
          content_by_lua_file /usr/servers/nginx/conf/huidu.lua;
        }
 
        #error_page  404              /404.html;
        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
 
              location @productServer{
                proxy_pass http://productServer;
        }
              
        location @preServer{
                proxy_pass http://preServer;
        }
    
    }
 
 
 
}

huidu.lua:

local redis = require "resty.redis" 
local cache = redis.new() 
cache:set_timeout(60000)
 
local ok, err = cache.connect(cache, '127.0.0.1', 6379) 
if not ok then 
    ngx.say("failed to connect:", err) 
    return 
end 
 
--local red, err = cache:auth("foobared")
--if not red then
    --ngx.say("failed to authenticate: ", err)
    --return
--end
 
local local_ip = ngx.req.get_headers()["X-Real-IP"]
if local_ip == nil then
    local_ip = ngx.req.get_headers()["x_forwarded_for"]
end
 
if local_ip == nil then
    local_ip = ngx.var.remote_addr
end
--ngx.say("local_ip is : ", local_ip)
 
local intercept = cache:get(local_ip) 
 
 
if intercept == local_ip then
    ngx.exec("@preServer")
    return
end
 
ngx.exec("@productServer")
 
local ok, err = cache:close() 
 
if not ok then 
    ngx.say("failed to close:", err) 
    return 
end

上面的代码是简单的IP相等,可以采用IP段形式;


上面的例子测试:

image.png

image.png

image.png

额外参考资料:

https://my.oschina.net/izhangll/blog/884713

Redis Sentinel 容灾演练

本文主要介绍基于redis高可用的集群搭建,并做相应的高可用的容灾演练过程,熟悉一下redis高可用的方案配置;

简单的redis 系列教程可以查看之前的文章 http://www.dczou.com/viemall/tag/redis

首先的知道redis Sentinel 主从原理机制:

image.pngimage.png

image.pngimage.png


安装搭建Sentinel 集群:

分别有3个Sentinel节点,1个主节点,1个从节点组成一个简单的高可用方案;

role

IP

port

master

192.168.1.104

6379

slave

192.168.1.105

6379

Sentinel1

192.168.1.104

5000

Sentinel2

192.168.1.105

5000

Sentinel3

192.168.1.106

5000

安装redis 和部署redis 的主从方式,本章不阐述

配置sentinel.conf

哨兵默认用26379端口,默认不能跟其他机器在指定端口连通,只能在本地访问

192.168.1.104机器实例:

将sentinel.conf 复制 /usr/local/redis/etc

cp /opt/program/tools/redis-4.0.10/sentinel.conf  /usr/local/redis/etc

配置如下:

port 5000

bind 192.168.1.104

dir /tmp

sentinel monitor mymaster 192.168.1.104 6379 2

sentinel down-after-milliseconds mymaster 30000

sentinel failover-timeout mymaster 60000

sentinel parallel-syncs mymaster 1

daemonize yes

logfile /usr/local/redis/logs/sentinel_5000_log.log

192.168.1.105机器实例:

port 5000
bind 192.168.1.105
dir /tmp
sentinel monitor mymaster 192.168.1.104 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
daemonize yes
logfile /usr/local/redis/logs/sentinel_5000_log.log

192.168.1.106机器实例:

port 5000
bind 192.168.1.106
dir /tmp
sentinel monitor mymaster 192.168.1.104 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
daemonize yes
logfile /usr/local/redis/logs/sentinel_5000_log.log

Sentinel 配置文件解析:

http://www.mamicode.com/info-detail-1898706.html

检查设置:

启动哨兵进程

在192.168.1.104、192.168.1.105、192.168.1.106三台机器上,分别启动三个哨兵进程,组成一个集群,观察一下日志的输出

启动哨兵:

   方式一:redis-sentinel /path/to/sentinel.conf(推荐,这种方式启动和redis实例没有任何关系)

  方式二:redis-server /path/to/sentinel.conf –sentinel

  image.png

  日志里会显示出来,每个哨兵都能去监控到对应的redis master,并能够自动发现对应的slave

  哨兵之间,互相会自动进行发现,用的就是之前说的pub/sub,消息发布和订阅channel消息系统和机制


检查哨兵状态

redis-cli -h 192.168.1.104 -p 5000

sentinel master mymaster

SENTINEL slaves mymaster

SENTINEL sentinels mymaster

SENTINEL get-master-addr-by-name mymaster


检查各个redis状态查看

104、105主从状态查看命令

image.png

image.png


演练步骤:

哨兵节点的增加和删除:

    增加sentinal,会自动发现

   删除sentinal的步骤:

    (1)停止sentinal进程

    (2)SENTINEL RESET *,在所有sentinal上执行,清理所有的master状态

    (3)SENTINEL MASTER mymaster,在所有sentinal上执行,查看所有sentinal对数量是否达成了一致


slave的永久下线

   让master摘除某个已经下线的slave:SENTINEL RESET mastername,在所有的哨兵上面执行

    slave-priority,值越小优先级越高

容灾演练:

  master redis主节点宕机,看是否slave会选举成新的master节点;

  通过哨兵看一下当前的master:SENTINEL get-master-addr-by-name mymaster

  image.png

  image.png

   已经切换成功了;

    可以看到log信息

    image.png

    查看sentinal的日志,是否出现+sdown字样,识别出了master的宕机问题; 然后出现+odown字样,就是指定的quorum哨兵数量,都认为master宕机了

故障恢复

再将旧的master重新启动,查看是否被哨兵自动切换成slave节点

image.png

经过以上步骤,基本的sentinel下的高可用一主一从redis高可用就配置完成了。

java测试代码:

public static void main(String[] args) {

        Set<String> sentinels = new HashSet<String>();

        sentinels.add(new HostAndPort("192.168.1.104", 500).toString());

        sentinels.add(new HostAndPort("192.168.1.105", 500).toString());

        sentinels.add(new HostAndPort("192.168.1.106", 500).toString());

        JedisSentinelPool sentinelPool = new JedisSentinelPool("mymaster", sentinels);

        System.out.println("Current master: " + sentinelPool.getCurrentHostMaster().toString());

    }