切換語言為:簡體
基於Spring監聽Binlog日誌說明

基於Spring監聽Binlog日誌說明

  • 爱糖宝
  • 2024-09-06
  • 2048
  • 0
  • 0

binlog的三種模式

MySQL 的二進制日誌(binlog)有三種不同的格式,通常被稱為 binlog 模式。這三種模式分別是 Statement 模式、Row 模式和Mixed 模式。

Statement 模式:

  • 在 Statement 模式下,MySQL 記錄每個會更改資料的 SQL 語句。

  • binlog 記錄的是執行的 SQL 語句本身,而不是具體的資料變化。

  • 例如,如果執行了 UPDATE 語句,binlog 記錄的是這個 UPDATE 語句的文字。

Row 模式:

  • 在 Row 模式下,MySQL 記錄每一行資料的變化。

  • binlog 記錄的是行資料的變化,而不是 SQL 語句。

  • 例如,如果執行了 UPDATE 語句,binlog 記錄的是被修改的行的實際資料。

Mixed 模式:

  • Mixed 模式是 Statement 模式和 Row 模式的結合。

  • 在 Mixed 模式下,MySQL 根據執行的 SQL 語句的型別來決定是記錄語句還是記錄行。

  • 通常,對於簡單的語句,使用 Statement 模式,對於涉及到行變化的複雜語句,使用 Row 模式。

這些模式可以透過 MySQL 配置檔案中的 binlog_format 引數進行配置。例如:

[mysqld]
binlog_format=mixed


其中,statementrowmixed 分別代表 Statement 模式、Row 模式和 Mixed 模式。選擇適當的 binlog 模式取決於應用的特定需求和效能要求。不同的模式具有不同的優劣勢,例如,Statement 模式可能會更輕量,而 Row 模式可能提供更詳細的資料變化資訊。

以Mixed 為例

檢視binlog是否開啟

show variables like '%log_bin%'


基於Spring監聽Binlog日誌說明

啟動springboot程式

基於Spring監聽Binlog日誌說明

新建資料庫

基於Spring監聽Binlog日誌說明

這個事件是一個 binlog 事件,其內容表示一個 SQL 查詢事件。讓我解釋一下這個事件的各個部分:

  • 事件型別 (eventType): 該事件的型別是 QUERY,表示這是一個 SQL 查詢事件。

  • 時間戳 (timestamp): 事件的時間戳為 1700045267000,表示事件發生的時間。

  • 執行緒ID (threadId): 執行緒ID 是 189,表示執行這個查詢的執行緒的識別符號。

  • 執行時間 (executionTime): 執行時間為 0,表示執行這個查詢所花費的時間。

  • 錯誤程式碼 (errorCode): 錯誤程式碼為 0,表示查詢執行沒有錯誤。

  • 資料庫 (database): 資料庫為 test2023,表示這個查詢發生在 test2023 資料庫中。

  • SQL 查詢 (sql): 實際的 SQL 查詢為 CREATE DATABASE test2023 CHARACTER SET utf8 COLLATE utf8_general_ci,表示執行了建立資料庫的操作。

這個事件的作用是在 test2023 資料庫中執行了一個建立資料庫的 SQL 查詢。這是 binlog 中的一部分,用於記錄資料庫中的變化,以便進行資料備份、主從同步等操作。

基於Spring監聽Binlog日誌說明

新建表資料

CREATE TABLE `t_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `userName` varchar(100) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;


基於Spring監聽Binlog日誌說明

這個事件也是一個 binlog 事件,表示一個 SQL 查詢事件。讓我解釋一下這個事件的各個部分:

  • 事件型別 (eventType): 該事件的型別是 QUERY,表示這是一個 SQL 查詢事件。

  • 時間戳 (timestamp): 事件的時間戳為 1700045422000,表示事件發生的時間。

  • 執行緒ID (threadId): 執行緒ID 是 204,表示執行這個查詢的執行緒的識別符號。

  • 執行時間 (executionTime): 執行時間為 0,表示執行這個查詢所花費的時間。

  • 錯誤程式碼 (errorCode): 錯誤程式碼為 0,表示查詢執行沒有錯誤。

  • 資料庫 (database): 資料庫為 test2023,表示這個查詢發生在 test2023 資料庫中。

  • SQL 查詢 (sql): 實際的 SQL 查詢為 CREATE TABLE t_user(idbigint(20) NOT NULL AUTO_INCREMENT,userName varchar(100) NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4,表示執行了在 test2023 資料庫中建立名為 t_user 的表的操作。

這個事件的作用是在 test2023 資料庫中建立了一個名為 t_user 的表,該表包含 iduserName 兩個欄位,其中 id 是自增的主鍵。這種型別的事件常常用於記錄資料庫結構的變化,以便進行資料備份、遷移和版本控制等操作。

基於Spring監聽Binlog日誌說明

插入表資料

INSERT INTO `test2023`.`t_user` (`id`, `userName`)
VALUES
    (
        "10086",
        "用心記錄技術,走心分享,始於後端,不止於後端,勵志成為一名優秀的全棧架構師,真正的實現碼中致富。"
    );


基於Spring監聽Binlog日誌說明

這個事件也是一個 binlog 事件,表示一個 SQL 查詢事件,具體如下:

  • 事件型別 (eventType): 該事件的型別是 QUERY,表示這是一個 SQL 查詢事件。

  • 時間戳 (timestamp): 事件的時間戳為 1700045547000,表示事件發生的時間。

  • 執行緒ID (threadId): 執行緒ID 是 204,表示執行這個查詢的執行緒的識別符號。

  • 執行時間 (executionTime): 執行時間為 0,表示執行這個查詢所花費的時間。

  • 錯誤程式碼 (errorCode): 錯誤程式碼為 0,表示查詢執行沒有錯誤。

  • 資料庫 (database): 資料庫為 test2023,表示這個查詢發生在 test2023 資料庫中。

  • SQL 查詢 (sql): 實際的 SQL 查詢為 INSERT INTO test2023.t_user (id, userName) VALUES ( "10086", "用心記錄技術,走心分享,始於後端,不止於後端,勵志成為一名優秀的全棧架構師,真正的實現碼中致富。",表示執行了向 test2023 資料庫的 t_user 表中插入一行資料的操作。

這個事件的作用是向 t_user 表中插入了一行資料,包含了 iduserName 兩個欄位的值。這種型別的事件通常用於記錄資料的變化,以便進行資料備份、同步和遷移等操作。

基於Spring監聽Binlog日誌說明

修改表資料修改表資料

修改表資料

UPDATE `test2023`.`t_user`
SET `id` = '10086',
 `userName` = '我的修改資料!!!'
WHERE
	(`id` = '10086');


基於Spring監聽Binlog日誌說明

這個事件同樣是一個 binlog 事件,表示一個 SQL 查詢事件,具體如下:

  • 事件型別 (eventType): 該事件的型別是 QUERY,表示這是一個 SQL 查詢事件。

  • 時間戳 (timestamp): 事件的時間戳為 1700045675000,表示事件發生的時間。

  • 執行緒ID (threadId): 執行緒ID 是 204,表示執行這個查詢的執行緒的識別符號。

  • 執行時間 (executionTime): 執行時間為 0,表示執行這個查詢所花費的時間。

  • 錯誤程式碼 (errorCode): 錯誤程式碼為 0,表示查詢執行沒有錯誤。

  • 資料庫 (database): 資料庫為 test2023,表示這個查詢發生在 test2023 資料庫中。

  • SQL 查詢 (sql): 實際的 SQL 查詢為 UPDATE test2023.t_userSETid= '10086', userName = '我的修改資料!!!' WHERE (id = '10086'),表示執行了更新 test2023 資料庫中的 t_user 表中一行資料的操作。

這個事件的作用是將 t_user 表中 id10086 的行的資料進行更新,將 id 修改爲 10086userName 修改爲 '我的修改資料!!!'。這種型別的事件通常用於記錄資料的變化,以便進行資料備份、同步和遷移等操作。

基於Spring監聽Binlog日誌說明

刪除表資料

DELETE
FROM
	t_user
WHERE
	id = '10086';


基於Spring監聽Binlog日誌說明

這個事件同樣是一個 binlog 事件,表示一個 SQL 查詢事件,具體如下:

  • 事件型別 (eventType): 該事件的型別是 QUERY,表示這是一個 SQL 查詢事件。

  • 時間戳 (timestamp): 事件的時間戳為 1700045755000,表示事件發生的時間。

  • 執行緒ID (threadId): 執行緒ID 是 204,表示執行這個查詢的執行緒的識別符號。

  • 執行時間 (executionTime): 執行時間為 0,表示執行這個查詢所花費的時間。

  • 錯誤程式碼 (errorCode): 錯誤程式碼為 0,表示查詢執行沒有錯誤。

  • 資料庫 (database): 資料庫為 test2023,表示這個查詢發生在 test2023 資料庫中。

  • SQL 查詢 (sql): 實際的 SQL 查詢為 DELETE FROM t_user WHERE id = '10086',表示執行了刪除 test2023 資料庫中的 t_user 表中一行資料的操作。

這個事件的作用是刪除 t_user 表中 id10086 的行。這種型別的事件通常用於記錄資料的刪除操作,以便進行資料備份、同步和遷移等操作。

基於Spring監聽Binlog日誌說明

總結: binlog_format 設定為 mixed 時,對於 INSERT、UPDATE 和 DELETE 操作,它們在 binlog 中的事件型別都會被表示為 QUERY 事件。這是因為在 mixed 模式下,MySQL 使用了不同的方式來記錄不同型別的操作,但在 binlog 中,它們都被包裝成了 QUERY 事件。

在 mixed 模式下:

  • 對於某些語句級別的操作(例如非確定性的語句或不支援事務的儲存引擎),會使用 STATEMENT 事件。

  • 對於其他一些情況,會使用 ROW 事件,將變更的行作為事件的一部分進行記錄。

這就是為什麼看到的 INSERT、UPDATE 和 DELETE 操作的事件型別都是 QUERY。在處理這些事件時,需要根據具體的 SQL 查詢語句或其他資訊來確定操作的型別。

原始碼示例

pom.xml

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.48</version> <!-- 檢視最新版本 -->
		</dependency>
			
		<dependency>
			<groupId>com.github.shyiko</groupId>
			<artifactId>mysql-binlog-connector-java</artifactId>
			<version>0.21.0</version>
		</dependency>


Java示例

package com.example.demo.listener;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.naming.AuthenticationException;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class BinlogListenerMixed {

    private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);

    private static final String MYSQL_HOST = "8.130.74.105";
    private static final int MYSQL_PORT = 3306;
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "zhang.ting.123";

    public static void main(String[] args) {
        try {
            BinaryLogClient client = new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
//            client.setBinlogFilename(null);
//            client.setBinlogPosition(-1); // 或者設定為其他適當的初始位置
//            client.setServerId(1);
//            client.setBinlogFilename("mysql-bin.000005");
//            client.setBinlogPosition(154);
            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(
                    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
            );
            logger.info("使用主機={}, 埠={}, 使用者名稱={}, 密碼={} 連線到 MySQL", MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
            client.setEventDeserializer(eventDeserializer);
            client.registerEventListener(BinlogListenerMixed::handleEvent);
            client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
                @Override
                public void onConnect(BinaryLogClient client) {
                    logger.info("Connected to MySQL server");
                }

                @Override
                public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Communication failure with MySQL server", ex);
                }

                @Override
                public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Event deserialization failure", ex);
                }

                @Override
                public void onDisconnect(BinaryLogClient client) {
                    logger.warn("Disconnected from MySQL server");
                    // 在這裏新增重新連線或其他處理邏輯
                }
            });

            client.connect();
        } catch (IOException e) {
            logger.error("@@ 連線到 MySQL 時發生錯誤", e);
            logger.error("@@ Error connecting to MySQL", e);
        }
    }

    private static void handleEvent(Event event) {
        logger.info("@@ 列印 event: {}", event);
        logger.info("@@ Received event type: {}", event.getHeader().getEventType());

        switch (event.getHeader().getEventType()) {
            case WRITE_ROWS:
            case EXT_WRITE_ROWS:
                handleWriteRowsEvent((WriteRowsEventData) event.getData());
                break;
            case QUERY:
                handleQueryEvent((QueryEventData) event.getData());
                break;
            case TABLE_MAP:
                handleTableMapEvent((TableMapEventData) event.getData());
                break;
            // 其他事件處理...
        }
    }

    private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
        List<Serializable[]> rows = eventData.getRows();

        // 獲取表名
        String tableName = getTableName(eventData);

        // 處理每一行資料
        for (Serializable[] row : rows) {
            // 根據需要調整以下程式碼以獲取具體的列值
            String column1Value = row[0].toString();
            String column2Value = row[1].toString();

            // 將資料備份到另一個數據庫
            backupToAnotherDatabase(tableName, column1Value, column2Value);
        }
    }

    private static void handleQueryEvent(QueryEventData eventData) {
        String sql = eventData.getSql();
        logger.info("@@ handleQueryEvent函式執行Query event SQL: {}", sql);

        // 解析SQL語句,根據需要處理
        // 例如,檢查是否包含寫入操作,然後執行相應的邏輯
    }

    private static void handleTableMapEvent(TableMapEventData eventData) {
        // 獲取表對映資訊,根據需要處理
        logger.info("@@ handleTableMapEvent函式執行TableMap event: {}", eventData);
    }

    private static String getTableName(EventData eventData) {
        // 獲取表名的邏輯,可以使用TableMapEventData等資訊
        // 根據實際情況實現
        return "example_table";
    }

    private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
        // 將資料備份到另一個數據庫的邏輯
        logger.info("Backup to another database: Table={}, Column1={}, Column2={}", tableName, column1Value, column2Value);
    }
}

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.