diff --git a/internal/config/config.go b/internal/config/config.go index b1028536d..b5ff8a2fd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -928,6 +928,14 @@ func (c *Config) liveStoreService() error { return fmt.Errorf("tx builder not set") } + // TTL must outlive the longest sweep schedule, with a safety buffer. + longestSweep := c.VtxoTreeExpiry.Seconds() + if d := c.CheckpointExitDelay.Seconds(); d > longestSweep { + longestSweep = d + } + + scheduledTaskTTL := time.Duration(longestSweep)*time.Second + 24*time.Hour + var liveStoreSvc ports.LiveStore var err error switch c.LiveStoreType { @@ -939,7 +947,9 @@ func (c *Config) liveStoreService() error { return fmt.Errorf("invalid REDIS_URL: %w", err) } rdb := redis.NewClient(redisOpts) - liveStoreSvc = redislivestore.NewLiveStore(rdb, c.txBuilder, c.RedisTxNumOfRetries) + liveStoreSvc = redislivestore.NewLiveStore( + rdb, c.txBuilder, c.RedisTxNumOfRetries, scheduledTaskTTL, + ) default: err = fmt.Errorf("unknown liveStore type") } diff --git a/internal/core/application/service.go b/internal/core/application/service.go index 7959fa386..9b4fd8d45 100644 --- a/internal/core/application/service.go +++ b/internal/core/application/service.go @@ -197,7 +197,7 @@ func NewService( cache: cache, scanner: scanner, sweeper: newSweeper( - wallet, repoManager, builder, scheduler, noteUriPrefix, + wallet, repoManager, builder, scheduler, noteUriPrefix, cache, ), boardingExitDelay: boardingExitDelay, operatorPrvkey: operatorSigningKey, @@ -3376,8 +3376,13 @@ func (s *service) listenToScannerNotifications() { // remove sweeper task for the associated checkpoint outputs for _, in := range ptx.UnsignedTx.TxIn { taskId := in.PreviousOutPoint.Hash.String() - s.sweeper.removeTask(taskId) - log.Debugf("sweeper: unscheduled task for tx %s", taskId) + if err := s.sweeper.removeTask(taskId); err != nil { + log.WithError(err).Warnf( + "sweeper: failed to unschedule task for tx %s", taskId, + ) + } else { + log.Debugf("sweeper: unscheduled task for tx %s", taskId) + } } }() } diff --git a/internal/core/application/sweeper.go b/internal/core/application/sweeper.go index 20e51ffb7..5a4e76fe9 100644 --- a/internal/core/application/sweeper.go +++ b/internal/core/application/sweeper.go @@ -37,26 +37,25 @@ type sweeper struct { scheduler ports.SchedulerService noteUriPrefix string - - // cache of scheduled tasks, avoid scheduling the same sweep event multiple times - locker *sync.Mutex - // TODO move the scheduled task map to LiveStore port - scheduledTasks map[string]struct{} - ctx context.Context + cache ports.LiveStore + ctx context.Context } func newSweeper( wallet ports.WalletService, repoManager ports.RepoManager, builder ports.TxBuilder, - scheduler ports.SchedulerService, noteUriPrefix string, + scheduler ports.SchedulerService, noteUriPrefix string, cache ports.LiveStore, ) *sweeper { return &sweeper{ wallet, repoManager, builder, scheduler, - noteUriPrefix, &sync.Mutex{}, make(map[string]struct{}), nil, + noteUriPrefix, cache, nil, } } func (s *sweeper) start(ctx context.Context) error { - s.scheduledTasks = make(map[string]struct{}) + if err := s.cache.ScheduledTasks().Clear(ctx); err != nil { + return fmt.Errorf("sweeper: failed to clear stale scheduled task claims: %w", err) + } + s.scheduler.Start() s.ctx = ctx @@ -209,11 +208,11 @@ func (s *sweeper) stop() { s.scheduler.Stop() } -// removeTask update the cached map of scheduled tasks -func (s *sweeper) removeTask(id string) { - s.locker.Lock() - defer s.locker.Unlock() - delete(s.scheduledTasks, id) +// removeTask releases the claim on a scheduled task id. Callers use this +// to cancel a pending sweep (e.g. when the underlying tx is observed +// spent before the sweep fires). +func (s *sweeper) removeTask(id string) error { + return s.cache.ScheduledTasks().Remove(s.ctx, id) } func (s *sweeper) scheduleCheckpointSweep( @@ -321,11 +320,9 @@ func (s *sweeper) scheduleCheckpointSweep( vtxo, ) - // if the sweep checkpoint tapscript is available, execute the task immediately - if !s.scheduler.AfterNow(sweepAt) { - return execute() - } - + // scheduleTask handles the in-past case internally — calling it + // unconditionally keeps both overdue and future checkpoint sweeps + // going through the same claim path. if err := s.scheduleTask(sweeperTask{ id: checkpointTxid.String(), at: sweepAt, @@ -379,42 +376,61 @@ func (s *sweeper) scheduleTask(task sweeperTask) error { return nil } + // Claim the slot first so no other caller can pick up the same id. + claimed, err := s.cache.ScheduledTasks().AddIfAbsent(s.ctx, task.id) + if err != nil { + return fmt.Errorf("failed to claim scheduled task %s: %w", task.id, err) + } + if !claimed { + return nil + } + + // Release only after execute() finishes, so the slot stays held while the sweep runs. + releaseClaim := func() { + if err := s.removeTask(task.id); err != nil { + log.WithError(err).Errorf( + "sweeper: failed to release scheduled task %s", task.id, + ) + } + } + if !s.scheduler.AfterNow(task.at) { log.Debugf( "sweeper: trying to schedule task in the past for tx %s, executing it immediately", task.id, ) + defer releaseClaim() return task.execute() } - s.locker.Lock() - defer s.locker.Unlock() - - if _, scheduled := s.scheduledTasks[task.id]; scheduled { - return nil - } - - s.scheduledTasks[task.id] = struct{}{} - - return s.scheduler.ScheduleTaskOnce(task.at, func() { - // check if the task is still scheduled before executing it - s.locker.Lock() - if _, scheduled := s.scheduledTasks[task.id]; !scheduled { + err = s.scheduler.ScheduleTaskOnce(task.at, func() { + // Skip if the claim was released externally (e.g. the tx was already spent). + has, err := s.cache.ScheduledTasks().Has(s.ctx, task.id) + if err != nil { + log.WithError(err).Errorf( + "sweeper: failed to check scheduled task %s, proceeding anyway", task.id, + ) + } else if !has { log.Debugf( "sweeper: task for sweeping tx %s has been unscheduled, nothing left to do", task.id, ) - s.locker.Unlock() return } - s.locker.Unlock() - s.removeTask(task.id) + defer releaseClaim() if err := task.execute(); err != nil { log.WithError(err).Errorf("failed to execute sweep of tx %s", task.id) } }) + + if err != nil { + releaseClaim() + return err + } + + return nil } // createBatchSweepTask returns a function passed as handler in the scheduler diff --git a/internal/core/ports/live_store.go b/internal/core/ports/live_store.go index 86f49fb03..2cee1556e 100644 --- a/internal/core/ports/live_store.go +++ b/internal/core/ports/live_store.go @@ -17,6 +17,7 @@ type LiveStore interface { ConfirmationSessions() ConfirmationSessionsStore TreeSigingSessions() TreeSigningSessionsStore BoardingInputs() BoardingInputsStore + ScheduledTasks() ScheduledTasksStore } type IntentStore interface { @@ -89,6 +90,13 @@ type BoardingInputsStore interface { DeleteSignatures(ctx context.Context, batchId string) error } +type ScheduledTasksStore interface { + AddIfAbsent(ctx context.Context, id string) (bool, error) + Remove(ctx context.Context, id string) error + Has(ctx context.Context, id string) (bool, error) + Clear(ctx context.Context) error +} + type TimedIntent struct { domain.Intent BoardingInputs []BoardingInput diff --git a/internal/infrastructure/live-store/inmemory/scheduled_tasks.go b/internal/infrastructure/live-store/inmemory/scheduled_tasks.go new file mode 100644 index 000000000..314a4609e --- /dev/null +++ b/internal/infrastructure/live-store/inmemory/scheduled_tasks.go @@ -0,0 +1,50 @@ +package inmemorylivestore + +import ( + "context" + "sync" + + "github.com/arkade-os/arkd/internal/core/ports" +) + +type scheduledTasksStore struct { + lock sync.Mutex + ids map[string]struct{} +} + +func NewScheduledTasksStore() ports.ScheduledTasksStore { + return &scheduledTasksStore{ + ids: make(map[string]struct{}), + } +} + +func (s *scheduledTasksStore) AddIfAbsent(_ context.Context, id string) (bool, error) { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.ids[id]; ok { + return false, nil + } + s.ids[id] = struct{}{} + return true, nil +} + +func (s *scheduledTasksStore) Remove(_ context.Context, id string) error { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.ids, id) + return nil +} + +func (s *scheduledTasksStore) Has(_ context.Context, id string) (bool, error) { + s.lock.Lock() + defer s.lock.Unlock() + _, ok := s.ids[id] + return ok, nil +} + +func (s *scheduledTasksStore) Clear(_ context.Context) error { + s.lock.Lock() + defer s.lock.Unlock() + s.ids = make(map[string]struct{}) + return nil +} diff --git a/internal/infrastructure/live-store/inmemory/store.go b/internal/infrastructure/live-store/inmemory/store.go index 40e7bb85c..aa941f06f 100644 --- a/internal/infrastructure/live-store/inmemory/store.go +++ b/internal/infrastructure/live-store/inmemory/store.go @@ -12,6 +12,7 @@ type inMemoryLiveStore struct { confirmationSessionsStore ports.ConfirmationSessionsStore treeSigningSessions ports.TreeSigningSessionsStore boardingInputsStore ports.BoardingInputsStore + scheduledTasksStore ports.ScheduledTasksStore } func NewLiveStore(txBuilder ports.TxBuilder) ports.LiveStore { @@ -23,6 +24,7 @@ func NewLiveStore(txBuilder ports.TxBuilder) ports.LiveStore { confirmationSessionsStore: NewConfirmationSessionsStore(), treeSigningSessions: NewTreeSigningSessionsStore(), boardingInputsStore: NewBoardingInputsStore(), + scheduledTasksStore: NewScheduledTasksStore(), } } @@ -47,3 +49,6 @@ func (s *inMemoryLiveStore) TreeSigingSessions() ports.TreeSigningSessionsStore func (s *inMemoryLiveStore) BoardingInputs() ports.BoardingInputsStore { return s.boardingInputsStore } +func (s *inMemoryLiveStore) ScheduledTasks() ports.ScheduledTasksStore { + return s.scheduledTasksStore +} diff --git a/internal/infrastructure/live-store/live_store_test.go b/internal/infrastructure/live-store/live_store_test.go index c9e35b8e3..236b86ca4 100644 --- a/internal/infrastructure/live-store/live_store_test.go +++ b/internal/infrastructure/live-store/live_store_test.go @@ -97,7 +97,7 @@ func TestLiveStoreImplementations(t *testing.T) { store ports.LiveStore }{ {"inmemory", inmemory.NewLiveStore(txBuilder)}, - {"redis", redislivestore.NewLiveStore(rdb, txBuilder, 5)}, + {"redis", redislivestore.NewLiveStore(rdb, txBuilder, 5, 30*24*time.Hour)}, } for _, tt := range stores { @@ -764,6 +764,72 @@ func runLiveStoreTests(t *testing.T, store ports.LiveStore) { require.NoError(t, err) require.Empty(t, gotSigs) }) + t.Run("ScheduledTasksStore", func(t *testing.T) { + ctx := t.Context() + + // AddIfAbsent: first call claims the id. + claimed, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc") + require.NoError(t, err) + require.True(t, claimed, "first AddIfAbsent should succeed and return true") + + has, err := store.ScheduledTasks().Has(ctx, "tx-abc") + require.NoError(t, err) + require.True(t, has) + + // AddIfAbsent: second call with the same id loses the race + secondClaim, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc") + require.NoError(t, err) + require.False(t, secondClaim, "second AddIfAbsent for the same id must return false, not an error") + + // Has: unknown id is false. + has, err = store.ScheduledTasks().Has(ctx, "never-added") + require.NoError(t, err) + require.False(t, has) + + // Remove: releases the claim. + err = store.ScheduledTasks().Remove(ctx, "tx-abc") + require.NoError(t, err) + + has, err = store.ScheduledTasks().Has(ctx, "tx-abc") + require.NoError(t, err) + require.False(t, has, "Has must reflect Remove") + + // Remove: idempotent — unknown id is a no-op. + err = store.ScheduledTasks().Remove(ctx, "never-added") + require.NoError(t, err) + + // After Remove, the same id can be re-claimed. + claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc") + require.NoError(t, err) + require.True(t, claimed) + + require.NoError(t, store.ScheduledTasks().Remove(ctx, "tx-abc")) + + // Clear: wipes every claim so the ids can be claimed again. + _, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-1") + require.NoError(t, err) + _, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-2") + require.NoError(t, err) + + require.NoError(t, store.ScheduledTasks().Clear(ctx)) + + has, err = store.ScheduledTasks().Has(ctx, "tx-clear-1") + require.NoError(t, err) + require.False(t, has, "Clear must remove every claim") + has, err = store.ScheduledTasks().Has(ctx, "tx-clear-2") + require.NoError(t, err) + require.False(t, has, "Clear must remove every claim") + + // Re-claim proves the slot was actually freed, not just hidden. + claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-1") + require.NoError(t, err) + require.True(t, claimed) + claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-2") + require.NoError(t, err) + require.True(t, claimed) + + require.NoError(t, store.ScheduledTasks().Clear(ctx)) + }) } type intentPushFixture struct { diff --git a/internal/infrastructure/live-store/redis/round.go b/internal/infrastructure/live-store/redis/round.go index e8c69ae78..304bfc29f 100644 --- a/internal/infrastructure/live-store/redis/round.go +++ b/internal/infrastructure/live-store/redis/round.go @@ -14,9 +14,10 @@ import ( ) const ( - currentRoundKey = "currentRoundStore:round" - boardingInputsKey = "boardingInputsStore:numOfInputs" - boardingInputSigsKey = "boardingInputsStore:signatures" + currentRoundKey = "currentRoundStore:round" + boardingInputsKey = "boardingInputsStore:numOfInputs" + boardingInputSigsKey = "boardingInputsStore:signatures" + scheduledTaskKeyPrefix = "scheduledTasksStore:task" ) type currentRoundStore struct { diff --git a/internal/infrastructure/live-store/redis/scheduled_tasks.go b/internal/infrastructure/live-store/redis/scheduled_tasks.go new file mode 100644 index 000000000..939d61fab --- /dev/null +++ b/internal/infrastructure/live-store/redis/scheduled_tasks.go @@ -0,0 +1,62 @@ +package redislivestore + +import ( + "context" + "fmt" + "time" + + "github.com/arkade-os/arkd/internal/core/ports" + "github.com/redis/go-redis/v9" +) + +type scheduledTasksStore struct { + rdb *redis.Client + ttl time.Duration +} + +func NewScheduledTasksStore(rdb *redis.Client, ttl time.Duration) ports.ScheduledTasksStore { + return &scheduledTasksStore{rdb: rdb, ttl: ttl} +} + +func (s *scheduledTasksStore) AddIfAbsent(ctx context.Context, id string) (bool, error) { + return s.rdb.SetNX(ctx, scheduledTaskKey(id), "1", s.ttl).Result() +} + +func (s *scheduledTasksStore) Remove(ctx context.Context, id string) error { + // Safe to call when the key isn't there — Del returns 0, not an error. + return s.rdb.Del(ctx, scheduledTaskKey(id)).Err() +} + +func (s *scheduledTasksStore) Has(ctx context.Context, id string) (bool, error) { + n, err := s.rdb.Exists(ctx, scheduledTaskKey(id)).Result() + if err != nil { + return false, err + } + return n > 0, nil +} + +func (s *scheduledTasksStore) Clear(ctx context.Context) error { + pattern := scheduledTaskKeyPrefix + ":*" + iter := s.rdb.Scan(ctx, 0, pattern, 100).Iterator() + var batch []string + for iter.Next(ctx) { + batch = append(batch, iter.Val()) + if len(batch) >= 100 { + if err := s.rdb.Del(ctx, batch...).Err(); err != nil { + return err + } + batch = batch[:0] + } + } + if err := iter.Err(); err != nil { + return err + } + if len(batch) > 0 { + return s.rdb.Del(ctx, batch...).Err() + } + return nil +} + +func scheduledTaskKey(id string) string { + return fmt.Sprintf("%s:%s", scheduledTaskKeyPrefix, id) +} diff --git a/internal/infrastructure/live-store/redis/store.go b/internal/infrastructure/live-store/redis/store.go index 306c32cc5..24a1abac4 100644 --- a/internal/infrastructure/live-store/redis/store.go +++ b/internal/infrastructure/live-store/redis/store.go @@ -1,6 +1,8 @@ package redislivestore import ( + "time" + "github.com/arkade-os/arkd/internal/core/ports" "github.com/redis/go-redis/v9" ) @@ -13,9 +15,13 @@ type redisLiveStore struct { confirmationSessionsStore ports.ConfirmationSessionsStore treeSigningSessions ports.TreeSigningSessionsStore boardingInputsStore ports.BoardingInputsStore + scheduledTasksStore ports.ScheduledTasksStore } -func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int) ports.LiveStore { +func NewLiveStore( + rdb *redis.Client, builder ports.TxBuilder, numOfRetries int, + scheduledTaskTTL time.Duration, +) ports.LiveStore { return &redisLiveStore{ intentStore: NewIntentStore(rdb, numOfRetries), forfeitTxsStore: NewForfeitTxsStore(rdb, builder, numOfRetries), @@ -24,6 +30,7 @@ func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int) confirmationSessionsStore: NewConfirmationSessionsStore(rdb, numOfRetries), treeSigningSessions: NewTreeSigningSessionsStore(rdb, numOfRetries), boardingInputsStore: NewBoardingInputsStore(rdb, numOfRetries), + scheduledTasksStore: NewScheduledTasksStore(rdb, scheduledTaskTTL), } } @@ -40,3 +47,6 @@ func (s *redisLiveStore) TreeSigingSessions() ports.TreeSigningSessionsStore { func (s *redisLiveStore) BoardingInputs() ports.BoardingInputsStore { return s.boardingInputsStore } +func (s *redisLiveStore) ScheduledTasks() ports.ScheduledTasksStore { + return s.scheduledTasksStore +}