问题描述:
在运行前,Postgresql数据库槽名都已删除,但是在运行后一直提示【org.postgresql.util.PSQLException: 错误: 复制槽名 "seatunnel" 已经存在】,因项目着急第一次使用,弄的一头雾水,望老师帮我分下原因,下面有环境、配置、日志!谢谢...
1、环境
seatunnel2.3.12
PostgreSQL 10.15,wal_level=logical已配置
2、配置
env {
parallelism = 1
job.mode = "STREAMING" # 流式处理模式
checkpoint.interval = 3000 # 3秒 checkpoint 一次
}
source {
Postgres-CDC {
base-url = "jdbc:postgresql://192.168.1.17:5432/test"
username = "postgres"
password = "xxxx"
database-names = ["test"]
table-names = ["test.public.abc"]
startup-mode = "initial" # 首次全量同步,之后增量同步
debezium.slot.drop.on.stop=true # 任务停止时自动删除槽
lot.drop_on_stop=true
decoding.cleanup=true
}
}
sink {
Console {
}
}
3、日志
```java
2025-11-30 02:29:43,036 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] turned from state DEPLOYING to RUNNING.
2025-11-30 02:29:43,037 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] current state equals target state: RUNNING, skip
2025-11-30 02:29:43,037 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] state process is start
2025-11-30 02:29:43,040 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] turned from state CREATED to DEPLOYING.
2025-11-30 02:29:43,074 INFO [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] received deploying task executionId [1047459405051789313]
2025-11-30 02:29:43,094 INFO [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] deploying task TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}, executionId [1047459405051789313]
2025-11-30 02:29:43,095 INFO [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] deploying TaskGroup TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2} init success
2025-11-30 02:29:43,098 INFO [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-3] - [localhost]:5801 [seatunnel-130186] [5.1] deploying TaskGroup TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2} success
2025-11-30 02:29:43,103 INFO [s.e.s.t.TransformSeaTunnelTask] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - starting seatunnel transform task, index 0
2025-11-30 02:29:43,103 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] turned from state DEPLOYING to RUNNING.
2025-11-30 02:29:43,104 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [seatunnel-coordinator-service-3] - Job (1047459396566646785), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] current state equals target state: RUNNING, skip
2025-11-30 02:29:43,110 INFO [o.a.s.e.s.d.p.SubPlan ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (1047459396566646785), Pipeline: [(1/1)] turned from state DEPLOYING to RUNNING.
2025-11-30 02:29:43,115 INFO [o.a.s.e.s.d.p.PhysicalPlan ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (1047459396566646785) turned from state SCHEDULED to RUNNING.
2025-11-30 02:29:43,115 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [hz.main.seaTunnel.task.thread-5] - output rowType: id<INT>, sj<TIMESTAMP>, user_id_<STRING>
2025-11-30 02:29:43,120 INFO [a.s.a.s.m.MultiTableSinkWriter] [hz.main.seaTunnel.task.thread-5] - init multi table sink writer, queue size: 1
2025-11-30 02:29:43,122 INFO [.a.s.e.s.t.SourceSeaTunnelTask] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - starting seatunnel source task, index 0
2025-11-30 02:29:43,226 INFO [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Open Source Reader.
2025-11-30 02:29:43,227 INFO [o.a.s.a.e.LoggingEventHandler ] [hz.main.generic-operation.thread-13] - log event: ReaderOpenEvent(createdTime=1764498583226, jobId=1047459396566646785, eventType=LIFECYCLE_READER_OPEN)
2025-11-30 02:29:43,227 INFO [.s.t.SourceSplitEnumeratorTask] [hz.main.seaTunnel.task.thread-5] - received reader register, readerID: TaskLocation{taskGroupLocation=TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}, taskID=1000200000000, index=0}
2025-11-30 02:29:43,328 INFO [i.d.j.JdbcConnection ] [pool-13-thread-1] - Connection gracefully closed
2025-11-30 02:29:43,618 INFO [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] - Read list of available databases
2025-11-30 02:29:43,621 INFO [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] - list of available databases is: [postgres, template1, template0, abc, anyBI, fowmp, market, tj.scm, liulin, oauth, edu_screenmonitor, sxlq.hsb, timescale, timescale1, iot_data, bi, market_iot, dxhflowable, test]
2025-11-30 02:29:43,621 INFO [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] - Read list of available tables in each database
2025-11-30 02:29:43,637 INFO [.c.s.c.p.u.TableDiscoveryUtils] [hz.main.seaTunnel.task.thread-2] - including 'test.public.abc' for further processing
2025-11-30 02:29:43,644 INFO [i.d.j.JdbcConnection ] [pool-14-thread-1] - Connection gracefully closed
2025-11-30 02:29:43,648 INFO [.c.b.s.e.SnapshotSplitAssigner] [hz.main.seaTunnel.task.thread-2] - SnapshotSplitAssigner created with remaining tables: [test.public.abc]
2025-11-30 02:29:43,649 INFO [.c.b.s.e.SnapshotSplitAssigner] [hz.main.seaTunnel.task.thread-2] - SnapshotSplitAssigner created with remaining splits: []
2025-11-30 02:29:43,649 INFO [.c.b.s.e.SnapshotSplitAssigner] [hz.main.seaTunnel.task.thread-2] - SnapshotSplitAssigner created with assigned splits: []
2025-11-30 02:29:43,654 INFO [o.a.s.a.e.LoggingEventHandler ] [hz.main.generic-operation.thread-15] - log event: EnumeratorOpenEvent(createdTime=1764498583653, jobId=1047459396566646785, eventType=LIFECYCLE_ENUMERATOR_OPEN)
2025-11-30 02:29:43,750 INFO [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - checkpoint is enabled, start schedule trigger pending checkpoint.
2025-11-30 02:29:43,844 INFO [.s.t.SourceSplitEnumeratorTask] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - received enough reader, starting enumerator...
2025-11-30 02:29:44,024 INFO [i.d.j.JdbcConnection ] [pool-15-thread-1] - Connection gracefully closed
2025-11-30 02:29:44,216 INFO [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Start splitting table test.public.abc into chunks...
2025-11-30 02:29:44,256 INFO [.a.s.c.c.b.u.CatalogTableUtils] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Override primary key([id]) for catalog table public.abc
2025-11-30 02:29:44,259 INFO [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Config snapshotSplitColumn not exists for table test.public.abc
2025-11-30 02:29:44,270 INFO [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Splitting table test.public.abc into chunks, split column: id, min: 1, max: 115, chunk size: 8096, distribution factor upper: 100.0, distribution factor lower: 0.05, sample sharding threshold: 1000
2025-11-30 02:29:44,272 INFO [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Use unevenly-sized chunks for table test.public.abc, the chunk size is 8096
2025-11-30 02:29:44,273 INFO [bstractJdbcSourceChunkSplitter] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=1}] - Split table test.public.abc into 1 chunks, time cost: 57ms.
2025-11-30 02:29:44,276 INFO [i.d.j.JdbcConnection ] [pool-16-thread-1] - Connection gracefully closed
2025-11-30 02:29:44,281 INFO [.b.s.r.IncrementalSourceReader] [hz.main.generic-operation.thread-22] - subtask 0 add splits: test.public.abc:0
2025-11-30 02:29:44,285 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Starting split fetcher 0
2025-11-30 02:29:44,457 INFO [i.d.j.JdbcConnection ] [pool-17-thread-1] - Connection gracefully closed
2025-11-30 02:29:44,658 INFO [.a.s.c.c.b.u.CatalogTableUtils] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Override primary key([id]) for catalog table public.abc
2025-11-30 02:29:44,674 INFO [o.a.k.c.j.JsonConverterConfig ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2025-11-30 02:29:44,946 INFO [i.d.j.JdbcConnection ] [pool-18-thread-1] - Connection gracefully closed
2025-11-30 02:29:45,026 INFO [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - user 'postgres' connected to database 'test' on PostgreSQL 10.15, compiled by Visual C++ build 1800, 64-bit with roles:
role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
2025-11-30 02:29:45,029 INFO [i.d.c.p.c.PostgresConnection ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=null, catalogXmin=null]
2025-11-30 02:29:45,029 INFO [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='postgres_cdc_source'db='test', lsn=LSN{0/0}, timestamp=-290308-12-21T19:59:05.224192Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2025-11-30 02:29:45,029 INFO [i.d.c.p.s.InitialSnapshotter ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Previous snapshot has completed successfully, streaming logical changes from last known position
2025-11-30 02:29:45,029 INFO [i.d.c.p.PostgresObjectUtils ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@69ec11da
2025-11-30 02:29:45,221 INFO [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Initializing PgOutput logical decoder publication
2025-11-30 02:29:45,227 INFO [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating replication slot with command CREATE_REPLICATION_SLOT "seatunnel" LOGICAL pgoutput
2025-11-30 02:29:45,255 INFO [e.IncrementalSourceScanFetcher] [debezium-snapshot-reader-0] - Start snapshot read task for snapshot split: SnapshotSplit(tableId=test.public.abc, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) exactly-once: false
2025-11-30 02:29:45,262 INFO [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 1 - Determining low watermark {lsn=6294073432, txId=1032754, ts_usec=-9223372036854775808} for split SnapshotSplit(tableId=test.public.abc, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
2025-11-30 02:29:45,264 INFO [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 2 - Snapshotting data
2025-11-30 02:29:45,264 INFO [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Exporting data from split 'test.public.abc:0' of table public.abc
2025-11-30 02:29:45,264 INFO [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - For split 'test.public.abc:0' of table public.abc using select statement: 'SELECT * FROM "public"."abc"'
2025-11-30 02:29:45,279 INFO [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Finished exporting 16 records for split 'test.public.abc:0', total duration '00:00:00.015'
2025-11-30 02:29:45,281 INFO [.PostgresSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 3 - Determining high watermark {lsn=6294073480, txId=1032755, ts_usec=-9223372036854775808} for split SnapshotSplit(tableId=test.public.abc, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
2025-11-30 02:29:45,747 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Finished reading from splits [test.public.abc:0]
2025-11-30 02:29:45,953 INFO [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Finished reading split(s) [test.public.abc:0]
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 1, 2020-07-30T18:25:10, 1,2,3,4,5,6,81
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 2, 2020-07-30T18:25:10, 10,11,22
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 5, 2025-11-22T02:08:04, 888888888
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 103, 2025-11-26T18:40:12, vvvv
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 104, 2025-11-26T18:43:46, hhh111
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 105, 2025-11-26T18:59:58, aba1233
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 106, 2025-11-26T19:54:37, kkkk
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 107, 2025-11-26T20:06:45, uuuu1234555588
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 108, 2025-11-27T10:39:25, 6779900
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 109, 2025-11-27T12:13:28, 88888
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 110, 2025-11-27T12:14:08, 9999
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 111, 2025-11-27T12:14:53, ppppp1
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 112, 2025-11-27T13:07:27, lll2233
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 113, 2025-11-27T13:45:48, 668890000
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 114, 2025-11-27T14:19:04, 555888000
2025-11-30 02:29:45,956 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=test.public.abc SeaTunnelRow#kind=INSERT : 115, 2025-11-27T19:10:44, 9696000bbbb18888
2025-11-30 02:29:45,959 INFO [.c.b.s.e.SnapshotSplitAssigner] [hz.main.generic-operation.thread-23] - Snapshot split assigner received all splits completed and the job parallelism is 1, snapshot split assigner is turn into completed status.
2025-11-30 02:29:45,965 INFO [.b.s.r.IncrementalSourceReader] [hz.main.generic-operation.thread-25] - subtask 0 add splits: incremental-split-0
2025-11-30 02:29:45,966 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - It's turn to read incremental split, close current snapshot fetcher.
2025-11-30 02:29:45,968 INFO [i.d.j.JdbcConnection ] [pool-20-thread-1] - Connection gracefully closed
2025-11-30 02:29:45,973 INFO [i.d.j.JdbcConnection ] [pool-21-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,170 INFO [i.d.j.JdbcConnection ] [pool-22-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,355 INFO [o.a.k.c.j.JsonConverterConfig ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2025-11-30 02:29:46,526 INFO [i.d.j.JdbcConnection ] [pool-23-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,528 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Stream fetcher is created.
2025-11-30 02:29:46,704 INFO [i.d.j.JdbcConnection ] [pool-25-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,757 INFO [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-8] - wait checkpoint completed: 1
2025-11-30 02:29:46,845 INFO [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-8] - pending checkpoint(1/1@1047459396566646785) notify finished!
2025-11-30 02:29:46,846 INFO [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-8] - start notify checkpoint completed, job id: 1047459396566646785, pipeline id: 1, checkpoint id:1
2025-11-30 02:29:46,901 INFO [i.d.j.JdbcConnection ] [pool-26-thread-1] - Connection gracefully closed
2025-11-30 02:29:46,959 INFO [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - user 'postgres' connected to database 'test' on PostgreSQL 10.15, compiled by Visual C++ build 1800, 64-bit with roles:
role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
2025-11-30 02:29:46,960 INFO [i.d.c.p.c.PostgresConnection ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{1/7727F058}, catalogXmin=1032754]
2025-11-30 02:29:46,961 INFO [PostgresSourceFetchTaskContext] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='postgres_cdc_source'db='test', lsn=LSN{1/7727F058}, txId=1032754, timestamp=-290308-12-21T19:59:05.224192Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2025-11-30 02:29:46,961 INFO [i.d.c.p.s.InitialSnapshotter ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Previous snapshot has completed successfully, streaming logical changes from last known position
2025-11-30 02:29:46,961 INFO [i.d.c.p.PostgresObjectUtils ] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating a new replication connection for io.debezium.connector.postgresql.PostgresTaskContext@7ad4287d
2025-11-30 02:29:47,142 INFO [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Initializing PgOutput logical decoder publication
2025-11-30 02:29:47,145 INFO [.PostgresReplicationConnection] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Creating replication slot with command CREATE_REPLICATION_SLOT "seatunnel" LOGICAL pgoutput
2025-11-30 02:29:47,146 ERROR [.s.c.s.r.f.SplitFetcherManager] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=1047459396566646785, pipelineId=1, taskGroupId=2}] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[seatunnel-transforms-v2.jar:2.3.12]
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81) [seatunnel-transforms-v2.jar:2.3.12]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: io.debezium.DebeziumException: Creation of replication slot failed
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:218) ~[?:?]
at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:98) ~[?:?]
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147) ~[?:?]
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54) ~[seatunnel-transforms-v2.jar:2.3.12]
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[seatunnel-transforms-v2.jar:2.3.12]
... 6 more
Caused by: org.postgresql.util.PSQLException: 错误: 复制槽名 "seatunnel" 已经存在
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2565) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2297) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:322) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:308) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:284) ~[postgresql-42.2.25.jar:42.2.25]
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:279) ~[postgresql-42.2.25.jar:42.2.25]
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:394) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:210) ~[?:?]
at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:98) ~[?:?]
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147) ~[?:?]
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54) ~[seatunnel-transforms-v2.jar:2.3.12]
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[seatunnel-transforms-v2.jar:2.3.12]
... 6 more