diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 4de195ac0b..b544042003 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -173,7 +173,12 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f zap.Bool("retryable", isRetryable), zap.Error(err)) - s.sink = nil + if s.sink != nil { + s.sink.Close() + s.sink = nil + log.Info("close the ddl sink, rebuild it when trying again", + zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID)) + } if isRetryable { s.reportWarning(err) } else { @@ -199,7 +204,7 @@ func (s *ddlSinkImpl) observedRetrySinkAction(ctx context.Context, name string, defer ticker.Stop() for { select { - case err := <-errCh: + case err = <-errCh: return err case <-ticker.C: log.Info("owner ddl sink performs an action too long", diff --git a/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go index f3abfd54b2..559ac8f08f 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go @@ -69,9 +69,13 @@ func (k *kafkaDDLProducer) SyncBroadcastMessage(ctx context.Context, topic strin case <-ctx.Done(): return ctx.Err() default: +<<<<<<< HEAD err := k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message.Key, message.Value) return cerror.WrapError(cerror.ErrKafkaSendMessage, err) +======= + return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message) +>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) } } @@ -89,9 +93,13 @@ func (k *kafkaDDLProducer) SyncSendMessage(ctx context.Context, topic string, case <-ctx.Done(): return errors.Trace(ctx.Err()) default: +<<<<<<< HEAD err := k.syncProducer.SendMessage(ctx, topic, partitionNum, message.Key, message.Value) return cerror.WrapError(cerror.ErrKafkaSendMessage, err) +======= + return k.syncProducer.SendMessage(ctx, topic, partitionNum, message) +>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) } } diff --git a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go index f77571db39..f822d24571 100644 --- a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go @@ -112,7 +112,11 @@ func NewKafkaDDLSink( } ddlProducer := producerCreator(ctx, changefeedID, syncProducer) +<<<<<<< HEAD s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol) +======= + s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol) +>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) return s, nil } diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink.go b/cdc/sink/ddlsink/mq/mq_ddl_sink.go index d8167212b8..60c7d160db 100644 --- a/cdc/sink/ddlsink/mq/mq_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/mq_ddl_sink.go @@ -16,7 +16,6 @@ package mq import ( "context" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -65,8 +64,7 @@ type DDLSink struct { // topicManager used to manage topics. // It is also responsible for creating topics. topicManager manager.TopicManager - // encoderBuilder builds encoder for the sink. - encoderBuilder codec.RowEventEncoderBuilder + encoder codec.RowEventEncoder // producer used to send events to the MQ system. // Usually it is a sync producer. producer ddlproducer.DDLProducer @@ -82,10 +80,11 @@ func newDDLSink(ctx context.Context, adminClient kafka.ClusterAdminClient, topicManager manager.TopicManager, eventRouter *dispatcher.EventRouter, - encoderBuilder codec.RowEventEncoderBuilder, + encoder codec.RowEventEncoder, protocol config.Protocol, ) *DDLSink { return &DDLSink{ +<<<<<<< HEAD id: changefeedID, protocol: protocol, eventRouter: eventRouter, @@ -94,15 +93,24 @@ func newDDLSink(ctx context.Context, producer: producer, statistics: metrics.NewStatistics(ctx, changefeedID, sink.RowSink), admin: adminClient, +======= + id: changefeedID, + protocol: protocol, + eventRouter: eventRouter, + topicManager: topicManager, + encoder: encoder, + producer: producer, + statistics: metrics.NewStatistics(changefeedID, sink.RowSink), + admin: adminClient, +>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) } } // WriteDDLEvent encodes the DDL event and sends it to the MQ system. func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - encoder := k.encoderBuilder.Build() - msg, err := encoder.EncodeDDLEvent(ddl) + msg, err := k.encoder.EncodeDDLEvent(ddl) if err != nil { - return errors.Trace(err) + return err } if msg == nil { log.Info("Skip ddl event", zap.Uint64("commitTs", ddl.CommitTs), @@ -126,29 +134,30 @@ func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // then the auto-created topic will not be created as configured by ticdc. partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic) if err != nil { - return errors.Trace(err) + return err } if partitionRule == PartitionAll { - err = k.statistics.RecordDDLExecution(func() error { + return k.statistics.RecordDDLExecution(func() error { return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) }) - return errors.Trace(err) } - err = k.statistics.RecordDDLExecution(func() error { + return k.statistics.RecordDDLExecution(func() error { return k.producer.SyncSendMessage(ctx, topic, 0, msg) }) - return errors.Trace(err) } // WriteCheckpointTs sends the checkpoint ts to the MQ system. func (k *DDLSink) WriteCheckpointTs(ctx context.Context, ts uint64, tables []*model.TableInfo, ) error { - encoder := k.encoderBuilder.Build() - msg, err := encoder.EncodeCheckpointEvent(ts) + var ( + err error + partitionNum int32 + ) + msg, err := k.encoder.EncodeCheckpointEvent(ts) if err != nil { - return errors.Trace(err) + return err } if msg == nil { return nil @@ -158,14 +167,13 @@ func (k *DDLSink) WriteCheckpointTs(ctx context.Context, // This will be compatible with the old behavior. if len(tables) == 0 { topic := k.eventRouter.GetDefaultTopic() - partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic) + partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic) if err != nil { - return errors.Trace(err) + return err } log.Debug("Emit checkpointTs to default topic", zap.String("topic", topic), zap.Uint64("checkpointTs", ts)) - err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) - return errors.Trace(err) + return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) } var tableNames []model.TableName for _, table := range tables { @@ -173,13 +181,13 @@ func (k *DDLSink) WriteCheckpointTs(ctx context.Context, } topics := k.eventRouter.GetActiveTopics(tableNames) for _, topic := range topics { - partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic) + partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic) if err != nil { - return errors.Trace(err) + return err } err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg) if err != nil { - return errors.Trace(err) + return err } } return nil diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index 4cebead0ac..803a67cd5d 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -100,7 +100,11 @@ func NewPulsarDDLSink( return nil, errors.Trace(err) } +<<<<<<< HEAD s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) +======= + s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol) +>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) return s, nil } diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index 22a3f6d9c5..1041a8ff98 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -86,7 +86,6 @@ type AsyncProducer interface { type saramaSyncProducer struct { id model.ChangeFeedID - client sarama.Client producer sarama.SyncProducer } @@ -101,13 +100,17 @@ func (p *saramaSyncProducer) SendMessage( Value: sarama.ByteEncoder(value), Partition: partitionNum, }) - return err + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } +<<<<<<< HEAD func (p *saramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, key []byte, value []byte, ) error { +======= +func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error { +>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) msgs := make([]*sarama.ProducerMessage, partitionNum) for i := 0; i < int(partitionNum); i++ { msgs[i] = &sarama.ProducerMessage{ @@ -117,49 +120,25 @@ func (p *saramaSyncProducer) SendMessages(ctx context.Context, Partition: int32(i), } } - return p.producer.SendMessages(msgs) + err := p.producer.SendMessages(msgs) + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } func (p *saramaSyncProducer) Close() { - go func() { - // We need to close it asynchronously. Otherwise, we might get stuck - // with an unhealthy(i.e. Network jitter, isolation) state of Kafka. - // Factory has a background thread to fetch and update the metadata. - // If we close the client synchronously, we might get stuck. - // Safety: - // * If the kafka cluster is running well, it will be closed as soon as possible. - // * If there is a problem with the kafka cluster, - // no data will be lost because this is a synchronous client. - // * There is a risk of goroutine leakage, but it is acceptable and our main - // goal is not to get stuck with the owner tick. - start := time.Now() - if err := p.client.Close(); err != nil { - log.Warn("Close Kafka DDL client with error", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start)), - zap.Error(err)) - } else { - log.Info("Kafka DDL client closed", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start))) - } - start = time.Now() - err := p.producer.Close() - if err != nil { - log.Error("Close Kafka DDL producer with error", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start)), - zap.Error(err)) - } else { - log.Info("Kafka DDL producer closed", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Duration("duration", time.Since(start))) - } - }() + start := time.Now() + err := p.producer.Close() + if err != nil { + log.Error("Close Kafka DDL producer with error", + zap.String("namespace", p.id.Namespace), + zap.String("changefeed", p.id.ID), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + } else { + log.Info("Kafka DDL producer closed", + zap.String("namespace", p.id.Namespace), + zap.String("changefeed", p.id.ID), + zap.Duration("duration", time.Since(start))) + } } type saramaAsyncProducer struct { diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index 7c74e92cc0..19c5f5306b 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -108,19 +108,22 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error) return nil, err } config.MetricRegistry = f.registry +<<<<<<< HEAD client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config) if err != nil { return nil, errors.Trace(err) } p, err := newSaramaSyncProducerFromClientImpl(client) +======= + p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config) +>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) if err != nil { closeSaramaClientOnFailure(f.changefeedID, client, "close sarama client after sync producer init failed") return nil, errors.Trace(err) } return &saramaSyncProducer{ id: f.changefeedID, - client: client, producer: p, }, nil } diff --git a/pkg/sink/kafka/v2/factory.go b/pkg/sink/kafka/v2/factory.go index 870ec48cf7..d6377ccdd4 100644 --- a/pkg/sink/kafka/v2/factory.go +++ b/pkg/sink/kafka/v2/factory.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/errors" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/security" pkafka "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/pingcap/tiflow/pkg/util" @@ -277,12 +278,13 @@ func (s *syncWriter) SendMessage( topic string, partitionNum int32, key []byte, value []byte, ) error { - return s.w.WriteMessages(ctx, kafka.Message{ + err := s.w.WriteMessages(ctx, kafka.Message{ Topic: topic, Partition: int(partitionNum), Key: key, Value: value, }) + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } // SendMessages produces a given set of messages, and returns only when all @@ -303,19 +305,17 @@ func (s *syncWriter) SendMessages( Partition: i, } } - return s.w.WriteMessages(ctx, msgs...) + err := s.w.WriteMessages(ctx, msgs...) + return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } // Close shuts down the producer; you must call this function before a producer // object passes out of scope, as it may otherwise leak memory. // You must call this before calling Close on the underlying client. func (s *syncWriter) Close() { - log.Info("kafka sync producer start closing", - zap.String("namespace", s.changefeedID.Namespace), - zap.String("changefeed", s.changefeedID.ID)) start := time.Now() if err := s.w.Close(); err != nil { - log.Warn("Close kafka sync producer failed", + log.Warn("Close kafka sync producer meet error", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID), zap.Duration("duration", time.Since(start)),