From ef2a027604dacb468958fdac6f71115b52094c43 Mon Sep 17 00:00:00 2001 From: olalekan odukoya Date: Wed, 17 Jun 2026 12:15:43 +0100 Subject: [PATCH] fix(dataplane): fix batch replay oom --- api/api.go | 1 - api/handlers/event.go | 13 ++-- services/batch_replay_event.go | 59 ++++++++++++---- services/batch_replay_event_test.go | 103 +++++++++++++++++++++++++++- 4 files changed, 158 insertions(+), 18 deletions(-) diff --git a/api/api.go b/api/api.go index e792eeb213..cfef1d047e 100644 --- a/api/api.go +++ b/api/api.go @@ -343,7 +343,6 @@ func (a *ApplicationHandler) mountControlPlaneRoutes(router chi.Router, handler r.Post("/dynamic", handler.CreateDynamicEvent) }) - // Batch replay route (different middleware - no rate limiting) eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()). Post("/batchreplay", handler.BatchReplayEvents) diff --git a/api/handlers/event.go b/api/handlers/event.go index 99e11c6ec8..530a27d27c 100644 --- a/api/handlers/event.go +++ b/api/handlers/event.go @@ -358,10 +358,7 @@ func (h *Handler) BatchReplayEvents(w http.ResponseWriter, r *http.Request) { } data.Filter.Project = p - ep := datastore.Pageable{} - if data.Filter.Pageable == ep { - data.Filter.Pageable.PerPage = 2000000000 - } + data.Filter.Pageable = services.NormalizeBatchReplayPageable(data.Filter.Pageable) bs := services.BatchReplayEventService{ EndpointRepo: endpoints.New(h.A.Logger, h.A.DB), @@ -373,6 +370,14 @@ func (h *Handler) BatchReplayEvents(w http.ResponseWriter, r *http.Request) { successes, failures, err := bs.Run(r.Context()) if err != nil { + if successes > 0 || failures > 0 { + _ = render.Render(w, r, util.NewServerResponse( + fmt.Sprintf("%d successful, %d failed; batch replay incomplete: %s", successes, failures, err.Error()), + nil, + http.StatusInternalServerError, + )) + return + } _ = render.Render(w, r, util.NewServiceErrResponse(err)) return } diff --git a/services/batch_replay_event.go b/services/batch_replay_event.go index 3b08996c02..600298325e 100644 --- a/services/batch_replay_event.go +++ b/services/batch_replay_event.go @@ -2,12 +2,24 @@ package services import ( "context" + "fmt" "github.com/frain-dev/convoy/datastore" log "github.com/frain-dev/convoy/pkg/logger" "github.com/frain-dev/convoy/queue" ) +const BatchReplayPageSize = 1000 + +func NormalizeBatchReplayPageable(pageable datastore.Pageable) datastore.Pageable { + pageable.PerPage = BatchReplayPageSize + pageable.Direction = datastore.Next + pageable.NextCursor = "" + pageable.PrevCursor = "" + pageable.SetCursors() + return pageable +} + type BatchReplayEventService struct { EndpointRepo datastore.EndpointRepository Queue queue.Queuer @@ -18,11 +30,8 @@ type BatchReplayEventService struct { } func (e *BatchReplayEventService) Run(ctx context.Context) (int, int, error) { - events, _, err := e.EventRepo.LoadEventsPaged(ctx, e.Filter.Project.UID, e.Filter) - if err != nil { - e.Logger.ErrorContext(ctx, "failed to fetch events", "error", err) - return 0, 0, &ServiceError{ErrMsg: "failed to fetch event deliveries", Err: err} - } + filter := *e.Filter + filter.Pageable = NormalizeBatchReplayPageable(filter.Pageable) rs := ReplayEventService{ EndpointRepo: e.EndpointRepo, @@ -30,16 +39,42 @@ func (e *BatchReplayEventService) Run(ctx context.Context) (int, int, error) { Logger: e.Logger, } - failures := 0 - for _, ev := range events { - rs.Event = &ev - err = rs.Run(ctx) + successes, failures := 0, 0 + + for { + events, pagination, err := e.EventRepo.LoadEventsPaged(ctx, e.Filter.Project.UID, &filter) if err != nil { - failures++ - e.Logger.ErrorContext(ctx, "an item in the batch replay failed", "error", err) + e.Logger.ErrorContext(ctx, "failed to fetch events", "error", err, "successes", successes, "failures", failures) + errMsg := "failed to fetch event deliveries" + if successes > 0 || failures > 0 { + errMsg = fmt.Sprintf("batch replay incomplete after %d successful and %d failed replays", successes, failures) + } + return successes, failures, &ServiceError{ErrMsg: errMsg, Err: err} + } + + if len(events) == 0 { + break } + + pageFailures := 0 + for i := range events { + rs.Event = &events[i] + if err = rs.Run(ctx); err != nil { + pageFailures++ + e.Logger.ErrorContext(ctx, "an item in the batch replay failed", "error", err) + } + } + + successes += len(events) - pageFailures + failures += pageFailures + + if !pagination.HasNextPage { + break + } + + filter.Pageable.NextCursor = pagination.NextPageCursor + filter.Pageable.PrevCursor = pagination.PrevPageCursor } - successes := len(events) - failures return successes, failures, nil } diff --git a/services/batch_replay_event_test.go b/services/batch_replay_event_test.go index da35ce41fb..cba3958470 100644 --- a/services/batch_replay_event_test.go +++ b/services/batch_replay_event_test.go @@ -23,6 +23,40 @@ func provideBatchReplayEventService(ctrl *gomock.Controller, f *datastore.Filter } } +func TestNormalizeBatchReplayPageable(t *testing.T) { + t.Run("defaults empty pageable", func(t *testing.T) { + got := NormalizeBatchReplayPageable(datastore.Pageable{}) + require.Equal(t, BatchReplayPageSize, got.PerPage) + require.Equal(t, datastore.Next, got.Direction) + require.NotEmpty(t, got.NextCursor) + }) + + t.Run("caps oversized pageable", func(t *testing.T) { + got := NormalizeBatchReplayPageable(datastore.Pageable{PerPage: 2000000000}) + require.Equal(t, BatchReplayPageSize, got.PerPage) + }) + + t.Run("coerces invalid direction", func(t *testing.T) { + got := NormalizeBatchReplayPageable(datastore.Pageable{Direction: "invalid"}) + require.Equal(t, datastore.Next, got.Direction) + }) + + t.Run("resets list view pagination from dashboard batch replay", func(t *testing.T) { + got := NormalizeBatchReplayPageable(datastore.Pageable{ + PerPage: 20, + Sort: "DESC", + Direction: datastore.Next, + NextCursor: "01J5XKQWZ8YN3M4P2R6T9V1C7D", + }) + + require.Equal(t, BatchReplayPageSize, got.PerPage) + require.Equal(t, datastore.Next, got.Direction) + require.Equal(t, "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF", got.NextCursor) + require.Empty(t, got.PrevCursor) + require.Equal(t, "DESC", got.Sort) + }) +} + func TestBatchReplayEventService_Run(t *testing.T) { ctx := context.Background() @@ -99,6 +133,35 @@ func TestBatchReplayEventService_Run(t *testing.T) { wantErr: false, wantErrMsg: "", }, + { + name: "should_paginate_through_all_events", + dbFn: func(br *BatchReplayEventService) { + e, _ := br.EventRepo.(*mocks.MockEventRepository) + gomock.InOrder( + e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return( + []datastore.Event{{UID: "event1", ProjectID: "proj0"}}, + datastore.PaginationData{HasNextPage: true, NextPageCursor: "cursor-2"}, + nil, + ), + e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return( + []datastore.Event{{UID: "event2", ProjectID: "proj0"}}, + datastore.PaginationData{}, + nil, + ), + ) + + q, _ := br.Queue.(*mocks.MockQueuer) + q.EXPECT().Write(gomock.Any(), convoy.CreateEventProcessor, convoy.CreateEventQueue, gomock.Any()).Times(2).Return(nil) + }, + args: args{ + ctx: ctx, + f: &datastore.Filter{ + Project: &datastore.Project{UID: "1234"}, + }, + }, + wantSuccesses: 2, + wantFailures: 0, + }, { name: "should_fail_to_load_events", dbFn: func(br *BatchReplayEventService) { @@ -110,7 +173,7 @@ func TestBatchReplayEventService_Run(t *testing.T) { ) ml, _ := br.Logger.(*mocks.MockLogger) - ml.EXPECT().ErrorContext(gomock.Any(), "failed to fetch events", "error", gomock.Any()).Times(1) + ml.EXPECT().ErrorContext(gomock.Any(), "failed to fetch events", "error", gomock.Any(), "successes", 0, "failures", 0).Times(1) }, args: args{ ctx: ctx, @@ -121,6 +184,40 @@ func TestBatchReplayEventService_Run(t *testing.T) { wantErr: true, wantErrMsg: "failed to fetch event deliveries", }, + { + name: "should_return_partial_progress_when_later_page_fetch_fails", + dbFn: func(br *BatchReplayEventService) { + e, _ := br.EventRepo.(*mocks.MockEventRepository) + gomock.InOrder( + e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return( + []datastore.Event{{UID: "event1", ProjectID: "proj0"}}, + datastore.PaginationData{HasNextPage: true, NextPageCursor: "cursor-2"}, + nil, + ), + e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return( + []datastore.Event{}, + datastore.PaginationData{}, + errors.New("failed"), + ), + ) + + q, _ := br.Queue.(*mocks.MockQueuer) + q.EXPECT().Write(gomock.Any(), convoy.CreateEventProcessor, convoy.CreateEventQueue, gomock.Any()).Times(1).Return(nil) + + ml, _ := br.Logger.(*mocks.MockLogger) + ml.EXPECT().ErrorContext(gomock.Any(), "failed to fetch events", "error", gomock.Any(), "successes", 1, "failures", 0).Times(1) + }, + args: args{ + ctx: ctx, + f: &datastore.Filter{ + Project: &datastore.Project{UID: "1234"}, + }, + }, + wantSuccesses: 1, + wantFailures: 0, + wantErr: true, + wantErrMsg: "batch replay incomplete after 1 successful and 0 failed replays", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -137,6 +234,10 @@ func TestBatchReplayEventService_Run(t *testing.T) { if tt.wantErr { require.NotNil(t, err) require.Equal(t, tt.wantErrMsg, err.(*ServiceError).Error()) + if tt.wantSuccesses > 0 || tt.wantFailures > 0 { + require.Equal(t, tt.wantSuccesses, successes) + require.Equal(t, tt.wantFailures, failures) + } return }