Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f
zap.Bool("retryable", isRetryable),
zap.Error(err))

s.sink = nil
if s.sink != nil {
s.sink.Close()
s.sink = nil
log.Info("close the ddl sink, rebuild it when trying again",
zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID))
}
if isRetryable {
s.reportWarning(err)
} else {
Expand All @@ -199,7 +204,7 @@ func (s *ddlSinkImpl) observedRetrySinkAction(ctx context.Context, name string,
defer ticker.Stop()
for {
select {
case err := <-errCh:
case err = <-errCh:
return err
case <-ticker.C:
log.Info("owner ddl sink performs an action too long",
Expand Down
8 changes: 8 additions & 0 deletions cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ func (k *kafkaDDLProducer) SyncBroadcastMessage(ctx context.Context, topic strin
case <-ctx.Done():
return ctx.Err()
default:
<<<<<<< HEAD
err := k.syncProducer.SendMessages(ctx, topic,
totalPartitionsNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
=======
return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
Comment on lines +72 to +78

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Since the SyncProducer interface in pkg/sink/kafka/factory.go expects key []byte, value []byte for SendMessages, choosing the incoming branch version (message *common.Message) will break the interface implementation. Please resolve the conflict by keeping the HEAD version.

		err := k.syncProducer.SendMessages(ctx, topic,
			totalPartitionsNum, message.Key, message.Value)
		return cerror.WrapError(cerror.ErrKafkaSendMessage, err)

}
}

Expand All @@ -89,9 +93,13 @@ func (k *kafkaDDLProducer) SyncSendMessage(ctx context.Context, topic string,
case <-ctx.Done():
return errors.Trace(ctx.Err())
default:
<<<<<<< HEAD
err := k.syncProducer.SendMessage(ctx, topic,
partitionNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
=======
return k.syncProducer.SendMessage(ctx, topic, partitionNum, message)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
Comment on lines +96 to +102

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Since SyncProducer.SendMessage expects key []byte, value []byte, please resolve the conflict by keeping the HEAD version.

		err := k.syncProducer.SendMessage(ctx, topic,
			partitionNum, message.Key, message.Value)
		return cerror.WrapError(cerror.ErrKafkaSendMessage, err)

}
}

Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func NewKafkaDDLSink(
}

ddlProducer := producerCreator(ctx, changefeedID, syncProducer)
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
Comment on lines +115 to +119

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Neither side of the conflict matches the signature of newDDLSink in mq_ddl_sink.go as-is: the call needs ctx as the first argument (as in HEAD) and a codec.RowEventEncoder, which is obtained via encoderBuilder.Build() (as in the incoming branch). Please resolve the conflict by combining the two sides:

	s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)

log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
return s, nil
}
52 changes: 30 additions & 22 deletions cdc/sink/ddlsink/mq/mq_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package mq
import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
Expand Down Expand Up @@ -65,8 +64,7 @@ type DDLSink struct {
// topicManager used to manage topics.
// It is also responsible for creating topics.
topicManager manager.TopicManager
// encoderBuilder builds encoder for the sink.
encoderBuilder codec.RowEventEncoderBuilder
encoder codec.RowEventEncoder
// producer used to send events to the MQ system.
// Usually it is a sync producer.
producer ddlproducer.DDLProducer
Expand All @@ -82,10 +80,11 @@ func newDDLSink(ctx context.Context,
adminClient kafka.ClusterAdminClient,
topicManager manager.TopicManager,
eventRouter *dispatcher.EventRouter,
encoderBuilder codec.RowEventEncoderBuilder,
encoder codec.RowEventEncoder,
protocol config.Protocol,
) *DDLSink {
return &DDLSink{
<<<<<<< HEAD
id: changefeedID,
protocol: protocol,
eventRouter: eventRouter,
Expand All @@ -94,15 +93,24 @@ func newDDLSink(ctx context.Context,
producer: producer,
statistics: metrics.NewStatistics(ctx, changefeedID, sink.RowSink),
admin: adminClient,
=======
id: changefeedID,
protocol: protocol,
eventRouter: eventRouter,
topicManager: topicManager,
encoder: encoder,
producer: producer,
statistics: metrics.NewStatistics(changefeedID, sink.RowSink),
admin: adminClient,
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
}
}

// WriteDDLEvent encodes the DDL event and sends it to the MQ system.
func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeDDLEvent(ddl)
msg, err := k.encoder.EncodeDDLEvent(ddl)
if err != nil {
return errors.Trace(err)
return err
}
if msg == nil {
log.Info("Skip ddl event", zap.Uint64("commitTs", ddl.CommitTs),
Expand All @@ -126,29 +134,30 @@ func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
// then the auto-created topic will not be created as configured by ticdc.
partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
return err
}

if partitionRule == PartitionAll {
err = k.statistics.RecordDDLExecution(func() error {
return k.statistics.RecordDDLExecution(func() error {
return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
})
return errors.Trace(err)
}
err = k.statistics.RecordDDLExecution(func() error {
return k.statistics.RecordDDLExecution(func() error {
return k.producer.SyncSendMessage(ctx, topic, 0, msg)
})
return errors.Trace(err)
}

// WriteCheckpointTs sends the checkpoint ts to the MQ system.
func (k *DDLSink) WriteCheckpointTs(ctx context.Context,
ts uint64, tables []*model.TableInfo,
) error {
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeCheckpointEvent(ts)
var (
err error
partitionNum int32
)
msg, err := k.encoder.EncodeCheckpointEvent(ts)
if err != nil {
return errors.Trace(err)
return err
}
if msg == nil {
return nil
Expand All @@ -158,28 +167,27 @@ func (k *DDLSink) WriteCheckpointTs(ctx context.Context,
// This will be compatible with the old behavior.
if len(tables) == 0 {
topic := k.eventRouter.GetDefaultTopic()
partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic)
partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
return err
}
log.Debug("Emit checkpointTs to default topic",
zap.String("topic", topic), zap.Uint64("checkpointTs", ts))
err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
return errors.Trace(err)
return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
}
var tableNames []model.TableName
for _, table := range tables {
tableNames = append(tableNames, table.TableName)
}
topics := k.eventRouter.GetActiveTopics(tableNames)
for _, topic := range topics {
partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic)
partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
return err
}
err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
if err != nil {
return errors.Trace(err)
return err
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ func NewPulsarDDLSink(
return nil, errors.Trace(err)
}

<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
Comment on lines +103 to +107

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Neither side of the conflict matches the signature of newDDLSink in mq_ddl_sink.go as-is: the call needs ctx as the first argument (as in HEAD) and a codec.RowEventEncoder, which is obtained via encoderBuilder.Build() (as in the incoming branch). Please resolve the conflict by combining the two sides:

	s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)


return s, nil
}
63 changes: 21 additions & 42 deletions pkg/sink/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type AsyncProducer interface {

type saramaSyncProducer struct {
id model.ChangeFeedID
client sarama.Client
producer sarama.SyncProducer
}

Expand All @@ -101,13 +100,17 @@ func (p *saramaSyncProducer) SendMessage(
Value: sarama.ByteEncoder(value),
Partition: partitionNum,
})
return err
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}

<<<<<<< HEAD
func (p *saramaSyncProducer) SendMessages(ctx context.Context,
topic string, partitionNum int32,
key []byte, value []byte,
) error {
=======
func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error {
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
Comment on lines +106 to +113

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Since the SyncProducer interface expects key []byte, value []byte for SendMessages, please keep the HEAD version of the signature to avoid compilation errors.

func (p *saramaSyncProducer) SendMessages(ctx context.Context,
	topic string, partitionNum int32,
	key []byte, value []byte,
) error {

msgs := make([]*sarama.ProducerMessage, partitionNum)
for i := 0; i < int(partitionNum); i++ {
msgs[i] = &sarama.ProducerMessage{
Expand All @@ -117,49 +120,25 @@ func (p *saramaSyncProducer) SendMessages(ctx context.Context,
Partition: int32(i),
}
}
return p.producer.SendMessages(msgs)
err := p.producer.SendMessages(msgs)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}

func (p *saramaSyncProducer) Close() {
go func() {
// We need to close it asynchronously. Otherwise, we might get stuck
// with an unhealthy(i.e. Network jitter, isolation) state of Kafka.
// Factory has a background thread to fetch and update the metadata.
// If we close the client synchronously, we might get stuck.
// Safety:
// * If the kafka cluster is running well, it will be closed as soon as possible.
// * If there is a problem with the kafka cluster,
// no data will be lost because this is a synchronous client.
// * There is a risk of goroutine leakage, but it is acceptable and our main
// goal is not to get stuck with the owner tick.
start := time.Now()
if err := p.client.Close(); err != nil {
log.Warn("Close Kafka DDL client with error",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
} else {
log.Info("Kafka DDL client closed",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)))
}
start = time.Now()
err := p.producer.Close()
if err != nil {
log.Error("Close Kafka DDL producer with error",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
} else {
log.Info("Kafka DDL producer closed",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)))
}
}()
start := time.Now()
err := p.producer.Close()
if err != nil {
log.Error("Close Kafka DDL producer with error",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
} else {
log.Info("Kafka DDL producer closed",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)))
}
}

type saramaAsyncProducer struct {
Expand Down
5 changes: 4 additions & 1 deletion pkg/sink/kafka/sarama_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,22 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error)
return nil, err
}
config.MetricRegistry = f.registry
<<<<<<< HEAD

client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}
p, err := newSaramaSyncProducerFromClientImpl(client)
=======
p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
Comment on lines +111 to +120

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. To resolve this correctly and avoid leaking the Kafka client, you should choose the incoming branch version (sarama.NewSyncProducer). Additionally, please make sure to remove the closeSaramaClientOnFailure call on line 122 (an unchanged context line just below this conflict, outside the commented range), as the client variable will no longer be defined.

	p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)

if err != nil {
closeSaramaClientOnFailure(f.changefeedID, client, "close sarama client after sync producer init failed")
return nil, errors.Trace(err)
}
return &saramaSyncProducer{
id: f.changefeedID,
client: client,
producer: p,
}, nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/sink/kafka/v2/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
Comment on lines 27 to +28

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Duplicate import of the same package github.com/pingcap/tiflow/pkg/errors. Please remove the redundant cerror alias and use errors consistently.

Suggested change
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"

"github.com/pingcap/tiflow/pkg/security"
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -277,12 +278,13 @@ func (s *syncWriter) SendMessage(
topic string, partitionNum int32,
key []byte, value []byte,
) error {
return s.w.WriteMessages(ctx, kafka.Message{
err := s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
Key: key,
Value: value,
})
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
Comment on lines +281 to +287

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Use the errors package directly instead of the redundant cerror alias.

Suggested change
err := s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
Key: key,
Value: value,
})
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
err := s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
Key: key,
Value: value,
})
return errors.WrapError(errors.ErrKafkaSendMessage, err)

}

// SendMessages produces a given set of messages, and returns only when all
Expand All @@ -303,19 +305,17 @@ func (s *syncWriter) SendMessages(
Partition: i,
}
}
return s.w.WriteMessages(ctx, msgs...)
err := s.w.WriteMessages(ctx, msgs...)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
Comment on lines +308 to +309

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Use the errors package directly instead of the redundant cerror alias.

Suggested change
err := s.w.WriteMessages(ctx, msgs...)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
err := s.w.WriteMessages(ctx, msgs...)
return errors.WrapError(errors.ErrKafkaSendMessage, err)

}

// Close shuts down the producer; you must call this function before a producer
// object passes out of scope, as it may otherwise leak memory.
// You must call this before calling Close on the underlying client.
func (s *syncWriter) Close() {
log.Info("kafka sync producer start closing",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID))
start := time.Now()
if err := s.w.Close(); err != nil {
log.Warn("Close kafka sync producer failed",
log.Warn("Close kafka sync producer meet error",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Duration("duration", time.Since(start)),
Expand Down
Loading