基于binlog 增量的数据解析服务

什么是binlog

binlog是mysql的一种二进制日志文件,用来记录数据的变化。mysql使用binlog进行主从复制,如图:

 image.png

客户端向master的mysql sever写入数据

  1. 当数据发生变化时,master将变更的数据记录写入到二进制文件中,即binlog。

  2. slave订阅了master的binlog,所以会通过一个I/O THREAD与master的DUMP THREAD进行通信,同步binlog

  3. I/O      THREAD读取到binlog后会吸入到relay log中,准备重放。

  4. slave会通过SQL THREAD读取relay log,重放数据的改动并执行相应的改动。

这里有几点需要注意:

  1. 主从复制不是强一致性,只能保证最终一致

  2. master配合binlog复制会影响性能,所以尽量不要在master上挂太多的slave,如果对时间要求不高,可以在slave上挂slave

2.binlog的业务应用

  上面介绍了mysql中应用binlog的场景,而我们的业务可以伪装成master的slave节点,感知数据的变化,这就给了我们很多的业务运用空间。

2.1 数据异构

 经常有这样一个场景:

  原来业务是一个很单一的系统,所以表也在一起。随着业务的发展,系统开始拆分,总有一些表是各个业务都关注的表,但是对相关的字段的运用场景不同,所以这样一份元数据怎样更好的为各个系统服务就成了问题。当然,多写或者读写分离可以从物理节点上减少对数据服务器的压力,但是对业务并没有做到足够的支持,因为这些表都是一样的。因此我们可以通过binlog进行数据异构。

image.png

如图所示,订单系统生成订单后,通过binlog可以解析生成用户维度的订单信息供用户中心查询、商户维度订单表供运营管理,以及搜索系统的搜索数据,提供全文搜索功能。

这样,我们就通过原始的订单数据异构到三个系统中,提供了丰富的数据访问功能。不仅从节点上降低了数据服务器的压力,数据表现形式也更贴近自己的服务,减少不必要的字段冗余。

2.2 缓存数据的补充

对于高并发的系统,数据库往往是系统性能的瓶颈,毕竟IO响应速度是远远小于电子的运算速度的。因此,很多查询类服务都会在CPU与数据库之间加上一层缓存。即现从缓存获取,命中后直接返回,否则从DB中获取并存入缓存后返回。而如果原始数据变化了但缓存尚未超时,则缓存中的数据就是过时的数据了。当数据有变更的时候主动修改缓存数据。

image.png

当客户端更改了数据之后,中间件系统通过binlog获得数据变更,并同步到缓存中。这样就保证了缓存中数据有效性,减少了对数据库的调用,从而提高整体性能。

2.3 基于数据的任务分发

有这样一个场景:

  很多系统依赖同一块重要数据,当这些数据发生变化的时候,需要调用其他相关系统的通知接口同步数据变化,或者mq消息告知变化并等待其主动同步。这两种情况都对原始系统造成了侵入,原始系统改一块数据,并不想做这么多其他的事情。所以这时候可以通过binlog进行任务分发。

image.png

当原始业务系统修改数据后,不需要进行其他的业务关联。由调度系统读取binlog进行相应的任务分发、消息发送以及同步其他业务状态。这样可以将其他业务与原始业务系统解耦,并从数据的角度将所有管理功能放在了同一个调度系统中,责任清晰。

2.4 可以用于数据平滑迁移:

   https://www.w3cschool.cn/architectroad/architectroad-data-smooth-migration.html

2.5 数据抽取

https://blog.csdn.net/u010670689/article/details/81066807

mysql bin-log相关比较好的开源项目

https://blog.csdn.net/everlasting_188/article/details/53304530

 

binlog的业务实践:

1.  首先需要开启mysql bin_log功能;

启动成功之后,我们可以登陆查看我们的配置是否起作用

show variables like '%log_bin%'

本次解析只考虑insert、update、delete三种事件类型

 

采用OpenReplicator解析MySQL binlog

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。

Open Replicator项目地址:https://github.com/whitesock/open-replicator

 

binlog事件分析结构图

image.png

在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{

  "eventId": 1,

  "databaseName": "canal_test",

  "tableName": "`company`",

  "eventType": 2,

  "timestamp": 1477033198000,

  "timestampReceipt": 1477033248780,

  "binlogName": "mysql-bin.000006",

  "position": 353,

  "nextPostion": 468,

  "serverId": 2,

  "before": null,

  "after": null,

  "isDdl": true,

  "sql": "DROP TABLE `company` /* generated by server */"

}

 

DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

 

{

  "eventId": 0,

  "databaseName": "canal_test",

  "tableName": "person",

  "eventType": 24,

  "timestamp": 1477030734000,

  "timestampReceipt": 1477032161988,

  "binlogName": "mysql-bin.000006",

  "position": 242,

  "nextPostion": 326,

  "serverId": 2,

  "before": {

    "id": "3",

    "sex": "f",

    "address": "shanghai",

    "age": "23",

    "name": "zzh3"

  },

  "after": {

    "id": "3",

    "sex": "m",

    "address": "shanghai",

    "age": "23",

    "name": "zzh3"

  },

  "isDdl": false,

  "sql": null

}

 

相关的类文件如下: 

OpenReplicatorTest:
public class OpenReplicatorTest {

   
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
   
private static final String host = "192.168.56.101";
   
private static final int port = 3306;
   
private static final String user = "root";
   
private static final String password = "123456";


   
public static void main(String[] args) throws IOException {


        OpenReplicator or =
new OpenReplicator ();
        or.setUser(
user);
        or.setPassword(
password);
        or.setHost(
host);
        or.setPort(
port);
        MysqlConnection.setConnection(
host, port, user, password);

       
//      or.setServerId(MysqlConnection.getServerId());
        //
配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId

       
BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
        or.setBinlogFileName(bms.getBinlogName());
        or.setBinlogPosition(
4);
        or.setBinlogEventListener(
new NotificationListener());
       
try {
            or.start();
        }
catch (Exception e) {
           
logger.error(e.getMessage(),e);
        }
        Thread thread =
new Thread(new PrintLogEvent());
        thread.start();
    }

   
public static class PrintLogEvent implements Runnable{
       
@Override
       
public void run() {
           
while(true){
               
if(CDCEventManager.queue.isEmpty() == false)
                {
                    LogEvent ce = CDCEventManager.
queue.pollFirst();
                    String prettyStr1 = JSON.toJSONString(ce);
                    System.
out.println(prettyStr1);
                }
               
else{
                   
try {
                        TimeUnit.
SECONDS.sleep(1);
                    }
catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }


}
 
 
LogEvent:
public class LogEvent implements Serializable {

   
/**
     *
只针对delete、insert、update事件
     */
   
private static final long serialVersionUID = 5503152746318421290L;

   
private long eventId = 0;//事件唯一标识
   
private String databaseName = null;
   
private String tableName = null;
   
private int eventType = 0;//事件类型
   
private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]
   
private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]
   
private long position = 0;
   
private long nextPostion = 0;
   
private long serverId = 0;
   
private Map<String, String> before = null;
   
private Map<String, String> after = null;
   
private Boolean isDdl = null;
   
private String sql = null;

   
private static AtomicLong uuid = new AtomicLong(0);

   
public LogEvent() {}

   
public LogEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName) {
       
this.init(are);
       
this.databaseName = databaseName;
       
this.tableName = tableName;
    }

   
private void init(final BinlogEventV4 be) {
       
this.eventId = uuid.getAndAdd(1);
        BinlogEventV4Header header = be.getHeader();
       
this.timestamp = header.getTimestamp();
       
this.eventType = header.getEventType();
       
this.serverId = header.getServerId();
       
this.timestampReceipt = header.getTimestampOfReceipt();
       
this.position = header.getPosition();
       
this.nextPostion = header.getNextPosition();
    }

   
@Override
   
public String toString() {
        StringBuilder builder =
new StringBuilder();
        builder.append(
"{ eventId:").append(eventId);
        builder.append(
",databaseName:").append(databaseName);
        builder.append(
",tableName:").append(tableName);
        builder.append(
",eventType:").append(eventType);
        builder.append(
",timestamp:").append(timestamp);
        builder.append(
",timestampReceipt:").append(timestampReceipt);
        builder.append(
",position:").append(position);
        builder.append(
",nextPostion:").append(nextPostion);
        builder.append(
",serverId:").append(serverId);
        builder.append(
",isDdl:").append(isDdl);
        builder.append(
",sql:").append(sql);
        builder.append(
",before:").append(before);
        builder.append(
",after:").append(after).append("}");

       
return builder.toString();
    }

   
public long getEventId() {
       
return eventId;
    }

    
public void setEventId(long eventId) {
       
this.eventId = eventId;
    }

   
public String getDatabaseName() {
       
return databaseName;
    }

   
public void setDatabaseName(String databaseName) {
       
this.databaseName = databaseName;
    }

   
public String getTableName() {
       
return tableName;
    }

   
public void setTableName(String tableName) {
       
this.tableName = tableName;
    }

   
public int getEventType() {
       
return eventType;
    }

   
public void setEventType(int eventType) {
       
this.eventType = eventType;
    }

   
public long getTimestamp() {
       
return timestamp;
    }

   
public void setTimestamp(long timestamp) {
       
this.timestamp = timestamp;
    }

   
public long getTimestampReceipt() {
       
return timestampReceipt;
    }

   
public void setTimestampReceipt(long timestampReceipt) {
       
this.timestampReceipt = timestampReceipt;
    }

   
public long getPosition() {
       
return position;
    }

   
public void setPosition(long position) {
       
this.position = position;
    }

   
public long getNextPostion() {
       
return nextPostion;
    }

   
public void setNextPostion(long nextPostion) {
       
this.nextPostion = nextPostion;
    }

   
public long getServerId() {
        
return serverId;
    }

   
public void setServerId(long serverId) {
       
this.serverId = serverId;
    }

   
public Map<String, String> getBefore() {
       
return before;
    }

   
public void setBefore(Map<String, String> before) {
       
this.before = before;
    }

   
public Map<String, String> getAfter() {
       
return after;
    }

   
public void setAfter(Map<String, String> after) {
       
this.after = after;
    }

   
public Boolean getDdl() {
       
return isDdl;
    }

   
public void setDdl(Boolean ddl) {
       
isDdl = ddl;
    }

   
public String getSql() {
       
return sql;
    }

   
public void setSql(String sql) {
       
this.sql = sql;
    }


}

public class CDCEventManager {

   
public static final ConcurrentLinkedDeque<LogEvent> queue = new ConcurrentLinkedDeque<LogEvent>();

}

MysqlConnection:
public class MysqlConnection {

   
private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

   
private static Connection conn;

   
private static String host;
   
private static int port;
   
private static String user;
   
private static String password;

   
public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg) {
       
try {
           
if (conn == null || conn.isClosed()) {
                Class.forName(
"com.mysql.jdbc.Driver");

               
host = hostArg;
               
port = portArg;
               
user = userArg;
               
password = passwordArg;

               
conn = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port + "/", user, password);
               
logger.info("connected to mysql:{} : {}", user, password);
            }
        }
catch (ClassNotFoundException e) {
           
logger.error(e.getMessage(), e);
        }
catch (SQLException e) {
           
logger.error(e.getMessage(), e);
        }
    }

   
public static Connection getConnection() {
       
try {
           
if (conn == null || conn.isClosed()) {
                setConnection(
host, port, user, password);
            }
        }
catch (SQLException e) {
           
logger.error(e.getMessage(), e);
        }
       
return conn;
    }

   
/**
     *
获取Column信息
     *
     * @return
    
*/
   
public static Map<String, List<ColumnInfo>> getColumns() {
        Map<String, List<ColumnInfo>> cols =
new HashMap<>();
        Connection conn = getConnection();

       
try {
            DatabaseMetaData metaData = conn.getMetaData();
            ResultSet r = metaData.getCatalogs();
            String tableType[] = {
"TABLE"};
           
while (r.next()) {
                String databaseName = r.getString(
"TABLE_CAT");
                ResultSet result = metaData.getTables(databaseName,
null, null, tableType);
               
while (result.next()) {
                    String tableName = result.getString(
"TABLE_NAME");
                   
//                  System.out.println(result.getInt("TABLE_ID"));
                   
String key = databaseName + "." + tableName;
                    ResultSet colSet = metaData.getColumns(databaseName,
null, tableName, null);
                    cols.put(key,
new ArrayList<ColumnInfo>());
                   
while (colSet.next()) {
                        ColumnInfo columnInfo =
new ColumnInfo(colSet.getString("COLUMN_NAME"),
                            colSet.getString(
"TYPE_NAME"));
                        cols.get(key).add(columnInfo);
                    }

                }
            }
        }
catch (SQLException e) {
           
logger.error(e.getMessage(), e);
        }
       
return cols;
    }

   
/**
     *
参考 mysql> show binary logs +------------------+-----------+ | Log_name         | File_size |
     * +------------------+-----------+ | mysql-bin.000001 |       126 | | mysql-bin.000002 |       126 | |
     * mysql-bin.000003 |      6819 | | mysql-bin.000004 |      1868 | +------------------+-----------+
     */
   
public static List<BinlogInfo> getBinlogInfo() {
        List<BinlogInfo> binlogList =
new ArrayList<>();

        Connection conn =
null;
        Statement statement =
null;
        ResultSet resultSet =
null;

       
try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery(
"show binary logs");
            
while (resultSet.next()) {
                BinlogInfo binlogInfo =
new BinlogInfo(resultSet.getString("Log_name"), resultSet.getLong("File_size"));
                binlogList.add(binlogInfo);
            }
        }
catch (Exception e) {
           
logger.error(e.getMessage(), e);
        }
finally {
           
try {
               
if (resultSet != null) { resultSet.close(); }
               
if (statement != null) { statement.close(); }
               
if (conn != null) { conn.close(); }
            }
catch (SQLException e) {
               
logger.error(e.getMessage(), e);
            }
        }

       
return binlogList;
    }

   
/**
     *
参考: mysql> show master status; +------------------+----------+--------------+------------------+ | File |
     * Position | Binlog_Do_DB | Binlog_Ignore_DB | +------------------+----------+--------------+------------------+ |
     * mysql-bin.000004 |     1868 |              |                  |
     * +------------------+----------+--------------+------------------+
     *
     * @return
    
*/
   
public static BinlogMasterStatus getBinlogMasterStatus() {
        BinlogMasterStatus binlogMasterStatus =
new BinlogMasterStatus();
        Connection conn =
null;
        Statement statement =
null;
        ResultSet resultSet =
null;

       
try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery(
"show master status");
           
while (resultSet.next()) {
                binlogMasterStatus.setBinlogName(resultSet.getString(
"File"));
                binlogMasterStatus.setPosition(resultSet.getLong(
"Position"));
            }
        }
catch (Exception e) {
           
logger.error(e.getMessage(), e);
        }
finally {
            
try {
               
if (resultSet != null) { resultSet.close(); }
               
if (statement != null) { statement.close(); }
               
if (conn != null) { conn.close(); }
            }
catch (SQLException e) {
               
logger.error(e.getMessage(), e);
            }
        }

       
return binlogMasterStatus;
    }

   
/**
     *
获取open-replicator所连接的mysql服务器的serverid信息
     *
     * @return
    
*/
   
public static int getServerId() {
       
int serverId = 6789;
        Connection conn =
null;
        Statement statement =
null;
        ResultSet resultSet =
null;
       
try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery(
"show variables like 'server_id'");
           
while (resultSet.next()) {
                serverId = resultSet.getInt(
"Value");
            }
        }
catch (Exception e) {
           
logger.error(e.getMessage(), e);
        }
finally {
           
try {
                
if (resultSet != null) { resultSet.close(); }
               
if (statement != null) { statement.close(); }
               
if (conn != null) { conn.close(); }
            }
catch (SQLException e) {
               
logger.error(e.getMessage(), e);
            }
        }
       
return serverId;
    }

}
 
 
NotificationListener:
public class NotificationListener  implements BinlogEventListener {

   
private static Logger logger = LoggerFactory.getLogger(NotificationListener.class);

   
@Override
   
public void onEvents(BinlogEventV4 event) {
       
if (event == null) {
           
logger.error("binlog event is null");
           
return;
        }
       
int eventType = event.getHeader().getEventType();
        System.
out.println("eventType---->"+ MySqlEventTypeIdToString.getInstance().get(eventType));
       
switch (eventType) {
           
case MySQLConstants.FORMAT_DESCRIPTION_EVENT: {
               
logger.trace("FORMAT_DESCRIPTION_EVENT");
               
break;
            }
           
case MySQLConstants.TABLE_MAP_EVENT:
               
//每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId
           
{
                TableMapEvent tme = (TableMapEvent)event;
                TableInfoKeeper.saveTableIdMap(tme);
               
logger.trace("TABLE_MAP_EVENT:tableId:{}", tme.getTableId());
               
break;
            }
           
case MySQLConstants.DELETE_ROWS_EVENT: {
                DeleteRowsEvent dre = (DeleteRowsEvent)event;
                
long tableId = dre.getTableId();
               
logger.trace("DELETE_ROW_EVENT:tableId:{}", tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = dre.getRows();
               
for (Row row : rows) {
                    List<Column> before = row.getColumns();
                    Map<String, String> beforeMap = getMap(before, databaseName, tableName);
                   
if (beforeMap != null && beforeMap.size() > 0) {
                        LogEvent cdcEvent =
new LogEvent(dre, databaseName, tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setBefore(beforeMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}", cdcEvent);
                    }
                }
               
break;

            }
           
case MySQLConstants.UPDATE_ROWS_EVENT:
            {
                UpdateRowsEvent upe = (UpdateRowsEvent)event;
               
long tableId = upe.getTableId();
               
logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Pair<Row>> rows = upe.getRows();
               
for(Pair<Row> p:rows){
                    List<Column> colsBefore = p.getBefore().getColumns();
                    List<Column> colsAfter = p.getAfter().getColumns();
                    Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
                    Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
                   
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(upe,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setBefore(beforeMap);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case MySQLConstants.UPDATE_ROWS_EVENT_V2:
            {
                UpdateRowsEventV2 upe = (UpdateRowsEventV2)event;
               
long tableId = upe.getTableId();
               
logger.info("UPDATE_ROWS_EVENT_V2:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Pair<Row>> rows = upe.getRows();
                
for(Pair<Row> p:rows){
                    List<Column> colsBefore = p.getBefore().getColumns();
                    List<Column> colsAfter = p.getAfter().getColumns();
                    Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
                    Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
                   
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(upe,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setBefore(beforeMap);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case MySQLConstants.WRITE_ROWS_EVENT :
                {
                    WriteRowsEventV2 wre = (WriteRowsEventV2)event;
               
long tableId = wre.getTableId();
               
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = wre.getRows();
               
for(Row row: rows){
                    List<Column> after = row.getColumns();
                    Map<String,String> afterMap = getMap(after,databaseName,tableName);
                   
if(afterMap!=null && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(wre,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case   MySQLConstants.WRITE_ROWS_EVENT_V2:
            {
                WriteRowsEventV2 wre = (WriteRowsEventV2)event;
               
long tableId = wre.getTableId();
               
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = wre.getRows();
               
for(Row row: rows){
                    List<Column> after = row.getColumns();
                    Map<String,String> afterMap = getMap(after,databaseName,tableName);
                   
if(afterMap!=null && afterMap.size()>0){
                        LogEvent cdcEvent =
new LogEvent(wre,databaseName,tableName);
                        cdcEvent.setDdl(
false);
                        cdcEvent.setSql(
null);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.
queue.addLast(cdcEvent);
                       
logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
               
break;
            }
           
case MySQLConstants.QUERY_EVENT:
            {
                QueryEvent qe = (QueryEvent)event;
                TableInfo tableInfo = createTableInfo(qe);
               
if(tableInfo == null){
                   
break;
                }

                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();
               
logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);

                LogEvent cdcEvent =
new LogEvent(qe,databaseName,tableName);
                cdcEvent.setDdl(
true);
                cdcEvent.setSql(qe.getSql().toString());
                CDCEventManager.
queue.addLast(cdcEvent);
               
logger.info("cdcEvent:{}",cdcEvent);
               
break;
            }
           
case MySQLConstants.XID_EVENT:{
                XidEvent xe = (XidEvent)event;
               
logger.trace("XID_EVENT: xid:{}",xe.getXid());
               
break;
            }
           
default:
            {
               
logger.trace("DEFAULT:{}",eventType);
               
break;
            }

        }
    }


   
/**
     * ROW_EVENT
中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,
     * 然后跟取回的List<Column>进行映射。
     *
     * @param
cols
    
* @param databaseName
    
* @param tableName
    
* @return
    
*/
   
private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
        Map<String,String> map =
new HashMap<>();
       
if(cols == null || cols.size()==0){
           
return null;
        }
        String fullName = databaseName+
"."+tableName;
        List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
       
if(columnInfoList == null)
           
return null;
       
if(columnInfoList.size() != cols.size()){
            TableInfoKeeper.refreshColumnsMap();
           
if(columnInfoList.size() != cols.size())
            {
               
logger.warn("columnInfoList.size is not equal to cols.");
               
return null;
            }
        }

       
for(int i=0;i<columnInfoList.size(); i++){
           
if(cols.get(i).getValue()==null)
                map.put(columnInfoList.get(i).getName(),
"");
           
else
               
map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
        }

       
return map;
    }

   
/**
     *
从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,
     * 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中
     *
     * @param
qe
    
* @return
    
*/
   
private TableInfo createTableInfo(QueryEvent qe){
        String sql = qe.getSql().toString().toLowerCase();
       
if("begin".equals(sql)){
           
return null;
        }
        TableInfo ti =
new TableInfo();
        String databaseName = qe.getDatabaseName().toString();
        String tableName =
null;
       
if(checkFlag(sql,"table")){
            tableName = getTableName(sql,
"table");
        }
else if(checkFlag(sql,"truncate")){
            tableName = getTableName(sql,
"truncate");
        }
else{
            
logger.warn("can not find table name from sql:{}",sql);
           
return null;
        }
        ti.setDatabaseName(databaseName);
        ti.setTableName(tableName);
        ti.setFullName(databaseName+
"."+tableName);

       
return ti;
    }

   
private boolean checkFlag(String sql, String flag){
        String[] ss = sql.split(
" ");
       
for(String s:ss){
           
if(s.equals(flag)){
               
return true;
            }
        }
       
return false;
    }

   
private String getTableName(String sql, String flag){
        String[] ss = sql.split(
"\\.");
        String tName =
null;
       
if (ss.length > 1) {
            String[] strs = ss[
1].split(" ");
            tName = strs[
0];
        }
else {
            String[] strs = sql.split(
" ");
           
boolean start = false;
           
for (String s : strs) {
               
if (s.indexOf(flag) >= 0) {
                    start =
true;
                   
continue;
                }
               
if (start && !s.isEmpty()) {
                    tName = s;
                   
break;
                }
            }
        }
        tName.replaceAll(
"`", "").replaceAll(";", "");

       
//del "("[create table person(....]
       
int index = tName.indexOf('(');
       
if(index>0){
            tName = tName.substring(
0, index);
        }

       
return tName;
    }

}
 
 
TableInfo:
public class TableInfo {

   
private String databaseName;
   
private String tableName;
   
private String fullName;
   
// 省略Getter和Setter

   
public String getDatabaseName() {
       
return databaseName;
    }

   
public void setDatabaseName(String databaseName) {
       
this.databaseName = databaseName;
    }

   
public String getTableName() {
       
return tableName;
    }

   
public void setTableName(String tableName) {
       
this.tableName = tableName;
    }

   
public String getFullName() {
       
return fullName;
    }

   
public void setFullName(String fullName) {
       
this.fullName = fullName;
    }

   
@Override
   
public boolean equals(Object o) {
       
if (this == o) { return true; }
       
if (o == null || getClass() != o.getClass()) { return false; }
        TableInfo tableInfo = (TableInfo)o;
       
return Objects.equals(databaseName, tableInfo.databaseName) &&
            Objects.equals(
tableName, tableInfo.tableName) &&
            Objects.equals(
fullName, tableInfo.fullName);
    }

   
@Override
   
public int hashCode() {
       
return Objects.hash(databaseName, tableName, fullName);
    }
}
 
 
TableInfoKeeper:
public class TableInfoKeeper {

   
private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

   
private static Map<Long, TableInfo> tabledIdMap = new ConcurrentHashMap<>();
   
private static Map<String, List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();

   
static {
       
columnsMap = MysqlConnection.getColumns();
    }

   
public static void saveTableIdMap(TableMapEvent tme) {
       
long tableId = tme.getTableId();
       
tabledIdMap.remove(tableId);

        TableInfo table =
new TableInfo();
        table.setDatabaseName(tme.getDatabaseName().toString());
        table.setTableName(tme.getTableName().toString());
        table.setFullName(tme.getDatabaseName() +
"." + tme.getTableName());

        
tabledIdMap.put(tableId, table);
    }

   
public static synchronized void refreshColumnsMap() {
        Map<String, List<ColumnInfo>> map = MysqlConnection.getColumns();
       
if (map.size() > 0) {
           
//          logger.warn("refresh and clear cols.");
           
columnsMap = map;
           
//          logger.warn("refresh and switch cols:{}",map);
       
} else {
           
logger.error("refresh columnsMap error.");
        }
    }

   
public static TableInfo getTableInfo(long tableId) {
       
return tabledIdMap.get(tableId);
    }

   
public static List<ColumnInfo> getColumns(String fullName) {
       
return columnsMap.get(fullName);
    }

}