一个增量数据收集的解决方案

December . 24 . 2020
  • 前言

项目 GitHub 地址: springboot-binlog

1. 下载项目

2. 导入 resources 目录下的 test.sql 文件

3. 拷贝 application.yml.example 文件至 application.yml

4. 修改相关数据库配置

5. 运行 Application 主类

  • 增量数据到底指什么

在说明增量数据之前,我们先要知道一个概念:全量数据。全量数据并不是说业务系统中所有的数据,而是说初始数据。下面, 我以具体例子来说明这两个概念。

一个系统的基本数据包括:系统配置, 系统字典(你可以发散为其他业务系统中的数据,这并不重要)。在系统上线之前,这些数据必须是具备的,否则,线上没有数据就没有意义了。而这些上线之前就已经准备好的数据,就被称作是初始数据,也叫做全量数据。系统在运行过程中,必然会存在新数据插入、老数据更新或删除,而这些引起变动的数据就被称为增量数据。

  • 认识 binlog

DDL 是 SQL 语言的四大功能之一,即数据定义语言。它用于定义数据库的三级结构,包括外模式、概念模式、内模式,以及定义数据的完整性、安全控制等约束。DDL 最大的特点是,它不需要COMMIT。DDL 语句包含:CREATE、ALTER、DROP、TRUNCATE 等等。DML 即数据操纵语言。DML 语句包含:SELECT、INSERT、UPDATE、DELETE、CALL、EXPLAIN PLAN 等等。MySQL 的 Binlog 会记录所有的 DDL 和 DML(除了不改变数据的语句,例如SELECT、SHOW 等等)语句。它会以事件的形式记录,包含语句执行所消耗的时间,且是事务安全的.

  MySQL Binlog 记录的所有操作都会有事件类型与之对应,且不同的 Binlog 格式(ROW、STATEMENT 等)会有不同的事件类型,但是,基本上是类似的。下面,我以最常见的 ROW 格式去对事件类型做介绍。

对于 ROW 格式的 Binlog 来说,所有的 DML(由于这里是收集增量数据,所以,不对 DDL 做深入探究)操作都会记录在 ROWS_EVENT 中, ROWS_EVENT 包含三种: UPDATE_ROWS_EVENT 、WRITE_ROWS_EVENT 以及 DELETE_ROWS_EVENT。

它们分别代表:

  1. UPDATE_ROWS_EVENT:对应 UPDATE 操作,Binlog 中不仅包含了修改后的数据,还包含了修改前的数据
  2. WRITE_ROWS_EVENT:对应 INSERT 操作,Binlog 中记录了被插入的数据
  3. DELETE_ROWS_EVENT:对应 DELETE 操作,Binlog 中记录了被删除的数据

另外, 对 应 每 一 个 ROWS_EVENT 事 件 之 前 都 会 有 一 个TABLE_MAP_EVENT,用于描述操作表的相关信息。

  • Master/Slave 协议

    在实际操作之前,还需要先搞清楚 MySQL 的 Master/Slave 协议。下面这张图(来源于网络)标识的MySQL 的主从复制过程,其中包含了两个线程:IO 线程(Master 端)和 SQL 线程(Slave 端)。

                              20180725155158623.jpg

     可以清晰的看到,MySQL 主从复制依赖于 Binlog:Master 服务器接收到来自 Slave 服务器的 IO 线程的请求后,Master 服务器上的 IO 线程根据 Slave 服务器的 IO 线程请求的信息,读取指定 Binlog 日志文件指定位置之后的 Binlog 日志信息,然后返回给 Slave 端的 IO 线程。 由于 Binlog 中包含了完整的数据库变更信息,Slave 只需要 “重现一遍” Binlog 日志记录,就可以达到与 Master 一致的状态了。

    我们想要实时得到 Binlog 的变更信息,其实就需要 Master/Slave 协议,也就是说我们需要把应用 “伪装” 成 Slave,去监听 Master 的 Binlog 变更,并等待 Master 同步变更日志。

  • 监听 Binlog 的设计

    目前市面上有很多开源框架可以实现(按照 Master/Slave 协议的接口要求实现即可监听到Binlog,按照 Binlog 的格式规范解析即可得到可读信息)监听 Binlog 并把它解释为可读的信息,这里使用的是 mysql-binlog-connector-java 它代码简洁, 可读性好。当然还有 alibaba-canal 也是非常不错的。

   理解了从 Binlog 中获取增量数据的原理,也确定了监听、解析 Binlog 的框架,接下来,就可以去实际操作了。

需求目标: 

1. 实现对 sys_config 表数据变动的监听并实时同步到 sys_config_copy 表中.

2. 断线恢复机制 (监听服务宕机到重启期间的数据不丢失)

首先构造一个 Object 用于存储解析的 Binlog 信息:

package com.niu.springboot.binlog.domain.dto;

...

/**
 * Binlog 数据
 *
 * @author [nza]
 * @version 1.0 [2020/12/21 11:21]
 * @createTime [2020/12/21 11:21]
 */
@Data
@Accessors(chain = true)
public class BinlogRowDataDTO {

    /**
     * 数据库名
     */
    private String schemaName;

    /**
     * 表名
     */
    private String tableName;

    /**
     * 当前记录的位置
     */
    private Long curPosition;

    /**
     * 下一个记录的位置
     */
    private Long nextPosition;

    /**
     * 主键
     */
    private List primaryKeys;

    /**
     * 事件类型
     */
    private EventType eventType;

    /**
     * 更新之后的数据,对于删除类型来说,即为空
     */
    private List> after;

    /**
     * 更新之前的数据,对于插入类型来说,即为空
     */
    private List> before;
}

有了这些定义之后,我们就可以使用框架来监听并解析 Binlog 日志了, 代码如下:

(这里只展示关键代码, 具体可下载源码查看)

package com.niu.springboot.binlog.service.impl;

...

/**
 * 数据收集业务实现类
 *
 * @author [nza]
 * @version 1.0 [2020/12/21 11:14]
 * @createTime [2020/12/21 11:14]
 */
@Service
@Slf4j
public class DataCollectionServiceImpl implements DataCollectionService {

	...

    @Override
    public void collectionIncrementalData(Event event) {

        // 获取到事件类型
        EventType type = event.getHeader().getEventType();

        // 切换了 binlog 文件
        if (handleBinlogFileChange(type, event)) {
            return;
        }

        // 设置表信息
        optionTableInfo(event, type);

        // 判断是否可以收集
        if (!canCollection(type)) {
            return;
        }

        // 执行收集逻辑
        doCollection(event, type);
    }

    /**
     * 处理 binlog 文件切换事件
     *
     * @param type  事件类型
     * @param event binlog事件
     * @return boolean
     * @author nza
     * @createTime 2020/12/22 14:13
     */
    private boolean handleBinlogFileChange(EventType type, Event event) {
        if (EventType.ROTATE.equals(type)) {
            // 更新 binlog 文件相关记录配置
            String originalFile = sysDictionaryService.getValByKey(SysDictionaryEnum.BIN_LOG_FILE_NAME);
            String binlogFilename = ((RotateEventData) event.getData()).getBinlogFilename();
            // 如果文件未变化忽略即可
            if (StringUtils.equals(binlogFilename, originalFile)) {
                return true;
            }

            isBinlogChanged = true;
            sysDictionaryService.updateByKey(SysDictionaryEnum.BIN_LOG_FILE_NAME, binlogFilename);
            return true;
        }
        if (EventType.FORMAT_DESCRIPTION.equals(type) && isBinlogChanged) {
            // 更新 binlog 开始位置记录配置
            long nextPosition = ((EventHeaderV4) event.getHeader()).getNextPosition();
            sysDictionaryService.updateByKey(SysDictionaryEnum.BIN_LOG_NEXT_POSITION, String.valueOf(nextPosition));
            isBinlogChanged = false;
            return true;
        }
        return false;
    }

    /**
     * 执行收集逻辑
     *
     * @param event binlog 事件
     * @param type  事件类型
     * @throws {@link Exception} 收集失败抛出
     * @author nza
     * @createTime 2020/12/21 14:36
     */
    private void doCollection(Event event, EventType type) {
        try {
            // 查询表映射信息
            Map dbPosMap = getDbPosMap(dbName, tableName);

            // 构造 BinlogRowData 对象
            BinlogRowDataDTO rowData = buildRowData(event.getData(), type, dbPosMap);
            rowData.setNextPosition(((EventHeaderV4) event.getHeader()).getNextPosition());
            rowData.setCurPosition(((EventHeaderV4) event.getHeader()).getPosition());

            log.info("收集完成: {}", rowData);

            // 将数据变动同步到备份表
            doBackup(rowData);


        } catch (Exception ex) {
            log.error("收集增量数据发送异常, 异常信息: ", ex);
        } finally {
            // 重置库名和表名
            this.dbName = null;
            this.tableName = null;
        }
    }

    /**
     * 将数据变动同步到备份表
     *
     * @param rowData 源数据
     * @author nza
     * @createTime 2020/12/23 14:33
     */
    private void doBackup(BinlogRowDataDTO rowData) {
        String table = "sys_config_copy";
        for (String sql : rowData.getSql(table)) {
            int res = jdbcTemplate.update(sql);
            log.info("同步完成, 影响行: {}", res);
        }

        // 更新配置表
        sysDictionaryService.updateByKey(SysDictionaryEnum.BIN_LOG_NEXT_POSITION, String.valueOf(rowData.getNextPosition()));
    }

    /**
     * 校验是否可以收集
     *
     * @param type 事件类型
     * @return boolean true 可以 false 不可以
     * @author nza
     * @createTime 2020/12/21 14:34
     */
    private boolean canCollection(EventType type) {
        // 如果不是更新、插入、删除事件, 直接忽略即可
        if (!ALLOW_COLLECTION_TYPES.contains(type)) {
            return false;
        }

        // 表名和库名是否已经完成填充
        if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tableName)) {
            log.error("no meta data event");
            return false;
        }

        // 是否在数据库白名单中
        if (!ALLOW_COLLECTION_SCHEMAS.contains(dbName)) {
            return false;
        }

        // 是否在数据库表白名单中
        return ALLOW_COLLECTION_TABLES.contains(tableName);
    }
	
	...
}

,>

  可以看出,监听、解析 Binlog 的过程其实就是填充 BinlogRowData 对象的过程,毕竟 BinlogRowData 代表的就是 “格式化” 之后的增量数据。收集到增量数据之后,更新索引或缓存就是简单的业务逻辑了。

  • 总结

 Binlog 是 MySQL 日志系统的核心组成,对于线上应用来说,推荐打开 Binlog 开关。随着需求的发展,它的作用不仅仅是局限于主从同步和数据恢复,甚至可以用于增量数据的收集,这无疑是让人非常兴奋的

  • 参考资料

-  mysql-binlog-connector-java 

-  alibaba-canal