fackyou200 2025-12-02 10:13

Hello, when syncing PostgreSQL with SeaTunnel I hit a [PostgreSQL CDC replication slot name already exists] problem

Problem description:

Before running the job, the replication slot in the PostgreSQL database had already been dropped, but once the job runs it keeps reporting [org.postgresql.util.PSQLException: ERROR: replication slot "seatunnel" already exists]. This is my first time using SeaTunnel and the project is urgent, so I'm quite confused. Could you help me analyze the cause? Environment, configuration, and logs are below. Thanks...

1. Environment
SeaTunnel 2.3.12
PostgreSQL 10.15, with wal_level=logical already configured

2. Configuration

env {
  parallelism = 1
  job.mode = "STREAMING"       # streaming mode
  checkpoint.interval = 3000   # checkpoint every 3 seconds
}

source {
  Postgres-CDC {
    base-url = "jdbc:postgresql://192.168.1.17:5432/test"
    username = "postgres"
    password = "xxxx"
    database-names = ["test"]
    table-names = ["test.public.abc"]
    startup-mode = "initial"   # full snapshot first, then incremental sync
    debezium.slot.drop.on.stop=true  # drop the slot automatically when the job stops
    lot.drop_on_stop=true
    decoding.cleanup=true
  }
}

sink {
  Console {
  }
}

3. Logs


```java
2025-11-30 02:29:43,036 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] turned from state DEPLOYING to RUNNING.
2025-11-30 02:29:43,037 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] current state equals target state: RUNNING, skip
2025-11-30 02:29:43,037 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] state process is start
2025-11-30 02:29:43,040 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] turned from state CREATED to DEPLOYING.
2025-11-30 02:29:43,074 INFO  [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] received deploying task executionId [1047459405051789313]
2025-11-30 02:29:43,094 INFO  [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] deploying task TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}, executionId [1047459405051789313]
2025-11-30 02:29:43,095 INFO  [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] deploying TaskGroup TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2} init success
2025-11-30 02:29:43,098 INFO  [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] deploying TaskGroup TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2} success
2025-11-30 02:29:43,103 INFO  [s.e.s.t.TransformSeaTunnelTask] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - starting seatunnel transform task, index 0
2025-11-30 02:29:43,103 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] turned from state DEPLOYING to RUNNING.
2025-11-30 02:29:43,104 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] current state equals target state: RUNNING, skip
2025-11-30 02:29:43,110 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (1047459396566646785), Pipeline: [(1/1)] turned from state DEPLOYING to RUNNING.
2025-11-30 02:29:43,115 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (1047459396566646785) turned from state SCHEDULED to RUNNING.
2025-11-30 02:29:43,115 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [hz.main.seaTunnel.task.thread-5] - output rowType: id<INT>, sj<TIMESTAMP>, user_id_<STRING>
2025-11-30 02:29:43,120 INFO  [a.s.a.s.m.MultiTableSinkWriter] [hz.main.seaTunnel.task.thread-5] - init multi table sink writer, queue size: 1
2025-11-30 02:29:43,122 INFO  [.a.s.e.s.t.SourceSeaTunnelTask] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - starting seatunnel source task, index 0
2025-11-30 02:29:43,226 INFO  [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Open Source Reader.
2025-11-30 02:29:43,227 INFO  [o.a.s.a.e.LoggingEventHandler ] [hz.main.generic-operation.thread-13] - log event: ReaderOpenEvent(createdTime=1764498583226, jobId=1047459396566646785, eventType=LIFECYCLE_READER_OPEN)
2025-11-30 02:29:43,227 INFO  [.s.t.SourceSplitEnumeratorTask] [hz.main.seaTunnel.task.thread-5] - received reader register, readerID: TaskLocation{taskGroupLocation=TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}, taskID=1000200000000, index=0}
2025-11-30 02:29:43,328 INFO  [i.d.j.JdbcConnection          ] [pool-13-thread-1] - Connection gracefully closed
2025-11-30 02:29:43,618 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] - Read list of available databases
2025-11-30 02:29:43,621 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] -      list of available databases is: [postgres, template1, template0, abc, anyBI, fowmp, market, tj.scm, liulin, oauth, edu_screenmonitor, sxlq.hsb, timescale, timescale1, iot_data, bi, market_iot, dxhflowable, test]
2025-11-30 02:29:43,621 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] - Read list of available tables in each database
2025-11-30 02:29:43,637 INFO  [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] -      including 'test.public.abc' for further processing
2025-11-30 02:29:43,644 INFO  [i.d.j.JdbcConnection          ] [pool-14-thread-1] - Connection gracefully closed
2025-11-30 02:29:43,648 INFO  [.c.b.s.e.SnapshotSplitAssigner] [hz.main.seaTunnel.task.thread-2] - SnapshotSplitAssigner created with remaining tables: [test.public.abc]
2025-11-30 02:29:43,649 INFO  [.c.b.s.e.SnapshotSplitAssigner] [hz.main.seaTunnel.task.thread-2] - SnapshotSplitAssigner created with remaining splits: []
2025-11-30 02:29:43,649 INFO  [.c.b.s.e.SnapshotSplitAssigner] [hz.main.seaTunnel.task.thread-2] - SnapshotSplitAssigner created with assigned splits: []
2025-11-30 02:29:43,654 INFO  [o.a.s.a.e.LoggingEventHandler ] [hz.main.generic-operation.thread-15] - log event: EnumeratorOpenEvent(createdTime=1764498583653, jobId=1047459396566646785, eventType=LIFECYCLE_ENUMERATOR_OPEN)
2025-11-30 02:29:43,750 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - checkpoint is enabled, start schedule trigger pending checkpoint.
2025-11-30 02:29:43,844 INFO  [.s.t.SourceSplitEnumeratorTask] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - received enough reader, starting enumerator...
2025-11-30 02:29:44,024 INFO  [i.d.j.JdbcConnection          ] [pool-15-thread-1] - Connection gracefully closed
2025-11-30 02:29:44,216 INFO  [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Start splitting table test.public.abc into chunks...
2025-11-30 02:29:44,256 INFO  [.a.s.c.c.b.u.CatalogTableUtils] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Override primary key([id]) for catalog table public.abc
2025-11-30 02:29:44,259 INFO  [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Config snapshotSplitColumn not exists for table test.public.abc
2025-11-30 02:29:44,270 INFO  [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Splitting table test.public.abc into chunks, split column: id, min: 1, max: 115, chunk size: 8096, distribution factor upper: 100.0, distribution factor lower: 0.05, sample sharding threshold: 1000
2025-11-30 02:29:44,272 INFO  [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Use unevenly-sized chunks for table test.public.abc, the chunk size is 8096
2025-11-30 02:29:44,273 INFO  [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Split table test.public.abc into 1 chunks, time cost: 57ms.
2025-11-30 02:29:44,276 INFO  [i.d.j.JdbcConnection          ] [pool-16-thread-1] - Connection gracefully closed
2025-11-30 02:29:44,281 INFO  [.b.s.r.IncrementalSourceReader] [hz.main.generic-operation.thread-22] - subtask 0 add splits: test.public.abc:0
2025-11-30 02:29:44,285 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Starting split fetcher 0
2025-11-30 02:29:44,457 INFO  [i.d.j.JdbcConnection          ] [pool-17-thread-1] - Connection gracefully closed
2025-11-30 02:29:44,658 INFO  [.a.s.c.c.b.u.CatalogTableUtils] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Override primary key([id]) for catalog table public.abc
2025-11-30 02:29:44,674 INFO  [o.a.k.c.j.JsonConverterConfig ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2025-11-30 02:29:44,946 INFO  [i.d.j.JdbcConnection          ] [pool-18-thread-1] - Connection gracefully closed
2025-11-30 02:29:45,026 INFO  [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - user 'postgres' connected to database 'test' on PostgreSQL 10.15, compiled by Visual C++ build 1800, 64-bit with roles:
    role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
2025-11-30 02:29:45,029 INFO  [i.d.c.p.c.PostgresConnection  ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=null, catalogXmin=null]
2025-11-30 02:29:45,029 INFO  [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='postgres_cdc_source'db='test', lsn=LSN{0/0}, timestamp=-290308-12-21T19:59:05.224192Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2025-11-30 02:29:45,029 INFO  [i.d.c.p.s.InitialSnapshotter  ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Previous snapshot has completed successfully, streaming logical changes from last known position
2025-11-30 02:29:45,029 INFO  [i.d.c.p.PostgresObjectUtils   ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@69ec11da
2025-11-30 02:29:45,221 INFO  [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Initializing PgOutput logical decoder publication
2025-11-30 02:29:45,227 INFO  [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating replication slot with command CREATE_REPLICATION_SLOT "seatunnel"  LOGICAL pgoutput
2025-11-30 02:29:45,255 INFO  [e.IncrementalSourceScanFetcher] [debezium-snapshot-reader-0] - Start snapshot read task for snapshot split: SnapshotSplit(tableId=test.public.abc, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) exactly-once: false
2025-11-30 02:29:45,262 INFO  [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 1 - Determining low watermark {lsn=6294073432, txId=1032754, ts_usec=-9223372036854775808} for split SnapshotSplit(tableId=test.public.abc, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
2025-11-30 02:29:45,264 INFO  [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 2 - Snapshotting data
2025-11-30 02:29:45,264 INFO  [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Exporting data from split 'test.public.abc:0' of table public.abc
2025-11-30 02:29:45,264 INFO  [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - For split 'test.public.abc:0' of table public.abc using select statement: 'SELECT * FROM "public"."abc"'
2025-11-30 02:29:45,279 INFO  [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Finished exporting 16 records for split 'test.public.abc:0', total duration '00:00:00.015'
2025-11-30 02:29:45,281 INFO  [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 3 - Determining high watermark {lsn=6294073480, txId=1032755, ts_usec=-9223372036854775808} for split SnapshotSplit(tableId=test.public.abc, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
2025-11-30 02:29:45,747 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Finished reading from splits [test.public.abc:0]
2025-11-30 02:29:45,953 INFO  [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Finished reading split(s) [test.public.abc:0]
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 1, 2020-07-30T18:25:10, 1,2,3,4,5,6,81
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 2, 2020-07-30T18:25:10, 10,11,22
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 5, 2025-11-22T02:08:04, 888888888
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=4:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 103, 2025-11-26T18:40:12, vvvv
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=5:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 104, 2025-11-26T18:43:46, hhh111
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=6:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 105, 2025-11-26T18:59:58, aba1233
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=7:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 106, 2025-11-26T19:54:37, kkkk
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=8:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 107, 2025-11-26T20:06:45, uuuu1234555588
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=9:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 108, 2025-11-27T10:39:25, 6779900
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=10:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 109, 2025-11-27T12:13:28, 88888
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=11:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 110, 2025-11-27T12:14:08, 9999
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=12:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 111, 2025-11-27T12:14:53, ppppp1
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=13:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 112, 2025-11-27T13:07:27, lll2233
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=14:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 113, 2025-11-27T13:45:48, 668890000
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=15:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 114, 2025-11-27T14:19:04, 555888000
2025-11-30 02:29:45,956 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=16:  SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 115, 2025-11-27T19:10:44, 9696000bbbb18888
2025-11-30 02:29:45,959 INFO  [.c.b.s.e.SnapshotSplitAssigner] [hz.main.generic-operation.thread-23] - Snapshot split assigner received all splits completed and the job parallelism is 1, snapshot split assigner is turn into completed status.
2025-11-30 02:29:45,965 INFO  [.b.s.r.IncrementalSourceReader] [hz.main.generic-operation.thread-25] - subtask 0 add splits: incremental-split-0
2025-11-30 02:29:45,966 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - It's turn to read incremental split, close current snapshot fetcher.
2025-11-30 02:29:45,968 INFO  [i.d.j.JdbcConnection          ] [pool-20-thread-1] - Connection gracefully closed
2025-11-30 02:29:45,973 INFO  [i.d.j.JdbcConnection          ] [pool-21-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,170 INFO  [i.d.j.JdbcConnection          ] [pool-22-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,355 INFO  [o.a.k.c.j.JsonConverterConfig ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2025-11-30 02:29:46,526 INFO  [i.d.j.JdbcConnection          ] [pool-23-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,528 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Stream fetcher is created.
2025-11-30 02:29:46,704 INFO  [i.d.j.JdbcConnection          ] [pool-25-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,757 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-8] - wait checkpoint completed: 1
2025-11-30 02:29:46,845 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-8] - pending checkpoint(1/1@1047459396566646785) notify finished!
2025-11-30 02:29:46,846 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-8] - start notify checkpoint completed, job id: 1047459396566646785, pipeline id: 1, checkpoint id:1
2025-11-30 02:29:46,901 INFO  [i.d.j.JdbcConnection          ] [pool-26-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,959 INFO  [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - user 'postgres' connected to database 'test' on PostgreSQL 10.15, compiled by Visual C++ build 1800, 64-bit with roles:
    role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
2025-11-30 02:29:46,960 INFO  [i.d.c.p.c.PostgresConnection  ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{1/7727F058}, catalogXmin=1032754]
2025-11-30 02:29:46,961 INFO  [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='postgres_cdc_source'db='test', lsn=LSN{1/7727F058}, txId=1032754, timestamp=-290308-12-21T19:59:05.224192Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2025-11-30 02:29:46,961 INFO  [i.d.c.p.s.InitialSnapshotter  ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Previous snapshot has completed successfully, streaming logical changes from last known position
2025-11-30 02:29:46,961 INFO  [i.d.c.p.PostgresObjectUtils   ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@7ad4287d
2025-11-30 02:29:47,142 INFO  [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Initializing PgOutput logical decoder publication
2025-11-30 02:29:47,145 INFO  [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating replication slot with command CREATE_REPLICATION_SLOT "seatunnel"  LOGICAL pgoutput
2025-11-30 02:29:47,146 ERROR [.s.c.s.r.f.SplitFetcherManager] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[seatunnel-transforms-v2.jar:2.3.12]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81) [seatunnel-transforms-v2.jar:2.3.12]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: io.debezium.DebeziumException: Creation of replication slot failed
    at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:218) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:98) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54) ~[seatunnel-transforms-v2.jar:2.3.12]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[seatunnel-transforms-v2.jar:2.3.12]
    ... 6 more
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "seatunnel" already exists
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2565) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2297) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:322) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:308) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:284) ~[postgresql-42.2.25.jar:42.2.25]
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:279) ~[postgresql-42.2.25.jar:42.2.25]
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:394) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:210) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:98) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54) ~[seatunnel-transforms-v2.jar:2.3.12]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[seatunnel-transforms-v2.jar:2.3.12]
    ... 6 more
```


2 answers

  • 檀越@新空间 2025-12-02 10:21

    Good morning ☀️☀️☀️
    This answer is based on Tongyi Qianwen

    The "PostgreSQL CDC slot name already exists" error you hit while syncing PostgreSQL with SeaTunnel is a very common problem. Based on your description and configuration, I'll analyze the causes in detail and offer solutions.


    I. Problem analysis

    1. Error message

    org.postgresql.util.PSQLException: ERROR: replication slot "seatunnel" already exists
    

    This means that when PostgreSQL tried to create a replication slot named seatunnel, it found the slot already exists, so it threw an exception.
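    To confirm this from the database side, you can query the pg_replication_slots catalog view (standard PostgreSQL; active_pid shows which backend, if any, is currently holding the slot):

    -- Does the slot exist, and is anything holding it?
    SELECT slot_name, plugin, active, active_pid
    FROM pg_replication_slots
    WHERE slot_name = 'seatunnel';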

    2. Possible causes

    • Re-running the job: even if you dropped the replication slot manually, in some cases SeaTunnel or PostgreSQL may not have cleaned up the old slot properly.
    • Automatic slot dropping not configured correctly: you set both debezium.slot.drop.on.stop=true and lot.drop_on_stop=true, and there may be a typo or an incomplete configuration.
    • The slot was never fully removed on the PostgreSQL side: even after a manual drop, permission or connection problems may have prevented the slot from actually being deleted.
    • Multiple jobs running at the same time: if another job was already using the same slot name, a conflict results.
    • The slot is re-created within the same run: judging from your log, CREATE_REPLICATION_SLOT "seatunnel" is issued at 02:29:45 during the snapshot phase and again at 02:29:47 when the stream fetcher starts, and the error appears right after the second attempt, so the conflict can also come from the same job rather than a leftover slot.

    II. Solutions

    1. Make sure the replication slot is fully dropped

    Run the following SQL to confirm the replication slot is gone:

    -- List the replication slots that currently exist
    SELECT * FROM pg_replication_slots;
    
    -- If a slot named seatunnel exists, drop it
    SELECT pg_drop_replication_slot('seatunnel');
    

    Note: run this as a user with sufficient privileges (such as postgres), and make sure the database is running with logical replication enabled.
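    One caveat: pg_drop_replication_slot() fails if the slot is still active (held by a running walsender). As a sketch, you can first terminate the backend holding it via the active_pid column of pg_replication_slots:

    -- If the slot is still active, terminate the backend holding it first
    SELECT pg_terminate_backend(active_pid)
    FROM pg_replication_slots
    WHERE slot_name = 'seatunnel' AND active;
    
    -- then drop the slot
    SELECT pg_drop_replication_slot('seatunnel');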


    2. Check and fix the SeaTunnel configuration

    Your configuration contains two problematic options:

    | Option | Correction | Notes |
    |--------|------------|-------|
    | lot.drop_on_stop | should be debezium.slot.drop.on.stop | Controls whether the replication slot is dropped automatically when the job stops |
    | decoding.cleanup | usually not needed | Likely a redundant setting |

    ✅ The corrected configuration:

    source {
      Postgres-CDC {
        base-url = "jdbc:postgresql://192.168.1.17:5432/test"
        username = "postgres"
        password = "xxxx"
        database-names = ["test"]
        table-names = ["test.public.abc"]
        startup-mode = "initial"   # full snapshot first, then incremental sync
        debezium.slot.drop.on.stop = true  # drop the slot automatically when the job stops
      }
    }
    

    Key point: the correct option is debezium.slot.drop.on.stop = true, not lot.drop_on_stop.


    3. Use a unique slot name

    To avoid conflicts between jobs, give each job its own replication slot name.

    Example configuration change:

    source {
      Postgres-CDC {
        ...
        debezium.slot.name = "seatunnel_unique_001"  # set a unique slot name
        ...
      }
    }
    

    Key point: a unique slot name prevents conflicts between concurrent jobs.


    4. Check PostgreSQL's wal_level and logical replication support

    You mentioned wal_level=logical is already configured, which is correct. Also confirm the following (a quick check follows this list):

    • Check that pg_hba.conf allows connections from your SeaTunnel node.
    • Make sure parameters such as max_wal_senders and wal_keep_segments are set high enough to support logical replication.
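    A quick way to inspect these settings from psql (standard SHOW commands; max_replication_slots is included because every CDC job consumes one slot):

    SHOW wal_level;
    SHOW max_wal_senders;
    SHOW max_replication_slots;
    SHOW wal_keep_segments;  -- renamed to wal_keep_size in PostgreSQL 13+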

    5. Enable debug logging (optional)

    If you still cannot pinpoint the problem, you can raise the log level in the SeaTunnel configuration for more detailed output:

    log.level = "DEBUG"
    

    This helps you check whether a connection or permission issue is preventing the slot from being created or dropped.


    III. Summary

    | Problem | Solution |
    |---------|----------|
    | Replication slot not dropped | Drop it manually with pg_drop_replication_slot |
    | Configuration error | Correct it to debezium.slot.drop.on.stop = true |
    | Slot name conflict | Use a unique slot name (e.g. seatunnel_unique_001) |
    | PostgreSQL permissions or configuration | Check pg_hba.conf and wal_level |


    IV. Complete revised configuration example

    env {
      parallelism = 1
      job.mode = "STREAMING"       # streaming mode
      checkpoint.interval = 3000   # checkpoint every 3 seconds
    }
    
    source {
      Postgres-CDC {
        base-url = "jdbc:postgresql://192.168.1.17:5432/test"
        username = "postgres"
        password = "xxxx"
        database-names = ["test"]
        table-names = ["test.public.abc"]
        startup-mode = "initial"   # full snapshot first, then incremental sync
        debezium.slot.name = "seatunnel_unique_001"  # set a unique slot name
        debezium.slot.drop.on.stop = true  # drop the slot automatically when the job stops
      }
    }
    
    sink {
      Console {
      }
    }
    
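    After restarting the job with this configuration, you can verify on the PostgreSQL side that the new slot exists and is active (a quick check against pg_replication_slots; the slot name assumes the value configured above):

    SELECT slot_name, plugin, active
    FROM pg_replication_slots
    WHERE slot_name = 'seatunnel_unique_001';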

    If the problem persists after these steps, please post the complete log file (especially the part containing the PSQLException) and I can help analyze further. Hope this helps!

