Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f
zap.Bool("retryable", isRetryable),
zap.Error(err))

s.sink = nil
if s.sink != nil {
s.sink.Close()
s.sink = nil
log.Info("close the ddl sink, rebuild it when trying again",
zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID))
}
if isRetryable {
s.reportWarning(err)
} else {
Expand All @@ -199,7 +204,7 @@ func (s *ddlSinkImpl) observedRetrySinkAction(ctx context.Context, name string,
defer ticker.Stop()
for {
select {
case err := <-errCh:
case err = <-errCh:
return err
case <-ticker.C:
log.Info("owner ddl sink performs an action too long",
Expand Down
6 changes: 2 additions & 4 deletions cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func (k *kafkaDDLProducer) SyncBroadcastMessage(ctx context.Context, topic strin
case <-ctx.Done():
return ctx.Err()
default:
err := k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message)
}
}

Expand All @@ -88,8 +87,7 @@ func (k *kafkaDDLProducer) SyncSendMessage(ctx context.Context, topic string,
case <-ctx.Done():
return errors.Trace(ctx.Err())
default:
err := k.syncProducer.SendMessage(ctx, topic, partitionNum, message)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
return k.syncProducer.SendMessage(ctx, topic, partitionNum, message)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewKafkaDDLSink(
}

ddlProducer := producerCreator(ctx, changefeedID, syncProducer)
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
return s, nil
}
57 changes: 27 additions & 30 deletions cdc/sink/ddlsink/mq/mq_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package mq
import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
Expand Down Expand Up @@ -65,8 +64,7 @@ type DDLSink struct {
// topicManager used to manage topics.
// It is also responsible for creating topics.
topicManager manager.TopicManager
// encoderBuilder builds encoder for the sink.
encoderBuilder codec.RowEventEncoderBuilder
encoder codec.RowEventEncoder
// producer used to send events to the MQ system.
// Usually it is a sync producer.
producer ddlproducer.DDLProducer
Expand All @@ -82,27 +80,26 @@ func newDDLSink(
adminClient kafka.ClusterAdminClient,
topicManager manager.TopicManager,
eventRouter *dispatcher.EventRouter,
encoderBuilder codec.RowEventEncoderBuilder,
encoder codec.RowEventEncoder,
protocol config.Protocol,
) *DDLSink {
return &DDLSink{
id: changefeedID,
protocol: protocol,
eventRouter: eventRouter,
topicManager: topicManager,
encoderBuilder: encoderBuilder,
producer: producer,
statistics: metrics.NewStatistics(changefeedID, sink.RowSink),
admin: adminClient,
id: changefeedID,
protocol: protocol,
eventRouter: eventRouter,
topicManager: topicManager,
encoder: encoder,
producer: producer,
statistics: metrics.NewStatistics(changefeedID, sink.RowSink),
admin: adminClient,
}
}

// WriteDDLEvent encodes the DDL event and sends it to the MQ system.
func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeDDLEvent(ddl)
msg, err := k.encoder.EncodeDDLEvent(ddl)
if err != nil {
return errors.Trace(err)
return err
}
if msg == nil {
log.Info("Skip ddl event", zap.Uint64("commitTs", ddl.CommitTs),
Expand All @@ -126,29 +123,30 @@ func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
// then the auto-created topic will not be created as configured by ticdc.
partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
return err
}

if partitionRule == PartitionAll {
err = k.statistics.RecordDDLExecution(func() error {
return k.statistics.RecordDDLExecution(func() error {
return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
})
return errors.Trace(err)
}
err = k.statistics.RecordDDLExecution(func() error {
return k.statistics.RecordDDLExecution(func() error {
return k.producer.SyncSendMessage(ctx, topic, 0, msg)
})
return errors.Trace(err)
}

// WriteCheckpointTs sends the checkpoint ts to the MQ system.
func (k *DDLSink) WriteCheckpointTs(ctx context.Context,
ts uint64, tables []*model.TableInfo,
) error {
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeCheckpointEvent(ts)
var (
err error
partitionNum int32
)
msg, err := k.encoder.EncodeCheckpointEvent(ts)
if err != nil {
return errors.Trace(err)
return err
}
if msg == nil {
return nil
Expand All @@ -158,28 +156,27 @@ func (k *DDLSink) WriteCheckpointTs(ctx context.Context,
// This will be compatible with the old behavior.
if len(tables) == 0 {
topic := k.eventRouter.GetDefaultTopic()
partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic)
partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
return err
}
log.Debug("Emit checkpointTs to default topic",
zap.String("topic", topic), zap.Uint64("checkpointTs", ts))
err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
return errors.Trace(err)
return k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
}
var tableNames []model.TableName
for _, table := range tables {
tableNames = append(tableNames, table.TableName)
}
topics := k.eventRouter.GetActiveTopics(tableNames)
for _, topic := range topics {
partitionNum, err := k.topicManager.GetPartitionNum(ctx, topic)
partitionNum, err = k.topicManager.GetPartitionNum(ctx, topic)
if err != nil {
return errors.Trace(err)
return err
}
err = k.producer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
if err != nil {
return errors.Trace(err)
return err
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewPulsarDDLSink(
return nil, errors.Trace(err)
}

s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)

return s, nil
}
61 changes: 18 additions & 43 deletions pkg/sink/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ type AsyncProducer interface {

type saramaSyncProducer struct {
id model.ChangeFeedID
client sarama.Client
producer sarama.SyncProducer
}

Expand All @@ -99,10 +98,10 @@ func (p *saramaSyncProducer) SendMessage(
Value: sarama.ByteEncoder(message.Value),
Partition: partitionNum,
})
return err
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}

func (p *saramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error {
func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error {
msgs := make([]*sarama.ProducerMessage, partitionNum)
for i := 0; i < int(partitionNum); i++ {
msgs[i] = &sarama.ProducerMessage{
Expand All @@ -112,49 +111,25 @@ func (p *saramaSyncProducer) SendMessages(ctx context.Context, topic string, par
Partition: int32(i),
}
}
return p.producer.SendMessages(msgs)
err := p.producer.SendMessages(msgs)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}

func (p *saramaSyncProducer) Close() {
go func() {
// We need to close it asynchronously. Otherwise, we might get stuck
// with an unhealthy(i.e. Network jitter, isolation) state of Kafka.
// Factory has a background thread to fetch and update the metadata.
// If we close the client synchronously, we might get stuck.
// Safety:
// * If the kafka cluster is running well, it will be closed as soon as possible.
// * If there is a problem with the kafka cluster,
// no data will be lost because this is a synchronous client.
// * There is a risk of goroutine leakage, but it is acceptable and our main
// goal is not to get stuck with the owner tick.
start := time.Now()
if err := p.client.Close(); err != nil {
Comment thread
3AceShowHand marked this conversation as resolved.
log.Warn("Close Kafka DDL client with error",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
} else {
log.Info("Kafka DDL client closed",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)))
}
start = time.Now()
err := p.producer.Close()
if err != nil {
log.Error("Close Kafka DDL producer with error",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
} else {
log.Info("Kafka DDL producer closed",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)))
}
}()
start := time.Now()
err := p.producer.Close()
if err != nil {
log.Error("Close Kafka DDL producer with error",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
} else {
log.Info("Kafka DDL producer closed",
zap.String("namespace", p.id.Namespace),
zap.String("changefeed", p.id.ID),
zap.Duration("duration", time.Since(start)))
}
}

type saramaAsyncProducer struct {
Expand Down
9 changes: 1 addition & 8 deletions pkg/sink/kafka/sarama_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,12 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error)
return nil, err
}
config.MetricRegistry = f.registry

client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}

p, err := sarama.NewSyncProducerFromClient(client)
p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}
return &saramaSyncProducer{
id: f.changefeedID,
client: client,
producer: p,
}, nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/sink/kafka/v2/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
Expand Down Expand Up @@ -278,12 +279,13 @@ func (s *syncWriter) SendMessage(
topic string, partitionNum int32,
message *common.Message,
) error {
return s.w.WriteMessages(ctx, kafka.Message{
err := s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
Key: message.Key,
Value: message.Value,
})
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}

// SendMessages produces a given set of messages, and returns only when all
Expand All @@ -300,19 +302,17 @@ func (s *syncWriter) SendMessages(ctx context.Context, topic string, partitionNu
Partition: i,
}
}
return s.w.WriteMessages(ctx, msgs...)
err := s.w.WriteMessages(ctx, msgs...)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
}

// Close shuts down the producer; you must call this function before a producer
// object passes out of scope, as it may otherwise leak memory.
// You must call this before calling Close on the underlying client.
func (s *syncWriter) Close() {
log.Info("kafka sync producer start closing",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID))
start := time.Now()
if err := s.w.Close(); err != nil {
log.Warn("Close kafka sync producer failed",
log.Warn("Close kafka sync producer meet error",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Duration("duration", time.Since(start)),
Expand Down