From bae8a59192162d51be5cc48836b4f5bcc9a522f9 Mon Sep 17 00:00:00 2001 From: gakki-Zone Date: Wed, 22 Apr 2026 21:59:47 +0800 Subject: [PATCH 1/2] fix: dedupe delayed dingtalk bot callbacks --- backend/pkg/bot/dingtalk/stream.go | 112 +++++++++++++++++--- backend/pkg/bot/dingtalk/stream_test.go | 133 ++++++++++++++++++++++++ 2 files changed, 228 insertions(+), 17 deletions(-) create mode 100644 backend/pkg/bot/dingtalk/stream_test.go diff --git a/backend/pkg/bot/dingtalk/stream.go b/backend/pkg/bot/dingtalk/stream.go index 47ec8a7b9..b71ce901b 100644 --- a/backend/pkg/bot/dingtalk/stream.go +++ b/backend/pkg/bot/dingtalk/stream.go @@ -39,7 +39,12 @@ type DingTalkClient struct { accessToken string expireAt time.Time } - tokenMutex sync.RWMutex + tokenMutex sync.RWMutex + messageMu sync.Mutex + messageSeenAt map[string]time.Time + messageTTL time.Duration + nowFunc func() time.Time + processMessageFn func(ctx context.Context, data *chatbot.BotCallbackDataModel) error } func NewDingTalkClient(ctx context.Context, cancel context.CancelFunc, clientId, clientSecret, templateID string, logger *log.Logger, getQA bot.GetQAFun) (*DingTalkClient, error) { @@ -54,17 +59,22 @@ func NewDingTalkClient(ctx context.Context, cancel context.CancelFunc, clientId, if err != nil { return nil, fmt.Errorf("failed to create card client: %w", err) } - return &DingTalkClient{ - ctx: ctx, - cancel: cancel, - clientID: clientId, - clientSecret: clientSecret, - templateID: templateID, - oauthClient: oauthClient, - cardClient: cardClient, - getQA: getQA, - logger: logger, - }, nil + client := &DingTalkClient{ + ctx: ctx, + cancel: cancel, + clientID: clientId, + clientSecret: clientSecret, + templateID: templateID, + oauthClient: oauthClient, + cardClient: cardClient, + getQA: getQA, + logger: logger, + messageSeenAt: make(map[string]time.Time), + messageTTL: 5 * time.Minute, + nowFunc: time.Now, + } + client.startMessageCleanup() + return client, nil } func (c *DingTalkClient) GetAccessToken() (string, error) { @@ -191,6 +201,53 @@ func (c *DingTalkClient) CreateAndDeliverCard(ctx context.Context, trackID strin return nil } +func (c *DingTalkClient) startMessageCleanup() { + go func() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.cleanupExpiredMessages() + } + } + }() +} + +func (c *DingTalkClient) cleanupExpiredMessages() { + now := c.nowFunc() + + c.messageMu.Lock() + defer c.messageMu.Unlock() + + for msgID, seenAt := range c.messageSeenAt { + if now.Sub(seenAt) > c.messageTTL { + delete(c.messageSeenAt, msgID) + } + } +} + +func (c *DingTalkClient) tryMarkMessage(msgID string) bool { + if strings.TrimSpace(msgID) == "" { + return true + } + + now := c.nowFunc() + + c.messageMu.Lock() + defer c.messageMu.Unlock() + + if seenAt, ok := c.messageSeenAt[msgID]; ok && now.Sub(seenAt) <= c.messageTTL { + return false + } + + c.messageSeenAt[msgID] = now + return true +} + func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { select { case <-c.ctx.Done(): @@ -199,6 +256,27 @@ func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *cha default: } + if !c.tryMarkMessage(data.MsgId) { + c.logger.Info("ignore duplicate dingtalk message", log.String("msg_id", data.MsgId)) + return []byte(""), nil + } + + processor := c.processMessageFn + if processor == nil { + processor = c.processMessage + } + + payload := *data + go func() { + if err := processor(c.ctx, &payload); err != nil { + c.logger.Error("process dingtalk message failed", log.String("msg_id", payload.MsgId), log.Error(err)) + } + }() + + return []byte(""), nil +} + +func (c *DingTalkClient) processMessage(ctx context.Context, data *chatbot.BotCallbackDataModel) error { question := data.Text.Content question = strings.TrimSpace(question) trackID := uuid.New().String() @@ -207,14 +285,14 @@ func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *cha // create and deliver card if err := c.CreateAndDeliverCard(ctx, trackID, data); err != nil { c.logger.Error("CreateAndDeliverCard", log.Error(err)) - return nil, err + return err } initialContent := fmt.Sprintf("**%s**\n\n%s", question, "稍等,让我想一想……") if err := c.UpdateAIStreamCard(trackID, initialContent, false); err != nil { c.logger.Error("UpdateInteractiveCard", log.Error(err)) - return nil, nil + return nil } // 初始化 默认为空 convInfo := &domain.ConversationInfo{ @@ -245,7 +323,7 @@ func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *cha if err := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); err != nil { c.logger.Error("UpdateInteractiveCard in contentCh failed", log.Error(err)) } - return nil, nil + return nil } updateTicker := time.NewTicker(1500 * time.Millisecond) @@ -263,7 +341,7 @@ func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *cha c.logger.Error("UpdateInteractiveCard in contentCh failed", log.Error(err)) } } - return []byte(""), nil + return nil } fullContent += content case <-updateTicker.C: @@ -275,7 +353,7 @@ func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *cha if err := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); err != nil { c.logger.Error("UpdateInteractiveCard in ticker failed", log.Error(err)) } - return []byte(""), nil + return nil } } } diff --git a/backend/pkg/bot/dingtalk/stream_test.go b/backend/pkg/bot/dingtalk/stream_test.go new file mode 100644 index 000000000..a645f6bb6 --- /dev/null +++ b/backend/pkg/bot/dingtalk/stream_test.go @@ -0,0 +1,133 @@ +package dingtalk + +import ( + "context" + "io" + "log/slog" + "testing" + "time" + + "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pwlog "github.com/chaitin/panda-wiki/log" +) + +func newTestLogger() *pwlog.Logger { + return &pwlog.Logger{ + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } +} + +func newTestDingTalkClient(t *testing.T) *DingTalkClient { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + client, err := NewDingTalkClient( + ctx, + cancel, + "client-id", + "client-secret", + "template-id", + newTestLogger(), + nil, + ) + require.NoError(t, err) + + client.messageTTL = time.Minute + + return client +} + +func TestTryMarkMessageDeduplicatesWithinTTL(t *testing.T) { + client := newTestDingTalkClient(t) + + now := time.Now() + client.nowFunc = func() time.Time { + return now + } + + require.True(t, client.tryMarkMessage("msg-1")) + require.False(t, client.tryMarkMessage("msg-1")) + + now = now.Add(client.messageTTL + time.Second) + + require.True(t, client.tryMarkMessage("msg-1")) +} + +func TestOnChatBotMessageReceivedIgnoresDuplicateMsgID(t *testing.T) { + client := newTestDingTalkClient(t) + + processed := make(chan struct{}, 2) + client.processMessageFn = func(context.Context, *chatbot.BotCallbackDataModel) error { + processed <- struct{}{} + return nil + } + + data := &chatbot.BotCallbackDataModel{ + MsgId: "msg-1", + Text: chatbot.BotCallbackDataTextModel{ + Content: "hello", + }, + } + + resp, err := client.OnChatBotMessageReceived(context.Background(), data) + require.NoError(t, err) + assert.Equal(t, []byte(""), resp) + + resp, err = client.OnChatBotMessageReceived(context.Background(), data) + require.NoError(t, err) + assert.Equal(t, []byte(""), resp) + + select { + case <-processed: + case <-time.After(time.Second): + t.Fatal("expected first message to be processed") + } + + select { + case <-processed: + t.Fatal("expected duplicate message to be ignored") + case <-time.After(100 * time.Millisecond): + } +} + +func TestOnChatBotMessageReceivedReturnsBeforeProcessingCompletes(t *testing.T) { + client := newTestDingTalkClient(t) + + started := make(chan struct{}) + unblock := make(chan struct{}) + client.processMessageFn = func(context.Context, *chatbot.BotCallbackDataModel) error { + close(started) + <-unblock + return nil + } + + done := make(chan struct{}) + go func() { + _, _ = client.OnChatBotMessageReceived(context.Background(), &chatbot.BotCallbackDataModel{ + MsgId: "msg-2", + Text: chatbot.BotCallbackDataTextModel{ + Content: "slow question", + }, + }) + close(done) + }() + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatal("expected callback to return before background processing finishes") + } + + select { + case <-started: + case <-time.After(time.Second): + t.Fatal("expected background processing to start") + } + + close(unblock) +} From 0c41bb9d3749380b0c1b64533ff0225ebfe4f95f Mon Sep 17 00:00:00 2001 From: gakki-Zone Date: Wed, 22 Apr 2026 22:21:00 +0800 Subject: [PATCH 2/2] fix: harden dingtalk callback dedupe flow --- backend/pkg/bot/dingtalk/stream.go | 60 +++++++++++++++---- backend/pkg/bot/dingtalk/stream_test.go | 78 +++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 11 deletions(-) diff --git a/backend/pkg/bot/dingtalk/stream.go b/backend/pkg/bot/dingtalk/stream.go index b71ce901b..dfe6d703f 100644 --- a/backend/pkg/bot/dingtalk/stream.go +++ b/backend/pkg/bot/dingtalk/stream.go @@ -248,6 +248,28 @@ func (c *DingTalkClient) tryMarkMessage(msgID string) bool { return true } +func (c *DingTalkClient) markMessageCompleted(msgID string) { + if strings.TrimSpace(msgID) == "" { + return + } + + c.messageMu.Lock() + defer c.messageMu.Unlock() + + c.messageSeenAt[msgID] = c.nowFunc() +} + +func (c *DingTalkClient) clearMessageMark(msgID string) { + if strings.TrimSpace(msgID) == "" { + return + } + + c.messageMu.Lock() + defer c.messageMu.Unlock() + + delete(c.messageSeenAt, msgID) +} + func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { select { case <-c.ctx.Done(): @@ -267,13 +289,26 @@ func (c *DingTalkClient) OnChatBotMessageReceived(ctx context.Context, data *cha } payload := *data - go func() { - if err := processor(c.ctx, &payload); err != nil { - c.logger.Error("process dingtalk message failed", log.String("msg_id", payload.MsgId), log.Error(err)) + go c.processMessageAsync(c.ctx, &payload, processor) + + return []byte(""), nil +} + +func (c *DingTalkClient) processMessageAsync(ctx context.Context, data *chatbot.BotCallbackDataModel, processor func(context.Context, *chatbot.BotCallbackDataModel) error) { + defer func() { + if r := recover(); r != nil { + c.clearMessageMark(data.MsgId) + c.logger.Error("process dingtalk message panicked", log.String("msg_id", data.MsgId), log.Any("panic", r)) } }() - return []byte(""), nil + if err := processor(ctx, data); err != nil { + c.clearMessageMark(data.MsgId) + c.logger.Error("process dingtalk message failed", log.String("msg_id", data.MsgId), log.Error(err)) + return + } + + c.markMessageCompleted(data.MsgId) } func (c *DingTalkClient) processMessage(ctx context.Context, data *chatbot.BotCallbackDataModel) error { @@ -292,7 +327,7 @@ func (c *DingTalkClient) processMessage(ctx context.Context, data *chatbot.BotCa if err := c.UpdateAIStreamCard(trackID, initialContent, false); err != nil { c.logger.Error("UpdateInteractiveCard", log.Error(err)) - return nil + return err } // 初始化 默认为空 convInfo := &domain.ConversationInfo{ @@ -320,8 +355,9 @@ func (c *DingTalkClient) processMessage(ctx context.Context, data *chatbot.BotCa contentCh, err := c.getQA(ctx, question, *convInfo, "") if err != nil { c.logger.Error("dingtalk client failed to get answer", log.Error(err)) - if err := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); err != nil { - c.logger.Error("UpdateInteractiveCard in contentCh failed", log.Error(err)) + if updateErr := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); updateErr != nil { + c.logger.Error("UpdateInteractiveCard in contentCh failed", log.Error(updateErr)) + return fmt.Errorf("get answer failed: %w; update error card failed: %w", err, updateErr) } return nil } @@ -337,8 +373,9 @@ func (c *DingTalkClient) processMessage(ctx context.Context, data *chatbot.BotCa if !ok { if err := c.UpdateAIStreamCard(trackID, fullContent, true); err != nil { c.logger.Error("UpdateInteractiveCard in contentCh", log.Error(err)) - if err := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); err != nil { - c.logger.Error("UpdateInteractiveCard in contentCh failed", log.Error(err)) + if updateErr := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); updateErr != nil { + c.logger.Error("UpdateInteractiveCard in contentCh failed", log.Error(updateErr)) + return fmt.Errorf("final update card failed: %w; fallback update failed: %w", err, updateErr) } } return nil @@ -350,8 +387,9 @@ func (c *DingTalkClient) processMessage(ctx context.Context, data *chatbot.BotCa } if err := c.UpdateAIStreamCard(trackID, fullContent, false); err != nil { c.logger.Error("UpdateInteractiveCard in ticker", log.Error(err)) - if err := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); err != nil { - c.logger.Error("UpdateInteractiveCard in ticker failed", log.Error(err)) + if updateErr := c.UpdateAIStreamCard(trackID, "出错了,请稍后再试", true); updateErr != nil { + c.logger.Error("UpdateInteractiveCard in ticker failed", log.Error(updateErr)) + return fmt.Errorf("stream update card failed: %w; fallback update failed: %w", err, updateErr) } return nil } diff --git a/backend/pkg/bot/dingtalk/stream_test.go b/backend/pkg/bot/dingtalk/stream_test.go index a645f6bb6..1e63589d7 100644 --- a/backend/pkg/bot/dingtalk/stream_test.go +++ b/backend/pkg/bot/dingtalk/stream_test.go @@ -131,3 +131,81 @@ func TestOnChatBotMessageReceivedReturnsBeforeProcessingCompletes(t *testing.T) close(unblock) } + +func TestOnChatBotMessageReceivedAllowsRetryAfterProcessingError(t *testing.T) { + client := newTestDingTalkClient(t) + + attempts := make(chan struct{}, 4) + client.processMessageFn = func(context.Context, *chatbot.BotCallbackDataModel) error { + attempts <- struct{}{} + return assert.AnError + } + + data := &chatbot.BotCallbackDataModel{ + MsgId: "msg-retry", + Text: chatbot.BotCallbackDataTextModel{ + Content: "retry please", + }, + } + + resp, err := client.OnChatBotMessageReceived(context.Background(), data) + require.NoError(t, err) + assert.Equal(t, []byte(""), resp) + + select { + case <-attempts: + case <-time.After(time.Second): + t.Fatal("expected first message to be processed") + } + + require.Eventually(t, func() bool { + _, callErr := client.OnChatBotMessageReceived(context.Background(), data) + require.NoError(t, callErr) + + select { + case <-attempts: + return true + default: + return false + } + }, time.Second, 20*time.Millisecond) +} + +func TestOnChatBotMessageReceivedRecoversBackgroundPanic(t *testing.T) { + client := newTestDingTalkClient(t) + + attempts := make(chan struct{}, 4) + client.processMessageFn = func(context.Context, *chatbot.BotCallbackDataModel) error { + attempts <- struct{}{} + panic("boom") + } + + data := &chatbot.BotCallbackDataModel{ + MsgId: "msg-panic", + Text: chatbot.BotCallbackDataTextModel{ + Content: "panic please", + }, + } + + resp, err := client.OnChatBotMessageReceived(context.Background(), data) + require.NoError(t, err) + assert.Equal(t, []byte(""), resp) + + select { + case <-attempts: + case <-time.After(time.Second): + t.Fatal("expected background processing to start") + } + + require.Eventually(t, func() bool { + _, callErr := client.OnChatBotMessageReceived(context.Background(), data) + require.NoError(t, callErr) + + select { + case <-attempts: + return true + default: + return false + } + }, 200*time.Millisecond, 20*time.Millisecond) +}