From e5618557e6c1c55cfea5727f6deb250b6681f9d1 Mon Sep 17 00:00:00 2001 From: Dunsin Date: Wed, 20 May 2026 22:16:44 +0100 Subject: [PATCH 1/6] feat(live-store): add ScheduledTasksStore for cross-instance dedup --- internal/core/ports/live_store.go | 7 ++ .../live-store/inmemory/scheduled_tasks.go | 43 +++++++++++ .../live-store/inmemory/store.go | 5 ++ .../live-store/live_store_test.go | 73 +++++++++++++++++++ .../infrastructure/live-store/redis/round.go | 7 +- .../live-store/redis/scheduled_tasks.go | 43 +++++++++++ .../infrastructure/live-store/redis/store.go | 5 ++ 7 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 internal/infrastructure/live-store/inmemory/scheduled_tasks.go create mode 100644 internal/infrastructure/live-store/redis/scheduled_tasks.go diff --git a/internal/core/ports/live_store.go b/internal/core/ports/live_store.go index 86f49fb03..064825a47 100644 --- a/internal/core/ports/live_store.go +++ b/internal/core/ports/live_store.go @@ -17,6 +17,7 @@ type LiveStore interface { ConfirmationSessions() ConfirmationSessionsStore TreeSigingSessions() TreeSigningSessionsStore BoardingInputs() BoardingInputsStore + ScheduledTasks() ScheduledTasksStore } type IntentStore interface { @@ -89,6 +90,12 @@ type BoardingInputsStore interface { DeleteSignatures(ctx context.Context, batchId string) error } +type ScheduledTasksStore interface { + AddIfAbsent(ctx context.Context, id string) (bool, error) + Remove(ctx context.Context, id string) error + Has(ctx context.Context, id string) (bool, error) +} + type TimedIntent struct { domain.Intent BoardingInputs []BoardingInput diff --git a/internal/infrastructure/live-store/inmemory/scheduled_tasks.go b/internal/infrastructure/live-store/inmemory/scheduled_tasks.go new file mode 100644 index 000000000..ab0b8ab31 --- /dev/null +++ b/internal/infrastructure/live-store/inmemory/scheduled_tasks.go @@ -0,0 +1,43 @@ +package inmemorylivestore + +import ( + "context" + "sync" + + "github.com/arkade-os/arkd/internal/core/ports" +) + +type scheduledTasksStore struct { + lock sync.Mutex + ids map[string]struct{} +} + +func NewScheduledTasksStore() ports.ScheduledTasksStore { + return &scheduledTasksStore{ + ids: make(map[string]struct{}), + } +} + +func (s *scheduledTasksStore) AddIfAbsent(_ context.Context, id string) (bool, error) { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.ids[id]; ok { + return false, nil + } + s.ids[id] = struct{}{} + return true, nil +} + +func (s *scheduledTasksStore) Remove(_ context.Context, id string) error { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.ids, id) + return nil +} + +func (s *scheduledTasksStore) Has(_ context.Context, id string) (bool, error) { + s.lock.Lock() + defer s.lock.Unlock() + _, ok := s.ids[id] + return ok, nil +} diff --git a/internal/infrastructure/live-store/inmemory/store.go b/internal/infrastructure/live-store/inmemory/store.go index 40e7bb85c..aa941f06f 100644 --- a/internal/infrastructure/live-store/inmemory/store.go +++ b/internal/infrastructure/live-store/inmemory/store.go @@ -12,6 +12,7 @@ type inMemoryLiveStore struct { confirmationSessionsStore ports.ConfirmationSessionsStore treeSigningSessions ports.TreeSigningSessionsStore boardingInputsStore ports.BoardingInputsStore + scheduledTasksStore ports.ScheduledTasksStore } func NewLiveStore(txBuilder ports.TxBuilder) ports.LiveStore { @@ -23,6 +24,7 @@ func NewLiveStore(txBuilder ports.TxBuilder) ports.LiveStore { confirmationSessionsStore: NewConfirmationSessionsStore(), treeSigningSessions: NewTreeSigningSessionsStore(), boardingInputsStore: NewBoardingInputsStore(), + scheduledTasksStore: NewScheduledTasksStore(), } } @@ -47,3 +49,6 @@ func (s *inMemoryLiveStore) TreeSigingSessions() ports.TreeSigningSessionsStore func (s *inMemoryLiveStore) BoardingInputs() ports.BoardingInputsStore { return s.boardingInputsStore } +func (s *inMemoryLiveStore) ScheduledTasks() ports.ScheduledTasksStore { + return s.scheduledTasksStore +} diff --git a/internal/infrastructure/live-store/live_store_test.go b/internal/infrastructure/live-store/live_store_test.go index c9e35b8e3..4f7b01319 100644 --- a/internal/infrastructure/live-store/live_store_test.go +++ b/internal/infrastructure/live-store/live_store_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -764,6 +765,78 @@ func runLiveStoreTests(t *testing.T, store ports.LiveStore) { require.NoError(t, err) require.Empty(t, gotSigs) }) + t.Run("ScheduledTasksStore", func(t *testing.T) { + ctx := t.Context() + + // AddIfAbsent: first call claims the id. + claimed, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc") + require.NoError(t, err) + require.True(t, claimed, "first AddIfAbsent should succeed and return true") + + has, err := store.ScheduledTasks().Has(ctx, "tx-abc") + require.NoError(t, err) + require.True(t, has) + + // AddIfAbsent: second call with the same id loses the race + secondClaim, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc") + require.NoError(t, err) + require.False(t, secondClaim, "second AddIfAbsent for the same id must return false, not an error") + + // Has: unknown id is false. + has, err = store.ScheduledTasks().Has(ctx, "never-added") + require.NoError(t, err) + require.False(t, has) + + // Remove: releases the claim. + err = store.ScheduledTasks().Remove(ctx, "tx-abc") + require.NoError(t, err) + + has, err = store.ScheduledTasks().Has(ctx, "tx-abc") + require.NoError(t, err) + require.False(t, has, "Has must reflect Remove") + + // Remove: idempotent — unknown id is a no-op. + err = store.ScheduledTasks().Remove(ctx, "never-added") + require.NoError(t, err) + + // After Remove, the same id can be re-claimed. + claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc") + require.NoError(t, err) + require.True(t, claimed) + + require.NoError(t, store.ScheduledTasks().Remove(ctx, "tx-abc")) + + // Concurrent AddIfAbsent on the same id must produce exactly one + // winner. This is the load-bearing property of the whole fix: + // without atomicity, two arkd processes could both claim the same + // task and both broadcast the same sweep tx. + const goroutines = 100 + var wins atomic.Int32 + var wg sync.WaitGroup + start := make(chan struct{}) + + for range goroutines { + wg.Add(1) + go func() { + defer wg.Done() + <-start + claimed, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-race") + require.NoError(t, err) + if claimed { + wins.Add(1) + } + }() + } + + close(start) + wg.Wait() + + require.Equal(t, int32(1), wins.Load(), + "AddIfAbsent must be atomic: exactly one goroutine claims the id") + + require.NoError(t, store.ScheduledTasks().Remove(ctx, "tx-race")) + + }) } type intentPushFixture struct { diff --git a/internal/infrastructure/live-store/redis/round.go b/internal/infrastructure/live-store/redis/round.go index e8c69ae78..5efe9c0aa 100644 --- a/internal/infrastructure/live-store/redis/round.go +++ b/internal/infrastructure/live-store/redis/round.go @@ -14,9 +14,10 @@ import ( ) const ( - currentRoundKey = "currentRoundStore:round" - boardingInputsKey = "boardingInputsStore:numOfInputs" - boardingInputSigsKey = "boardingInputsStore:signatures" + currentRoundKey = "currentRoundStore:round" + boardingInputsKey = "boardingInputsStore:numOfInputs" + boardingInputSigsKey = "boardingInputsStore:signatures" + scheduledTaskKeyPrefix = "scheduled_task" ) type currentRoundStore struct { diff --git a/internal/infrastructure/live-store/redis/scheduled_tasks.go b/internal/infrastructure/live-store/redis/scheduled_tasks.go new file mode 100644 index 000000000..051c20342 --- /dev/null +++ b/internal/infrastructure/live-store/redis/scheduled_tasks.go @@ -0,0 +1,43 @@ +package redislivestore + +import ( + "context" + "fmt" + + "github.com/arkade-os/arkd/internal/core/ports" + "github.com/redis/go-redis/v9" +) + + +type scheduledTasksStore struct { + rdb *redis.Client +} + +func NewScheduledTasksStore(rdb *redis.Client) ports.ScheduledTasksStore { + return &scheduledTasksStore{rdb: rdb} +} + +func (s *scheduledTasksStore) AddIfAbsent(ctx context.Context, id string) (bool, error) { + // SETNX is atomic on the Redis server: returns true iff this call set + // the key. Multiple arkd processes racing to claim the same task id + // will see exactly one true and the rest false. + return s.rdb.SetNX(ctx, scheduledTaskKey(id), "1", 0).Result() +} + +func (s *scheduledTasksStore) Remove(ctx context.Context, id string) error { + // Del returns the number of keys removed; we don't care if it was 0 + // (Remove is idempotent per the interface contract). + return s.rdb.Del(ctx, scheduledTaskKey(id)).Err() +} + +func (s *scheduledTasksStore) Has(ctx context.Context, id string) (bool, error) { + n, err := s.rdb.Exists(ctx, scheduledTaskKey(id)).Result() + if err != nil { + return false, err + } + return n > 0, nil +} + +func scheduledTaskKey(id string) string { + return fmt.Sprintf("%s:%s", scheduledTaskKeyPrefix, id) +} diff --git a/internal/infrastructure/live-store/redis/store.go b/internal/infrastructure/live-store/redis/store.go index 306c32cc5..6491b237f 100644 --- a/internal/infrastructure/live-store/redis/store.go +++ b/internal/infrastructure/live-store/redis/store.go @@ -13,6 +13,7 @@ type redisLiveStore struct { confirmationSessionsStore ports.ConfirmationSessionsStore treeSigningSessions ports.TreeSigningSessionsStore boardingInputsStore ports.BoardingInputsStore + scheduledTasksStore ports.ScheduledTasksStore } func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int) ports.LiveStore { @@ -24,6 +25,7 @@ func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int) confirmationSessionsStore: NewConfirmationSessionsStore(rdb, numOfRetries), treeSigningSessions: NewTreeSigningSessionsStore(rdb, numOfRetries), boardingInputsStore: NewBoardingInputsStore(rdb, numOfRetries), + scheduledTasksStore: NewScheduledTasksStore(rdb), } } @@ -40,3 +42,6 @@ func (s *redisLiveStore) TreeSigingSessions() ports.TreeSigningSessionsStore { func (s *redisLiveStore) BoardingInputs() ports.BoardingInputsStore { return s.boardingInputsStore } +func (s *redisLiveStore) ScheduledTasks() ports.ScheduledTasksStore { + return s.scheduledTasksStore +} From 68d09467edfad943327a5daec5fb8fb0c02f842c Mon Sep 17 00:00:00 2001 From: Dunsin Date: Thu, 21 May 2026 00:36:55 +0100 Subject: [PATCH 2/6] fix(sweeper): use shared ScheduledTasksStore for task dedup --- internal/core/application/service.go | 11 +++- internal/core/application/sweeper.go | 55 ++++++++++--------- .../infrastructure/live-store/redis/round.go | 2 +- 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/internal/core/application/service.go b/internal/core/application/service.go index 7959fa386..9b4fd8d45 100644 --- a/internal/core/application/service.go +++ b/internal/core/application/service.go @@ -197,7 +197,7 @@ func NewService( cache: cache, scanner: scanner, sweeper: newSweeper( - wallet, repoManager, builder, scheduler, noteUriPrefix, + wallet, repoManager, builder, scheduler, noteUriPrefix, cache, ), boardingExitDelay: boardingExitDelay, operatorPrvkey: operatorSigningKey, @@ -3376,8 +3376,13 @@ func (s *service) listenToScannerNotifications() { // remove sweeper task for the associated checkpoint outputs for _, in := range ptx.UnsignedTx.TxIn { taskId := in.PreviousOutPoint.Hash.String() - s.sweeper.removeTask(taskId) - log.Debugf("sweeper: unscheduled task for tx %s", taskId) + if err := s.sweeper.removeTask(taskId); err != nil { + log.WithError(err).Warnf( + "sweeper: failed to unschedule task for tx %s", taskId, + ) + } else { + log.Debugf("sweeper: unscheduled task for tx %s", taskId) + } } }() } diff --git a/internal/core/application/sweeper.go b/internal/core/application/sweeper.go index 20e51ffb7..f7a26ff8f 100644 --- a/internal/core/application/sweeper.go +++ b/internal/core/application/sweeper.go @@ -37,26 +37,21 @@ type sweeper struct { scheduler ports.SchedulerService noteUriPrefix string - - // cache of scheduled tasks, avoid scheduling the same sweep event multiple times - locker *sync.Mutex - // TODO move the scheduled task map to LiveStore port - scheduledTasks map[string]struct{} - ctx context.Context + cache ports.LiveStore + ctx context.Context } func newSweeper( wallet ports.WalletService, repoManager ports.RepoManager, builder ports.TxBuilder, - scheduler ports.SchedulerService, noteUriPrefix string, + scheduler ports.SchedulerService, noteUriPrefix string, cache ports.LiveStore, ) *sweeper { return &sweeper{ wallet, repoManager, builder, scheduler, - noteUriPrefix, &sync.Mutex{}, make(map[string]struct{}), nil, + noteUriPrefix, cache, nil, } } func (s *sweeper) start(ctx context.Context) error { - s.scheduledTasks = make(map[string]struct{}) s.scheduler.Start() s.ctx = ctx @@ -209,11 +204,11 @@ func (s *sweeper) stop() { s.scheduler.Stop() } -// removeTask update the cached map of scheduled tasks -func (s *sweeper) removeTask(id string) { - s.locker.Lock() - defer s.locker.Unlock() - delete(s.scheduledTasks, id) +// removeTask releases the claim on a scheduled task id. Callers use this +// to cancel a pending sweep (e.g. when the underlying tx is observed +// spent before the sweep fires). +func (s *sweeper) removeTask(id string) error { + return s.cache.ScheduledTasks().Remove(s.ctx, id) } func (s *sweeper) scheduleCheckpointSweep( @@ -387,29 +382,37 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { return task.execute() } - s.locker.Lock() - defer s.locker.Unlock() - - if _, scheduled := s.scheduledTasks[task.id]; scheduled { + claimed, err := s.cache.ScheduledTasks().AddIfAbsent(s.ctx, task.id) + if err != nil { + return fmt.Errorf("failed to claim scheduled task %s: %w", task.id, err) + } + if !claimed { + // another instance (or this one earlier) already claimed it return nil } - s.scheduledTasks[task.id] = struct{}{} - return s.scheduler.ScheduleTaskOnce(task.at, func() { - // check if the task is still scheduled before executing it - s.locker.Lock() - if _, scheduled := s.scheduledTasks[task.id]; !scheduled { + // Cancellation check: external code may have called removeTask + // because the underlying tx was already spent. If the claim is + // gone, skip execution. + has, err := s.cache.ScheduledTasks().Has(s.ctx, task.id) + if err != nil { + log.WithError(err).Errorf( + "sweeper: failed to check scheduled task %s, proceeding anyway", task.id, + ) + } else if !has { log.Debugf( "sweeper: task for sweeping tx %s has been unscheduled, nothing left to do", task.id, ) - s.locker.Unlock() return } - s.locker.Unlock() - s.removeTask(task.id) + if err := s.removeTask(task.id); err != nil { + log.WithError(err).Errorf( + "sweeper: failed to release scheduled task %s", task.id, + ) + } if err := task.execute(); err != nil { log.WithError(err).Errorf("failed to execute sweep of tx %s", task.id) diff --git a/internal/infrastructure/live-store/redis/round.go b/internal/infrastructure/live-store/redis/round.go index 5efe9c0aa..304bfc29f 100644 --- a/internal/infrastructure/live-store/redis/round.go +++ b/internal/infrastructure/live-store/redis/round.go @@ -17,7 +17,7 @@ const ( currentRoundKey = "currentRoundStore:round" boardingInputsKey = "boardingInputsStore:numOfInputs" boardingInputSigsKey = "boardingInputsStore:signatures" - scheduledTaskKeyPrefix = "scheduled_task" + scheduledTaskKeyPrefix = "scheduledTasksStore:task" ) type currentRoundStore struct { From 149209086ae48960459e15d7d380f883cf6abe15 Mon Sep 17 00:00:00 2001 From: Dunsin Date: Thu, 21 May 2026 14:59:53 +0100 Subject: [PATCH 3/6] fix(sweeper): prevent missed and duplicate sweeps across restarts --- internal/core/application/sweeper.go | 36 ++++++++++------ internal/core/ports/live_store.go | 1 + .../live-store/inmemory/scheduled_tasks.go | 7 ++++ .../live-store/live_store_test.go | 42 ++++++++++++++++--- .../live-store/redis/scheduled_tasks.go | 32 +++++++++++--- 5 files changed, 94 insertions(+), 24 deletions(-) diff --git a/internal/core/application/sweeper.go b/internal/core/application/sweeper.go index f7a26ff8f..898169c8f 100644 --- a/internal/core/application/sweeper.go +++ b/internal/core/application/sweeper.go @@ -52,6 +52,10 @@ func newSweeper( } func (s *sweeper) start(ctx context.Context) error { + if err := s.cache.ScheduledTasks().Clear(ctx); err != nil { + return fmt.Errorf("sweeper: failed to clear stale scheduled task claims: %w", err) + } + s.scheduler.Start() s.ctx = ctx @@ -316,11 +320,9 @@ func (s *sweeper) scheduleCheckpointSweep( vtxo, ) - // if the sweep checkpoint tapscript is available, execute the task immediately - if !s.scheduler.AfterNow(sweepAt) { - return execute() - } - + // scheduleTask handles the in-past case internally — calling it + // unconditionally keeps the cross-instance claim path consistent + // for both overdue and future checkpoint sweeps. if err := s.scheduleTask(sweeperTask{ id: checkpointTxid.String(), at: sweepAt, @@ -374,14 +376,9 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { return nil } - if !s.scheduler.AfterNow(task.at) { - log.Debugf( - "sweeper: trying to schedule task in the past for tx %s, executing it immediately", - task.id, - ) - return task.execute() - } - + // Claim first, before the in-past fast-path: otherwise two instances + // recovering the same overdue task on restart would both fall through + // to execute() and both broadcast the same sweep tx. claimed, err := s.cache.ScheduledTasks().AddIfAbsent(s.ctx, task.id) if err != nil { return fmt.Errorf("failed to claim scheduled task %s: %w", task.id, err) @@ -391,6 +388,19 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { return nil } + if !s.scheduler.AfterNow(task.at) { + log.Debugf( + "sweeper: trying to schedule task in the past for tx %s, executing it immediately", + task.id, + ) + if err := s.removeTask(task.id); err != nil { + log.WithError(err).Errorf( + "sweeper: failed to release scheduled task %s", task.id, + ) + } + return task.execute() + } + return s.scheduler.ScheduleTaskOnce(task.at, func() { // Cancellation check: external code may have called removeTask // because the underlying tx was already spent. If the claim is diff --git a/internal/core/ports/live_store.go b/internal/core/ports/live_store.go index 064825a47..2cee1556e 100644 --- a/internal/core/ports/live_store.go +++ b/internal/core/ports/live_store.go @@ -94,6 +94,7 @@ type ScheduledTasksStore interface { AddIfAbsent(ctx context.Context, id string) (bool, error) Remove(ctx context.Context, id string) error Has(ctx context.Context, id string) (bool, error) + Clear(ctx context.Context) error } type TimedIntent struct { diff --git a/internal/infrastructure/live-store/inmemory/scheduled_tasks.go b/internal/infrastructure/live-store/inmemory/scheduled_tasks.go index ab0b8ab31..314a4609e 100644 --- a/internal/infrastructure/live-store/inmemory/scheduled_tasks.go +++ b/internal/infrastructure/live-store/inmemory/scheduled_tasks.go @@ -41,3 +41,10 @@ func (s *scheduledTasksStore) Has(_ context.Context, id string) (bool, error) { _, ok := s.ids[id] return ok, nil } + +func (s *scheduledTasksStore) Clear(_ context.Context) error { + s.lock.Lock() + defer s.lock.Unlock() + s.ids = make(map[string]struct{}) + return nil +} diff --git a/internal/infrastructure/live-store/live_store_test.go b/internal/infrastructure/live-store/live_store_test.go index 4f7b01319..b2ee87d26 100644 --- a/internal/infrastructure/live-store/live_store_test.go +++ b/internal/infrastructure/live-store/live_store_test.go @@ -814,28 +814,60 @@ func runLiveStoreTests(t *testing.T, store ports.LiveStore) { var wins atomic.Int32 var wg sync.WaitGroup start := make(chan struct{}) + errCh := make(chan error, goroutines) for range goroutines { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { <-start claimed, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-race") - require.NoError(t, err) + if err != nil { + errCh <- err + return + } if claimed { wins.Add(1) } - }() + }) } close(start) wg.Wait() + close(errCh) + for err := range errCh { + require.NoError(t, err) + } require.Equal(t, int32(1), wins.Load(), "AddIfAbsent must be atomic: exactly one goroutine claims the id") require.NoError(t, store.ScheduledTasks().Remove(ctx, "tx-race")) + // Clear: wipes every claim so the ids can be claimed again. This is + // what sweeper.start() uses on boot to recover from a crash where + // the in-process timer died but the Redis claim survived. + _, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-1") + require.NoError(t, err) + _, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-2") + require.NoError(t, err) + + require.NoError(t, store.ScheduledTasks().Clear(ctx)) + + has, err = store.ScheduledTasks().Has(ctx, "tx-clear-1") + require.NoError(t, err) + require.False(t, has, "Clear must remove every claim") + has, err = store.ScheduledTasks().Has(ctx, "tx-clear-2") + require.NoError(t, err) + require.False(t, has, "Clear must remove every claim") + + // Re-claim proves the slot was actually freed, not just hidden. + claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-1") + require.NoError(t, err) + require.True(t, claimed) + claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-2") + require.NoError(t, err) + require.True(t, claimed) + + require.NoError(t, store.ScheduledTasks().Clear(ctx)) }) } diff --git a/internal/infrastructure/live-store/redis/scheduled_tasks.go b/internal/infrastructure/live-store/redis/scheduled_tasks.go index 051c20342..c73023677 100644 --- a/internal/infrastructure/live-store/redis/scheduled_tasks.go +++ b/internal/infrastructure/live-store/redis/scheduled_tasks.go @@ -3,11 +3,13 @@ package redislivestore import ( "context" "fmt" + "time" "github.com/arkade-os/arkd/internal/core/ports" "github.com/redis/go-redis/v9" ) +const scheduledTaskTTL = 30 * 24 * time.Hour type scheduledTasksStore struct { rdb *redis.Client @@ -18,15 +20,11 @@ func NewScheduledTasksStore(rdb *redis.Client) ports.ScheduledTasksStore { } func (s *scheduledTasksStore) AddIfAbsent(ctx context.Context, id string) (bool, error) { - // SETNX is atomic on the Redis server: returns true iff this call set - // the key. Multiple arkd processes racing to claim the same task id - // will see exactly one true and the rest false. - return s.rdb.SetNX(ctx, scheduledTaskKey(id), "1", 0).Result() + return s.rdb.SetNX(ctx, scheduledTaskKey(id), "1", scheduledTaskTTL).Result() } func (s *scheduledTasksStore) Remove(ctx context.Context, id string) error { - // Del returns the number of keys removed; we don't care if it was 0 - // (Remove is idempotent per the interface contract). + // Safe to call when the key isn't there — Del returns 0, not an error. return s.rdb.Del(ctx, scheduledTaskKey(id)).Err() } @@ -38,6 +36,28 @@ func (s *scheduledTasksStore) Has(ctx context.Context, id string) (bool, error) return n > 0, nil } +func (s *scheduledTasksStore) Clear(ctx context.Context) error { + pattern := scheduledTaskKeyPrefix + ":*" + iter := s.rdb.Scan(ctx, 0, pattern, 100).Iterator() + var batch []string + for iter.Next(ctx) { + batch = append(batch, iter.Val()) + if len(batch) >= 100 { + if err := s.rdb.Del(ctx, batch...).Err(); err != nil { + return err + } + batch = batch[:0] + } + } + if err := iter.Err(); err != nil { + return err + } + if len(batch) > 0 { + return s.rdb.Del(ctx, batch...).Err() + } + return nil +} + func scheduledTaskKey(id string) string { return fmt.Sprintf("%s:%s", scheduledTaskKeyPrefix, id) } From ea70ffc755d88d192edb24f183770f5c509011eb Mon Sep 17 00:00:00 2001 From: Dunsin Date: Thu, 21 May 2026 20:53:37 +0100 Subject: [PATCH 4/6] fix(sweeper): claim slot before running, release only after --- internal/core/application/sweeper.go | 34 +++++++-------- .../live-store/live_store_test.go | 41 +------------------ 2 files changed, 16 insertions(+), 59 deletions(-) diff --git a/internal/core/application/sweeper.go b/internal/core/application/sweeper.go index 898169c8f..751b60fd6 100644 --- a/internal/core/application/sweeper.go +++ b/internal/core/application/sweeper.go @@ -321,8 +321,8 @@ func (s *sweeper) scheduleCheckpointSweep( ) // scheduleTask handles the in-past case internally — calling it - // unconditionally keeps the cross-instance claim path consistent - // for both overdue and future checkpoint sweeps. + // unconditionally keeps both overdue and future checkpoint sweeps + // going through the same claim path. if err := s.scheduleTask(sweeperTask{ id: checkpointTxid.String(), at: sweepAt, @@ -376,35 +376,35 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { return nil } - // Claim first, before the in-past fast-path: otherwise two instances - // recovering the same overdue task on restart would both fall through - // to execute() and both broadcast the same sweep tx. + // Claim the slot first so no other caller can pick up the same id. claimed, err := s.cache.ScheduledTasks().AddIfAbsent(s.ctx, task.id) if err != nil { return fmt.Errorf("failed to claim scheduled task %s: %w", task.id, err) } if !claimed { - // another instance (or this one earlier) already claimed it return nil } - if !s.scheduler.AfterNow(task.at) { - log.Debugf( - "sweeper: trying to schedule task in the past for tx %s, executing it immediately", - task.id, - ) + // Release only after execute() finishes, so the slot stays held while the sweep runs. + releaseClaim := func() { if err := s.removeTask(task.id); err != nil { log.WithError(err).Errorf( "sweeper: failed to release scheduled task %s", task.id, ) } + } + + if !s.scheduler.AfterNow(task.at) { + log.Debugf( + "sweeper: trying to schedule task in the past for tx %s, executing it immediately", + task.id, + ) + defer releaseClaim() return task.execute() } return s.scheduler.ScheduleTaskOnce(task.at, func() { - // Cancellation check: external code may have called removeTask - // because the underlying tx was already spent. If the claim is - // gone, skip execution. + // Skip if the claim was released externally (e.g. the tx was already spent). has, err := s.cache.ScheduledTasks().Has(s.ctx, task.id) if err != nil { log.WithError(err).Errorf( @@ -418,11 +418,7 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { return } - if err := s.removeTask(task.id); err != nil { - log.WithError(err).Errorf( - "sweeper: failed to release scheduled task %s", task.id, - ) - } + defer releaseClaim() if err := task.execute(); err != nil { log.WithError(err).Errorf("failed to execute sweep of tx %s", task.id) diff --git a/internal/infrastructure/live-store/live_store_test.go b/internal/infrastructure/live-store/live_store_test.go index b2ee87d26..121682bbc 100644 --- a/internal/infrastructure/live-store/live_store_test.go +++ b/internal/infrastructure/live-store/live_store_test.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" "sync" - "sync/atomic" "testing" "time" @@ -806,45 +805,7 @@ func runLiveStoreTests(t *testing.T, store ports.LiveStore) { require.NoError(t, store.ScheduledTasks().Remove(ctx, "tx-abc")) - // Concurrent AddIfAbsent on the same id must produce exactly one - // winner. This is the load-bearing property of the whole fix: - // without atomicity, two arkd processes could both claim the same - // task and both broadcast the same sweep tx. - const goroutines = 100 - var wins atomic.Int32 - var wg sync.WaitGroup - start := make(chan struct{}) - errCh := make(chan error, goroutines) - - for range goroutines { - wg.Go(func() { - <-start - claimed, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-race") - if err != nil { - errCh <- err - return - } - if claimed { - wins.Add(1) - } - }) - } - - close(start) - wg.Wait() - close(errCh) - for err := range errCh { - require.NoError(t, err) - } - - require.Equal(t, int32(1), wins.Load(), - "AddIfAbsent must be atomic: exactly one goroutine claims the id") - - require.NoError(t, store.ScheduledTasks().Remove(ctx, "tx-race")) - - // Clear: wipes every claim so the ids can be claimed again. This is - // what sweeper.start() uses on boot to recover from a crash where - // the in-process timer died but the Redis claim survived. + // Clear: wipes every claim so the ids can be claimed again. _, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-1") require.NoError(t, err) _, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-2") From c5febd58bbfd0109c78c2eb5382592f22518d531 Mon Sep 17 00:00:00 2001 From: Dunsin Date: Fri, 22 May 2026 11:40:13 +0100 Subject: [PATCH 5/6] fix(live-store): size scheduled-task TTL by configured locktimes --- internal/config/config.go | 10 +++++++++- internal/core/application/sweeper.go | 9 ++++++++- internal/infrastructure/live-store/live_store_test.go | 2 +- .../infrastructure/live-store/redis/scheduled_tasks.go | 9 ++++----- internal/infrastructure/live-store/redis/store.go | 6 ++++-- 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index b1028536d..5acf6e5fb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -928,6 +928,14 @@ func (c *Config) liveStoreService() error { return fmt.Errorf("tx builder not set") } + // TTL must outlive the longest sweep schedule, with a safety buffer. + longestSweep := c.VtxoTreeExpiry.Seconds() + if d := c.CheckpointExitDelay.Seconds(); d > longestSweep { + longestSweep = d + } + + scheduledTaskTTL := time.Duration(longestSweep)*time.Second + 24*time.Hour + var liveStoreSvc ports.LiveStore var err error switch c.LiveStoreType { @@ -939,7 +947,7 @@ func (c *Config) liveStoreService() error { return fmt.Errorf("invalid REDIS_URL: %w", err) } rdb := redis.NewClient(redisOpts) - liveStoreSvc = redislivestore.NewLiveStore(rdb, c.txBuilder, c.RedisTxNumOfRetries) + liveStoreSvc = redislivestore.NewLiveStore(rdb, c.txBuilder, c.RedisTxNumOfRetries, scheduledTaskTTL) default: err = fmt.Errorf("unknown liveStore type") } diff --git a/internal/core/application/sweeper.go b/internal/core/application/sweeper.go index 751b60fd6..5a4e76fe9 100644 --- a/internal/core/application/sweeper.go +++ b/internal/core/application/sweeper.go @@ -403,7 +403,7 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { return task.execute() } - return s.scheduler.ScheduleTaskOnce(task.at, func() { + err = s.scheduler.ScheduleTaskOnce(task.at, func() { // Skip if the claim was released externally (e.g. the tx was already spent). has, err := s.cache.ScheduledTasks().Has(s.ctx, task.id) if err != nil { @@ -424,6 +424,13 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { log.WithError(err).Errorf("failed to execute sweep of tx %s", task.id) } }) + + if err != nil { + releaseClaim() + return err + } + + return nil } // createBatchSweepTask returns a function passed as handler in the scheduler diff --git a/internal/infrastructure/live-store/live_store_test.go b/internal/infrastructure/live-store/live_store_test.go index 121682bbc..236b86ca4 100644 --- a/internal/infrastructure/live-store/live_store_test.go +++ b/internal/infrastructure/live-store/live_store_test.go @@ -97,7 +97,7 @@ func TestLiveStoreImplementations(t *testing.T) { store ports.LiveStore }{ {"inmemory", inmemory.NewLiveStore(txBuilder)}, - {"redis", redislivestore.NewLiveStore(rdb, txBuilder, 5)}, + {"redis", redislivestore.NewLiveStore(rdb, txBuilder, 5, 30*24*time.Hour)}, } for _, tt := range stores { diff --git a/internal/infrastructure/live-store/redis/scheduled_tasks.go b/internal/infrastructure/live-store/redis/scheduled_tasks.go index c73023677..939d61fab 100644 --- a/internal/infrastructure/live-store/redis/scheduled_tasks.go +++ b/internal/infrastructure/live-store/redis/scheduled_tasks.go @@ -9,18 +9,17 @@ import ( "github.com/redis/go-redis/v9" ) -const scheduledTaskTTL = 30 * 24 * time.Hour - type scheduledTasksStore struct { rdb *redis.Client + ttl time.Duration } -func NewScheduledTasksStore(rdb *redis.Client) ports.ScheduledTasksStore { - return &scheduledTasksStore{rdb: rdb} +func NewScheduledTasksStore(rdb *redis.Client, ttl time.Duration) ports.ScheduledTasksStore { + return &scheduledTasksStore{rdb: rdb, ttl: ttl} } func (s *scheduledTasksStore) AddIfAbsent(ctx context.Context, id string) (bool, error) { - return s.rdb.SetNX(ctx, scheduledTaskKey(id), "1", scheduledTaskTTL).Result() + return s.rdb.SetNX(ctx, scheduledTaskKey(id), "1", s.ttl).Result() } func (s *scheduledTasksStore) Remove(ctx context.Context, id string) error { diff --git a/internal/infrastructure/live-store/redis/store.go b/internal/infrastructure/live-store/redis/store.go index 6491b237f..531b30c4e 100644 --- a/internal/infrastructure/live-store/redis/store.go +++ b/internal/infrastructure/live-store/redis/store.go @@ -1,6 +1,8 @@ package redislivestore import ( + "time" + "github.com/arkade-os/arkd/internal/core/ports" "github.com/redis/go-redis/v9" ) @@ -16,7 +18,7 @@ type redisLiveStore struct { scheduledTasksStore ports.ScheduledTasksStore } -func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int) ports.LiveStore { +func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int, scheduledTaskTTL time.Duration) ports.LiveStore { return &redisLiveStore{ intentStore: NewIntentStore(rdb, numOfRetries), forfeitTxsStore: NewForfeitTxsStore(rdb, builder, numOfRetries), @@ -25,7 +27,7 @@ func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int) confirmationSessionsStore: NewConfirmationSessionsStore(rdb, numOfRetries), treeSigningSessions: NewTreeSigningSessionsStore(rdb, numOfRetries), boardingInputsStore: NewBoardingInputsStore(rdb, numOfRetries), - scheduledTasksStore: NewScheduledTasksStore(rdb), + scheduledTasksStore: NewScheduledTasksStore(rdb, scheduledTaskTTL), } } From b6364be1952aa18240b07ab82e87fddfa09f88c4 Mon Sep 17 00:00:00 2001 From: Dunsin Date: Fri, 22 May 2026 11:54:25 +0100 Subject: [PATCH 6/6] fix: lint issue --- internal/config/config.go | 4 +++- internal/infrastructure/live-store/redis/store.go | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 5acf6e5fb..b5ff8a2fd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -947,7 +947,9 @@ func (c *Config) liveStoreService() error { return fmt.Errorf("invalid REDIS_URL: %w", err) } rdb := redis.NewClient(redisOpts) - liveStoreSvc = redislivestore.NewLiveStore(rdb, c.txBuilder, c.RedisTxNumOfRetries, scheduledTaskTTL) + liveStoreSvc = redislivestore.NewLiveStore( + rdb, c.txBuilder, c.RedisTxNumOfRetries, scheduledTaskTTL, + ) default: err = fmt.Errorf("unknown liveStore type") } diff --git a/internal/infrastructure/live-store/redis/store.go b/internal/infrastructure/live-store/redis/store.go index 531b30c4e..24a1abac4 100644 --- a/internal/infrastructure/live-store/redis/store.go +++ b/internal/infrastructure/live-store/redis/store.go @@ -18,7 +18,10 @@ type redisLiveStore struct { scheduledTasksStore ports.ScheduledTasksStore } -func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int, scheduledTaskTTL time.Duration) ports.LiveStore { +func NewLiveStore( + rdb *redis.Client, builder ports.TxBuilder, numOfRetries int, + scheduledTaskTTL time.Duration, +) ports.LiveStore { return &redisLiveStore{ intentStore: NewIntentStore(rdb, numOfRetries), forfeitTxsStore: NewForfeitTxsStore(rdb, builder, numOfRetries),