跳转至

MySQL CDC 到 PostgreSQL

为 SynchDB 准备 MySQL 数据库

在使用 SynchDB 从 MySQL 复制之前,需要按照此处 概述的步骤配置 MySQL

创建 MySQL 连接器

创建一个连接器,该连接器指向 MySQL 中 inventory 数据库下的所有表。

SELECT synchdb_add_conninfo(
'mysqlconn', '127.0.0.1', 3306, 'mysqluser',
'mysqlpwd', 'inventory', 'postgres',
'null', 'null', 'mysql');

初始快照 + CDC

使用 initial 模式启动连接器将对所有指定表(在本例中为所有表)执行初始快照。完成后,变更数据捕获 (CDC) 进程将开始流式传输新的变更。

SELECT synchdb_start_engine_bgw('mysqlconn', 'initial');



SELECT synchdb_start_engine_bgw('mysqlconn');

The stage of this connector should be in initial snapshot the first time it runs:

postgres=# select * from synchdb_state_view;
    name    | connector_type |  pid   |      stage       |  state  |   err    |                      last_dbz_offs
et
------------+----------------+--------+------------------+---------+----------+-----------------------------------
-------------------------
 mysqlconn2 | mysql          | 522195 | initial snapshot | polling | no error | {"ts_sec":1750375008,"file":"mysql
-bin.000003","pos":1500}
(1 row)

A new schema called inventory will be created and all tables streamed by the connector will be replicated under that schema.

postgres=# set search_path=public,inventory;
SET
postgres=# \d
                    List of relations
  Schema   |          Name           |   Type   | Owner
-----------+-------------------------+----------+--------
 inventory | addresses               | table    | ubuntu
 inventory | addresses_id_seq        | sequence | ubuntu
 inventory | customers               | table    | ubuntu
 inventory | customers_id_seq        | sequence | ubuntu
 inventory | geom                    | table    | ubuntu
 inventory | geom_id_seq             | sequence | ubuntu
 inventory | orders                  | table    | ubuntu
 inventory | orders_order_number_seq | sequence | ubuntu
 inventory | products                | table    | ubuntu
 inventory | products_id_seq         | sequence | ubuntu
 inventory | products_on_hand        | table    | ubuntu
 public    | synchdb_att_view        | view     | ubuntu
 public    | synchdb_attribute       | table    | ubuntu
 public    | synchdb_conninfo        | table    | ubuntu
 public    | synchdb_objmap          | table    | ubuntu
 public    | synchdb_state_view      | view     | ubuntu
 public    | synchdb_stats_view      | view     | ubuntu
(17 rows)

初始快照完成后,如果至少有一条后续更改被接收并处理,连接器阶段将从“初始快照”切换为“变更数据捕获”。

postgres=# select * from synchdb_state_view;
    name    | connector_type |  pid   |        stage        |  state  |   err    |                      last_dbz_o
ffset
------------+----------------+--------+---------------------+---------+----------+--------------------------------
----------------------------
 mysqlconn2 | mysql          | 522195 | change data capture | polling | no error | {"ts_sec":1750375008,"file":"my
sql-bin.000003","pos":1500}
(1 row)

这意味着连接器现在正在流式传输指定表的新更改。以“初始”模式重新启动连接器将从上次成功点开始继续复制,并且不会重新运行初始快照。

仅初始快照,无 CDC

使用 initial_only 模式启动连接器将仅对所有指定表(在本例中为全部)执行初始快照,之后将不再执行 CDC。

SELECT synchdb_start_engine_bgw('mysqlconn', 'initial_only');

连接器似乎仍在“轮询”其他连接器,但不会捕获任何更改,因为 Debzium 内部已停止 CDC。您可以选择关闭它。在 initial_only 模式下重新启动连接器不会重建表,因为它们已经构建好了。

postgres=# select * from synchdb_state_view;
    name    | connector_type |  pid   |      stage       |  state  |   err    |       last_dbz_offset
------------+----------------+--------+------------------+---------+----------+-----------------------------
 mysqlconn2 | mysql          | 522330 | initial snapshot | polling | no error | offset file not flushed yet
(1 row)

仅捕获表模式 + CDC

使用 no_data 模式启动连接器将仅执行模式捕获,在 PostgreSQL 中构建相应的表,并且不会复制现有表数据(跳过初始快照)。模式捕获完成后,连接器将进入 CDC 模式,并开始捕获对表的后续更改。

SELECT synchdb_start_engine_bgw('mysqlconn', 'no_data');

no_data 模式下重新启动连接器将不会再次重建模式,并且它将从上次成功点恢复 CDC。

仅 CDC

使用 never 模式启动连接器将完全跳过模式捕获和初始快照,并进入 CDC 模式以捕获后续更改。请注意,连接器要求在以 never 模式启动之前,所有捕获表都已在 PostgreSQL 中创建。如果表不存在,连接器在尝试将 CDC 更改应用于不存在的表时将遇到错误。

SELECT synchdb_start_engine_bgw('mysqlconn', 'never');

以“never”模式重启连接器将从上次成功点开始恢复 CDC。

始终执行初始快照 + CDC

使用 always 模式启动连接器将始终捕获捕获表的模式,始终重做初始快照,然后转到 CDC。这类似于重置按钮,因为使用此模式将重建所有内容。请谨慎使用,尤其是在捕获大量表时,这可能需要很长时间才能完成。重建后,CDC 将恢复正常。

SELECT synchdb_start_engine_bgw('mysqlconn', 'always');

但是,可以使用连接器的 snapshottable 选项选择部分表来重做初始快照。符合 snapshottable 中条件的表将重做初始快照,否则将跳过其初始快照。如果 snapshottable 为 null 或为空,默认情况下,连接器 table 选项中指定的所有表都将在 always 模式下重做初始快照。

此示例使连接器仅重做 inventory.customers 表的初始快照。所有其他表的快照将被跳过。

UPDATE synchdb_conninfo
SET data = jsonb_set(data, '{snapshottable}', '"inventory.customers"')
WHERE name = 'mysqlconn';

初始快照完成后,CDC 将开始。在 always 模式下重新启动连接器将重复上述过程。