From e6203b1074992d808e736d8775da6834676b429b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 26 Mar 2025 15:34:11 +0800 Subject: [PATCH] fix ddl sink --- cdc/owner/ddl_sink.go | 9 ++- .../mq/ddlproducer/kafka_ddl_producer.go | 6 +- cdc/sink/ddlsink/mq/kafka_ddl_sink.go | 2 +- cdc/sink/ddlsink/mq/mq_ddl_sink.go | 57 ++++++++--------- cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 2 +- pkg/sink/kafka/factory.go | 61 ++++++------------- pkg/sink/kafka/sarama_factory.go | 9 +-- pkg/sink/kafka/v2/factory.go | 12 ++-- 8 files changed, 63 insertions(+), 95 deletions(-) diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index ecbe8c3444..4c5d9250ee 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -173,7 +173,12 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f zap.Bool("retryable", isRetryable), zap.Error(err)) - s.sink = nil + if s.sink != nil { + s.sink.Close() + s.sink = nil + log.Info("close the ddl sink, rebuild it when trying again", + zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID)) + } if isRetryable { s.reportWarning(err) } else { @@ -199,7 +204,7 @@ func (s *ddlSinkImpl) observedRetrySinkAction(ctx context.Context, name string, defer ticker.Stop() for { select { - case err := <-errCh: + case err = <-errCh: return err case <-ticker.C: log.Info("owner ddl sink performs an action too long", diff --git a/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go index 7a2d2b3847..d69bd2d6b7 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go @@ -69,8 +69,7 @@ func (k *kafkaDDLProducer) SyncBroadcastMessage(ctx context.Context, topic strin case <-ctx.Done(): return ctx.Err() default: - err := k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message) - return cerror.WrapError(cerror.ErrKafkaSendMessage, err) + return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message) } } @@ -88,8 +87,7 @@ func (k *kafkaDDLProducer) SyncSendMessage(ctx context.Context, topic string, case <-ctx.Done(): return errors.Trace(ctx.Err()) default: - err := k.syncProducer.SendMessage(ctx, topic, partitionNum, message) - return cerror.WrapError(cerror.ErrKafkaSendMessage, err) + return k.syncProducer.SendMessage(ctx, topic, partitionNum, message) } } diff --git a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go index d40defd1f2..81591d701c 100644 --- a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go @@ -112,7 +112,7 @@ func NewKafkaDDLSink( } ddlProducer := producerCreator(ctx, changefeedID, syncProducer) - s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol) + s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol) log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) return s, nil } diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink.go b/cdc/sink/ddlsink/mq/mq_ddl_sink.go index 0c42711811..b20288d802 100644 --- a/cdc/sink/ddlsink/mq/mq_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/mq_ddl_sink.go @@ -16,7 +16,6 @@ package mq import ( "context" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -65,8 +64,7 @@ type DDLSink struct { // topicManager used to manage topics. // It is also responsible for creating topics. topicManager manager.TopicManager - // encoderBuilder builds encoder for the sink. - encoderBuilder codec.RowEventEncoderBuilder + encoder codec.RowEventEncoder // producer used to send events to the MQ system. // Usually it is a sync producer. producer ddlproducer.DDLProducer @@ -82,27 +80,26 @@ func newDDLSink( adminClient kafka.ClusterAdminClient, topicManager manager.TopicManager, eventRouter *dispatcher.EventRouter, - encoderBuilder codec.RowEventEncoderBuilder, + encoder codec.RowEventEncoder, protocol config.Protocol, ) *DDLSink { return &DDLSink{ - id: changefeedID, - protocol: protocol, - eventRouter: eventRouter, - topicManager: topicManager, - encoderBuilder: encoderBuilder, - producer: producer, - statistics: metrics.NewStatistics(changefeedID, sink.RowSink), - admin: adminClient, + id: changefeedID, + protocol: protocol, + eventRouter: eventRouter, + topicManager: topicManager, + encoder: encoder, + producer: producer, + statistics: metrics.NewStatistics(changefeedID, sink.RowSink), + admin: adminClient, } } // WriteDDLEvent encodes the DDL event and sends it to the MQ system. func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - encoder := k.encoderBuilder.Build() - msg, err := encoder.EncodeDDLEvent(ddl) + msg, err := k.encoder.EncodeDDLEvent(ddl) if err != nil { - return errors.Trace(err) + return err } if msg == nil { log.Info("Skip ddl event", zap.Uint64("commitTs", ddl.CommitTs), @@ -126,29 +123,30 @@ func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // then the auto-created topic will not be created as configured by ticdc. partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic) if err != nil { - return errors.Trace(err) + return err } if partitionRule == PartitionAll { - err = k.statistics.RecordDDLExecution(func() error { + return k.statistics.RecordDDLExecution(func() error { return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) }) - return errors.Trace(err) } - err = k.statistics.RecordDDLExecution(func() error { + return k.statistics.RecordDDLExecution(func() error { return k.producer.SyncSendMessage(ctx, topic, 0, msg) }) - return errors.Trace(err) } // WriteCheckpointTs sends the checkpoint ts to the MQ system. func (k *DDLSink) WriteCheckpointTs(ctx context.Context, ts uint64, tables []*model.TableInfo, ) error { - encoder := k.encoderBuilder.Build() - msg, err := encoder.EncodeCheckpointEvent(ts) + var ( + err error + partitionNum int32 + ) + msg, err := k.encoder.EncodeCheckpointEvent(ts) if err != nil { - return errors.Trace(err) + return err } if msg == nil { return nil @@ -158,14 +156,13 @@ func (k *DDLSink) WriteCheckpointTs(ctx context.Context, // This will be compatible with the old behavior. if len(tables) == 0 { topic := k.eventRouter.GetDefaultTopic() - partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic) + partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic) if err != nil { - return errors.Trace(err) + return err } log.Debug("Emit checkpointTs to default topic", zap.String("topic", topic), zap.Uint64("checkpointTs", ts)) - err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) - return errors.Trace(err) + return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) } var tableNames []model.TableName for _, table := range tables { @@ -173,13 +170,13 @@ func (k *DDLSink) WriteCheckpointTs(ctx context.Context, } topics := k.eventRouter.GetActiveTopics(tableNames) for _, topic := range topics { - partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic) + partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic) if err != nil { - return errors.Trace(err) + return err } err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) if err != nil { - return errors.Trace(err) + return err } } return nil diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index 5587852b04..058ee65482 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -100,7 +100,7 @@ func NewPulsarDDLSink( return nil, errors.Trace(err) } - s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) + s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol) return s, nil } diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index a8a8f05e00..ddec891b0d 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -84,7 +84,6 @@ type AsyncProducer interface { type saramaSyncProducer struct { id model.ChangeFeedID - client sarama.Client producer sarama.SyncProducer } @@ -99,10 +98,10 @@ func (p *saramaSyncProducer) SendMessage( Value: sarama.ByteEncoder(message.Value), Partition: partitionNum, }) - return err + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } -func (p *saramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error { +func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error { msgs := make([]*sarama.ProducerMessage, partitionNum) for i := 0; i < int(partitionNum); i++ { msgs[i] = &sarama.ProducerMessage{ @@ -112,49 +111,25 @@ func (p *saramaSyncProducer) SendMessages(ctx context.Context, topic string, par Partition: int32(i), } } - return p.producer.SendMessages(msgs) + err := p.producer.SendMessages(msgs) + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } func (p *saramaSyncProducer) Close() { - go func() { - // We need to close it asynchronously. Otherwise, we might get stuck - // with an unhealthy(i.e. Network jitter, isolation) state of Kafka. - // Factory has a background thread to fetch and update the metadata. - // If we close the client synchronously, we might get stuck. - // Safety: - // * If the kafka cluster is running well, it will be closed as soon as possible. - // * If there is a problem with the kafka cluster, - // no data will be lost because this is a synchronous client. - // * There is a risk of goroutine leakage, but it is acceptable and our main - // goal is not to get stuck with the owner tick. - start := time.Now() - if err := p.client.Close(); err != nil { - log.Warn("Close Kafka DDL client with error", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start)), - zap.Error(err)) - } else { - log.Info("Kafka DDL client closed", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start))) - } - start = time.Now() - err := p.producer.Close() - if err != nil { - log.Error("Close Kafka DDL producer with error", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start)), - zap.Error(err)) - } else { - log.Info("Kafka DDL producer closed", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start))) - } - }() + start := time.Now() + err := p.producer.Close() + if err != nil { + log.Error("Close Kafka DDL producer with error", + zap.String("namespace", p.id.Namespace), + zap.String("changefeed", p.id.ID), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + } else { + log.Info("Kafka DDL producer closed", + zap.String("namespace", p.id.Namespace), + zap.String("changefeed", p.id.ID), + zap.Duration("duration", time.Since(start))) + } } type saramaAsyncProducer struct { diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index 1d569a1456..9c4c06bc7e 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -90,19 +90,12 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error) return nil, err } config.MetricRegistry = f.registry - - client, err := sarama.NewClient(f.option.BrokerEndpoints, config) - if err != nil { - return nil, errors.Trace(err) - } - - p, err := sarama.NewSyncProducerFromClient(client) + p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config) if err != nil { return nil, errors.Trace(err) } return &saramaSyncProducer{ id: f.changefeedID, - client: client, producer: p, }, nil } diff --git a/pkg/sink/kafka/v2/factory.go b/pkg/sink/kafka/v2/factory.go index 7d0346f746..a4b346b210 100644 --- a/pkg/sink/kafka/v2/factory.go +++ b/pkg/sink/kafka/v2/factory.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/errors" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/sink/codec/common" pkafka "github.com/pingcap/tiflow/pkg/sink/kafka" @@ -278,12 +279,13 @@ func (s *syncWriter) SendMessage( topic string, partitionNum int32, message *common.Message, ) error { - return s.w.WriteMessages(ctx, kafka.Message{ + err := s.w.WriteMessages(ctx, kafka.Message{ Topic: topic, Partition: int(partitionNum), Key: message.Key, Value: message.Value, }) + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } // SendMessages produces a given set of messages, and returns only when all @@ -300,19 +302,17 @@ func (s *syncWriter) SendMessages(ctx context.Context, topic string, partitionNu Partition: i, } } - return s.w.WriteMessages(ctx, msgs...) + err := s.w.WriteMessages(ctx, msgs...) + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } // Close shuts down the producer; you must call this function before a producer // object passes out of scope, as it may otherwise leak memory. // You must call this before calling Close on the underlying client. func (s *syncWriter) Close() { - log.Info("kafka sync producer start closing", - zap.String("namespace", s.changefeedID.Namespace), - zap.String("changefeed", s.changefeedID.ID)) start := time.Now() if err := s.w.Close(); err != nil { - log.Warn("Close kafka sync producer failed", + log.Warn("Close kafka sync producer meet error", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID), zap.Duration("duration", time.Since(start)),