Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ func (a *ApplicationHandler) mountControlPlaneRoutes(router chi.Router, handler
r.Post("/dynamic", handler.CreateDynamicEvent)
})

// Batch replay route (different middleware - no rate limiting)
eventRouter.With(handler.RequireEnabledProject(), handler.RequireEnabledOrganisation()).
Post("/batchreplay", handler.BatchReplayEvents)

Expand Down
13 changes: 9 additions & 4 deletions api/handlers/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,7 @@ func (h *Handler) BatchReplayEvents(w http.ResponseWriter, r *http.Request) {
}

data.Filter.Project = p
ep := datastore.Pageable{}
if data.Filter.Pageable == ep {
data.Filter.Pageable.PerPage = 2000000000
}
data.Filter.Pageable = services.NormalizeBatchReplayPageable(data.Filter.Pageable)

bs := services.BatchReplayEventService{
EndpointRepo: endpoints.New(h.A.Logger, h.A.DB),
Expand All @@ -373,6 +370,14 @@ func (h *Handler) BatchReplayEvents(w http.ResponseWriter, r *http.Request) {

successes, failures, err := bs.Run(r.Context())
if err != nil {
if successes > 0 || failures > 0 {
_ = render.Render(w, r, util.NewServerResponse(
fmt.Sprintf("%d successful, %d failed; batch replay incomplete: %s", successes, failures, err.Error()),
nil,
http.StatusInternalServerError,
))
return
}
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}
Expand Down
59 changes: 47 additions & 12 deletions services/batch_replay_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@ package services

import (
"context"
"fmt"

"github.com/frain-dev/convoy/datastore"
log "github.com/frain-dev/convoy/pkg/logger"
"github.com/frain-dev/convoy/queue"
)

const BatchReplayPageSize = 1000

func NormalizeBatchReplayPageable(pageable datastore.Pageable) datastore.Pageable {
pageable.PerPage = BatchReplayPageSize
pageable.Direction = datastore.Next
pageable.NextCursor = ""
pageable.PrevCursor = ""
pageable.SetCursors()
return pageable
}

type BatchReplayEventService struct {
EndpointRepo datastore.EndpointRepository
Queue queue.Queuer
Expand All @@ -18,28 +30,51 @@ type BatchReplayEventService struct {
}

func (e *BatchReplayEventService) Run(ctx context.Context) (int, int, error) {
events, _, err := e.EventRepo.LoadEventsPaged(ctx, e.Filter.Project.UID, e.Filter)
if err != nil {
e.Logger.ErrorContext(ctx, "failed to fetch events", "error", err)
return 0, 0, &ServiceError{ErrMsg: "failed to fetch event deliveries", Err: err}
}
filter := *e.Filter
filter.Pageable = NormalizeBatchReplayPageable(filter.Pageable)

rs := ReplayEventService{
EndpointRepo: e.EndpointRepo,
Queue: e.Queue,
Logger: e.Logger,
}

failures := 0
for _, ev := range events {
rs.Event = &ev
err = rs.Run(ctx)
successes, failures := 0, 0

for {
events, pagination, err := e.EventRepo.LoadEventsPaged(ctx, e.Filter.Project.UID, &filter)
if err != nil {
failures++
e.Logger.ErrorContext(ctx, "an item in the batch replay failed", "error", err)
e.Logger.ErrorContext(ctx, "failed to fetch events", "error", err, "successes", successes, "failures", failures)
errMsg := "failed to fetch event deliveries"
if successes > 0 || failures > 0 {
errMsg = fmt.Sprintf("batch replay incomplete after %d successful and %d failed replays", successes, failures)
}
return successes, failures, &ServiceError{ErrMsg: errMsg, Err: err}
}

if len(events) == 0 {
break
}

pageFailures := 0
for i := range events {
rs.Event = &events[i]
if err = rs.Run(ctx); err != nil {
pageFailures++
e.Logger.ErrorContext(ctx, "an item in the batch replay failed", "error", err)
}
}

successes += len(events) - pageFailures
failures += pageFailures

if !pagination.HasNextPage {
break
}

filter.Pageable.NextCursor = pagination.NextPageCursor
filter.Pageable.PrevCursor = pagination.PrevPageCursor
}

successes := len(events) - failures
return successes, failures, nil
}
103 changes: 102 additions & 1 deletion services/batch_replay_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,40 @@ func provideBatchReplayEventService(ctrl *gomock.Controller, f *datastore.Filter
}
}

func TestNormalizeBatchReplayPageable(t *testing.T) {
t.Run("defaults empty pageable", func(t *testing.T) {
got := NormalizeBatchReplayPageable(datastore.Pageable{})
require.Equal(t, BatchReplayPageSize, got.PerPage)
require.Equal(t, datastore.Next, got.Direction)
require.NotEmpty(t, got.NextCursor)
})

t.Run("caps oversized pageable", func(t *testing.T) {
got := NormalizeBatchReplayPageable(datastore.Pageable{PerPage: 2000000000})
require.Equal(t, BatchReplayPageSize, got.PerPage)
})

t.Run("coerces invalid direction", func(t *testing.T) {
got := NormalizeBatchReplayPageable(datastore.Pageable{Direction: "invalid"})
require.Equal(t, datastore.Next, got.Direction)
})

t.Run("resets list view pagination from dashboard batch replay", func(t *testing.T) {
got := NormalizeBatchReplayPageable(datastore.Pageable{
PerPage: 20,
Sort: "DESC",
Direction: datastore.Next,
NextCursor: "01J5XKQWZ8YN3M4P2R6T9V1C7D",
})

require.Equal(t, BatchReplayPageSize, got.PerPage)
require.Equal(t, datastore.Next, got.Direction)
require.Equal(t, "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF", got.NextCursor)
require.Empty(t, got.PrevCursor)
require.Equal(t, "DESC", got.Sort)
})
}

func TestBatchReplayEventService_Run(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -99,6 +133,35 @@ func TestBatchReplayEventService_Run(t *testing.T) {
wantErr: false,
wantErrMsg: "",
},
{
name: "should_paginate_through_all_events",
dbFn: func(br *BatchReplayEventService) {
e, _ := br.EventRepo.(*mocks.MockEventRepository)
gomock.InOrder(
e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return(
[]datastore.Event{{UID: "event1", ProjectID: "proj0"}},
datastore.PaginationData{HasNextPage: true, NextPageCursor: "cursor-2"},
nil,
),
e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return(
[]datastore.Event{{UID: "event2", ProjectID: "proj0"}},
datastore.PaginationData{},
nil,
),
)

q, _ := br.Queue.(*mocks.MockQueuer)
q.EXPECT().Write(gomock.Any(), convoy.CreateEventProcessor, convoy.CreateEventQueue, gomock.Any()).Times(2).Return(nil)
},
args: args{
ctx: ctx,
f: &datastore.Filter{
Project: &datastore.Project{UID: "1234"},
},
},
wantSuccesses: 2,
wantFailures: 0,
},
{
name: "should_fail_to_load_events",
dbFn: func(br *BatchReplayEventService) {
Expand All @@ -110,7 +173,7 @@ func TestBatchReplayEventService_Run(t *testing.T) {
)

ml, _ := br.Logger.(*mocks.MockLogger)
ml.EXPECT().ErrorContext(gomock.Any(), "failed to fetch events", "error", gomock.Any()).Times(1)
ml.EXPECT().ErrorContext(gomock.Any(), "failed to fetch events", "error", gomock.Any(), "successes", 0, "failures", 0).Times(1)
},
args: args{
ctx: ctx,
Expand All @@ -121,6 +184,40 @@ func TestBatchReplayEventService_Run(t *testing.T) {
wantErr: true,
wantErrMsg: "failed to fetch event deliveries",
},
{
name: "should_return_partial_progress_when_later_page_fetch_fails",
dbFn: func(br *BatchReplayEventService) {
e, _ := br.EventRepo.(*mocks.MockEventRepository)
gomock.InOrder(
e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return(
[]datastore.Event{{UID: "event1", ProjectID: "proj0"}},
datastore.PaginationData{HasNextPage: true, NextPageCursor: "cursor-2"},
nil,
),
e.EXPECT().LoadEventsPaged(gomock.Any(), "1234", gomock.Any()).Times(1).Return(
[]datastore.Event{},
datastore.PaginationData{},
errors.New("failed"),
),
)

q, _ := br.Queue.(*mocks.MockQueuer)
q.EXPECT().Write(gomock.Any(), convoy.CreateEventProcessor, convoy.CreateEventQueue, gomock.Any()).Times(1).Return(nil)

ml, _ := br.Logger.(*mocks.MockLogger)
ml.EXPECT().ErrorContext(gomock.Any(), "failed to fetch events", "error", gomock.Any(), "successes", 1, "failures", 0).Times(1)
},
args: args{
ctx: ctx,
f: &datastore.Filter{
Project: &datastore.Project{UID: "1234"},
},
},
wantSuccesses: 1,
wantFailures: 0,
wantErr: true,
wantErrMsg: "batch replay incomplete after 1 successful and 0 failed replays",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -137,6 +234,10 @@ func TestBatchReplayEventService_Run(t *testing.T) {
if tt.wantErr {
require.NotNil(t, err)
require.Equal(t, tt.wantErrMsg, err.(*ServiceError).Error())
if tt.wantSuccesses > 0 || tt.wantFailures > 0 {
require.Equal(t, tt.wantSuccesses, successes)
require.Equal(t, tt.wantFailures, failures)
}
return
}

Expand Down
Loading