
Installing the MySQL Service on Windows and Parsing the binlog


1. Download MySQL Community Server from the official site

MySQL :: Download MySQL Community Server

The package used here is the Windows (x86, 64-bit) ZIP Archive, mysql-8.0.29-winx64.zip.

2. After the download finishes, extract the ZIP to D:\Tools\mysql-8.0.29-winx64

3. Initialize the data directory

From the bin directory (or with it on the PATH), run mysqld --initialize

This creates a data directory under D:\Tools\mysql-8.0.29-winx64

4. Install the MySQL service

From an administrator command prompt, run mysqld --install

5. Start the MySQL service

Run net start mysql

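6. Find the default MySQL password

Initialization writes a .err file to D:\Tools\mysql-8.0.29-winx64\data, and the generated temporary root password is printed in it.

In this installation the password was XZhgyCLcg7?t, so logging in as root with that password works. The temporary password is marked as expired, so MySQL asks for it to be changed (ALTER USER) before other statements can run.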
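
The temporary password can also be pulled out of the .err file programmatically. A minimal sketch, assuming the log line contains the text "A temporary password is generated for root@localhost:" (the wording MySQL 8.0 uses); the class name TempPasswordFinder is made up for illustration:

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds the temporary root password in error-log lines. */
class TempPasswordFinder {

    // The run of non-whitespace characters after the marker text is the password.
    private static final Pattern MARKER =
            Pattern.compile("A temporary password is generated for root@localhost: (\\S+)\\s*$");

    /** Returns the password from the last matching line, if any. */
    static Optional<String> find(List<String> logLines) {
        Optional<String> result = Optional.empty();
        for (String line : logLines) {
            Matcher m = MARKER.matcher(line);
            if (m.find()) {
                result = Optional.of(m.group(1));
            }
        }
        return result;
    }
}
```

The lines could come from Files.readAllLines on the .err file in the data directory. Taking the last match matters if the data directory has been initialized more than once.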

7. Configuration file

Create a my.ini file in D:\Tools\mysql-8.0.29-winx64

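[mysql]
# Default character set for the mysql command-line client
default-character-set=utf8
[mysqld]
# Listen on port 3306
port = 3306
# MySQL installation directory (forward slashes, because a backslash starts an escape sequence in option files)
basedir=D:/Tools/mysql-8.0.29-winx64
# Directory where the database data is stored
datadir=D:/Tools/mysql-8.0.29-winx64/data
# Maximum number of concurrent connections
max_connections=200
# Server character set (utf8 is an alias for utf8mb3; the MySQL 8.0 default is utf8mb4)
character-set-server=utf8
# Default storage engine for new tables
default-storage-engine=INNODB

# Enable the binlog. The value is used as the base name of the log files, so they are named ON.000001, ON.000002, ...
log_bin=ON
# Log format, one of STATEMENT, ROW, MIXED
binlog_format=ROW
# Server ID
server-id=1
# Days to keep binlog files before they are purged (deprecated in MySQL 8.0 in favor of binlog_expire_logs_seconds)
expire_logs_days=7
# Maximum size of each binlog file
max_binlog_size=100m
# binlog cache size
binlog_cache_size=4m
# Maximum binlog cache size
max_binlog_cache_size=512m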

Restart the MySQL service for the settings to take effect.

The configuration file above enables the binlog.
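
Because the value of log_bin is taken as a base name, log_bin=ON produces binlog files named ON.000001, ON.000002, and so on in the data directory, next to an index file ON.index. A small sketch, under that assumption, of picking the newest binlog file out of a directory listing; BinlogFiles and its methods are hypothetical names:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical helper for locating binlog files by their base name. */
class BinlogFiles {

    /** Returns the file name with the highest numeric suffix, if any file matches. */
    static Optional<String> latest(String baseName, List<String> fileNames) {
        // A binlog file is "<baseName>." followed by digits only, so ON.index does not match.
        Pattern binlogName = Pattern.compile(Pattern.quote(baseName) + "\\.(\\d+)");
        return fileNames.stream()
                .filter(name -> binlogName.matcher(name).matches())
                .max(Comparator.comparingLong(BinlogFiles::suffix));
    }

    /** Parses the digits after the last dot, e.g. "ON.000002" gives 2. */
    private static long suffix(String name) {
        return Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
    }
}
```

The file names could be obtained with new File(dataDir).list(). Comparing the numeric suffix rather than the string keeps the ordering right once the sequence grows past six digits.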

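8. Parse a binlog file

After running some INSERT and UPDATE statements, parse the binlog file. The Maven dependency:

<dependency>
	<groupId>com.github.shyiko</groupId>
	<artifactId>mysql-binlog-connector-java</artifactId>
	<version>0.21.0</version>
</dependency>
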
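import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.junit.Test; // on JUnit 5 use org.junit.jupiter.api.Test instead
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

public class BinlogTest extends BaseTests {

    private static final Logger log = LoggerFactory.getLogger(BinlogTest.class);

    @Test
    public void test() throws Exception {
        // Backslashes must be escaped inside a Java string literal
        File file = new File("D:\\Tools\\mysql-8.0.29-winx64\\data\\ON.000002");

        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        // try-with-resources closes the reader even if reading fails
        try (BinaryLogFileReader reader = new BinaryLogFileReader(file, eventDeserializer)) {
            // readEvent() returns null once the end of the file is reached
            for (Event event; (event = reader.readEvent()) != null; ) {
                EventData data = event.getData();
                System.out.println(data);
            }
        } catch (IOException e) {
            log.error("Failed to read binlog file {}", file, e);
        }
    }
}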
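
For context on what the reader consumes: a binlog file starts with the four magic bytes 0xFE 'b' 'i' 'n', followed by events that each begin with a 19-byte little-endian header (timestamp, event type, server ID, event length, next position, flags). The sketch below decodes that header with the JDK only, which can help when checking that a file really is a MySQL binlog before handing it to BinaryLogFileReader. BinlogHeader is a hypothetical name, and the v4 binlog format is assumed:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical decoder for the start of a binlog file (v4 format assumed). */
class BinlogHeader {
    static final byte[] MAGIC = {(byte) 0xFE, 'b', 'i', 'n'};
    static final int EVENT_HEADER_LENGTH = 19;

    final long timestamp;    // seconds since the epoch
    final int eventType;     // e.g. 15 = FORMAT_DESCRIPTION_EVENT
    final long serverId;
    final long eventLength;  // header plus body, in bytes
    final long nextPosition; // file offset of the following event
    final int flags;

    private BinlogHeader(ByteBuffer buf) {
        // All fields are unsigned, so mask after reading the signed Java types.
        timestamp = buf.getInt() & 0xFFFFFFFFL;
        eventType = buf.get() & 0xFF;
        serverId = buf.getInt() & 0xFFFFFFFFL;
        eventLength = buf.getInt() & 0xFFFFFFFFL;
        nextPosition = buf.getInt() & 0xFFFFFFFFL;
        flags = buf.getShort() & 0xFFFF;
    }

    /** True if the data starts with the binlog magic bytes. */
    static boolean hasMagic(byte[] data) {
        if (data.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (data[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    /** Decodes the header of the first event, which directly follows the magic bytes. */
    static BinlogHeader firstEvent(byte[] data) {
        if (!hasMagic(data) || data.length < MAGIC.length + EVENT_HEADER_LENGTH) {
            throw new IllegalArgumentException("not a binlog file or too short");
        }
        ByteBuffer buf = ByteBuffer.wrap(data, MAGIC.length, EVENT_HEADER_LENGTH)
                .order(ByteOrder.LITTLE_ENDIAN);
        return new BinlogHeader(buf);
    }
}
```

Only the first 23 bytes of the file are needed for this check, so there is no reason to load the whole binlog into memory.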

9. Listen for binlog events in real time and parse them

application.properties:

spring.datasource.host=localhost
spring.datasource.port=3306
spring.datasource.databaseName=test
spring.datasource.url=jdbc:mysql://${spring.datasource.host}:${spring.datasource.port}/${spring.datasource.databaseName}?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=
binlog.table.names=label,product

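import com.alibaba.fastjson.JSON; // assumed: JSON.toJSONString comes from fastjson
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Slf4j
@Component
public class CommandLineRunnerImpl implements CommandLineRunner {

    @Value("${spring.datasource.host}")
    private String dbHost;
    @Value("${spring.datasource.port}")
    private int dbPort;
    @Value("${spring.datasource.databaseName}")
    private String databaseName;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;
    @Value("#{'${binlog.table.names}'.split(',')}")
    private Set<String> binlogTableNames;
    @Resource
    private ColumnsMapper columnsMapper;

    @Override
    public void run(String... args) throws Exception {
        BinaryLogClient client = new BinaryLogClient(dbHost, dbPort, username, password);
        // Must differ from the server-id of the MySQL server itself (1 in my.ini)
        client.setServerId(2);

        // table name -> (zero-based column index -> column name)
        Map<String, Map<Long, String>> tableNameColumnMap = Maps.newLinkedHashMap();
        // binlog table id -> (zero-based column index -> column name)
        Map<Long, Map<Long, String>> tableIdColumnMap = Maps.newLinkedHashMap();

        for (String binlogTableName : binlogTableNames) {
            tableNameColumnMap.put(binlogTableName, Maps.newLinkedHashMap());
            List<ColumnsDO> columnsDOS = columnsMapper.queryList(databaseName, binlogTableName);
            for (ColumnsDO columnsDO : columnsDOS) {
                // ordinal_position starts at 1, row arrays start at 0
                tableNameColumnMap.get(binlogTableName).put(columnsDO.getOrdinalPosition() - 1, columnsDO.getColumnName());
            }
        }

        client.registerEventListener(event -> {
            EventData eventData = event.getData();
            if (eventData instanceof TableMapEventData) {
                // A table map event precedes the row events and ties a table id to a table
                TableMapEventData tableMapEventData = (TableMapEventData) eventData;
                String tableName = tableMapEventData.getTable();
                // Also check the database, so a same-named table elsewhere is not picked up
                if (databaseName.equals(tableMapEventData.getDatabase()) && binlogTableNames.contains(tableName)) {
                    tableIdColumnMap.put(tableMapEventData.getTableId(), tableNameColumnMap.get(tableName));
                }
            }
            if (eventData instanceof UpdateRowsEventData) {
                UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
                long tableId = updateRowsEventData.getTableId();
                Map<Long, String> columnMap = tableIdColumnMap.get(tableId);
                if (columnMap == null) {
                    return;
                }
                // Each entry holds the row before (key) and after (value) the update
                for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
                    Serializable[] before = row.getKey();
                    Serializable[] after = row.getValue();

                    Map<String, Serializable> beforeMap = Maps.newLinkedHashMap();
                    Map<String, Serializable> afterMap = Maps.newLinkedHashMap();

                    for (int i = 0; i < before.length; i++) {
                        beforeMap.put(columnMap.get((long) i), before[i]);
                        afterMap.put(columnMap.get((long) i), after[i]);
                    }

                    log.info("beforeMap={}", JSON.toJSONString(beforeMap));
                    log.info("afterMap={}", JSON.toJSONString(afterMap));
                }
            } else if (eventData instanceof WriteRowsEventData) {
                WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
                long tableId = writeRowsEventData.getTableId();
                Map<Long, String> columnMap = tableIdColumnMap.get(tableId);
                if (columnMap == null) {
                    return;
                }
                List<Serializable[]> rows = writeRowsEventData.getRows();
                for (Serializable[] row : rows) {
                    // A fresh map per row, so values never leak from one row into the next
                    Map<String, Serializable> afterMap = Maps.newLinkedHashMap();
                    for (int i = 0; i < row.length; i++) {
                        afterMap.put(columnMap.get((long) i), row[i]);
                    }
                    log.info("afterMap={}", JSON.toJSONString(afterMap));
                }
            } else if (eventData instanceof DeleteRowsEventData) {
                DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
                long tableId = deleteRowsEventData.getTableId();
                Map<Long, String> columnMap = tableIdColumnMap.get(tableId);
                if (columnMap == null) {
                    return;
                }
                List<Serializable[]> rows = deleteRowsEventData.getRows();
                for (Serializable[] row : rows) {
                    Map<String, Serializable> beforeMap = Maps.newLinkedHashMap();
                    for (int i = 0; i < row.length; i++) {
                        beforeMap.put(columnMap.get((long) i), row[i]);
                    }
                    log.info("beforeMap={}", JSON.toJSONString(beforeMap));
                }
            } else if (eventData != null) {
                // Any other event type is just printed; some events carry no data at all
                System.out.println(eventData.getClass().getName());
                System.out.println(eventData.toString());
            }
        });

        try {
            // connect() blocks the calling thread for as long as the client stays connected
            client.connect();
        } catch (IOException e) {
            log.error("client.connect error.", e);
        }
    }
}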
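
The core of the listener is the mapping from positional row values to column names: ordinal_position starts at 1 while row arrays start at 0, hence the "- 1". The same idea in isolation, as a JDK-only sketch; RowMapper, its methods, and the "col_N" fallback key are hypothetical and not part of the code above:

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that labels binlog row values with column names. */
class RowMapper {

    /** Builds zero-based index -> name from column names listed in ordinal_position order. */
    static Map<Integer, String> indexColumns(List<String> columnNamesInOrder) {
        Map<Integer, String> byIndex = new LinkedHashMap<>();
        for (int i = 0; i < columnNamesInOrder.size(); i++) {
            byIndex.put(i, columnNamesInOrder.get(i)); // i equals ordinal_position - 1
        }
        return byIndex;
    }

    /** Turns one row (values by position) into name -> value, keeping column order. */
    static Map<String, Serializable> toMap(Map<Integer, String> byIndex, Serializable[] row) {
        Map<String, Serializable> named = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            // Fall back to a positional key when no name is known for this index
            named.put(byIndex.getOrDefault(i, "col_" + i), row[i]);
        }
        return named;
    }
}
```

The fallback key is a design choice for this sketch: the column names are loaded once at startup, so a column added later by ALTER TABLE would otherwise end up under a null key.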

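By default, row events carry column values by position only, without column names, so the table structure has to be read in order to parse binlog events correctly.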

import org.apache.ibatis.annotations.Param;

import java.util.List;

public interface ColumnsMapper {

    List<ColumnsDO> queryList(@Param("tableSchema") String tableSchema, @Param("tableName") String tableName);
}


import lombok.Data;

@Data
public class ColumnsDO {

    private String tableCatalog;
    private String tableSchema;
    private String tableName;
    private String columnName;
    private Long ordinalPosition;
    private String columnDefault;
    private String isNullable;
    private String dataType;
    private Long characterMaximumLength;
    private Long characterOctetLength;
    private Long numericPrecision;
    private Long numericScale;
    private Long datetimePrecision;
    private String characterSetName;
    private String collationName;
    private String columnType;
    private String columnKey;
    private String extra;
    private String privileges;
    private String columnComment;
    private String isGenerated;
    private String generationExpression;
}

The mapper XML for ColumnsMapper maps each of the following columns to the ColumnsDO field of the same name, and queryList selects them for the given table_schema and table_name:

        table_catalog,
        table_schema,
        table_name,
        column_name,
        ordinal_position,
        column_default,
        is_nullable,
        data_type,
        character_maximum_length,
        character_octet_length,
        numeric_precision,
        numeric_scale,
        datetime_precision,
        character_set_name,
        collation_name,
        column_type,
        column_key,
        extra,
        privileges,
        column_comment,
        is_generated,
        generation_expression
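
For readers not using MyBatis, the same lookup can be sketched with plain JDBC. This assumes the metadata is read from information_schema.columns and selects only the two columns the listener actually uses; ColumnLookup is a hypothetical name, and a MySQL JDBC driver on the classpath is assumed:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical JDBC counterpart of ColumnsMapper.queryList. */
class ColumnLookup {

    static final String SQL =
            "SELECT column_name, ordinal_position FROM information_schema.columns "
          + "WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position";

    /** Returns zero-based column index -> column name for one table. */
    static Map<Long, String> load(Connection conn, String schema, String table) throws SQLException {
        Map<Long, String> columns = new LinkedHashMap<>();
        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            ps.setString(1, schema);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // Result column 1 is the name, column 2 the 1-based ordinal position;
                    // binlog row arrays are 0-based, hence the "- 1"
                    columns.put(rs.getLong(2) - 1, rs.getString(1));
                }
            }
        }
        return columns;
    }
}
```

A connection for this could be opened with DriverManager.getConnection using the spring.datasource.url, username, and password values from the properties file.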
    

    


PS: MariaDB and MySQL binlog files differ, so parsing a MariaDB binlog file with the code above throws an exception.
