Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,14 @@
return fmt.Errorf("tx builder not set")
}

// TTL must outlive the longest sweep schedule, with a safety buffer.
longestSweep := c.VtxoTreeExpiry.Seconds()
if d := c.CheckpointExitDelay.Seconds(); d > longestSweep {
longestSweep = d
}

scheduledTaskTTL := time.Duration(longestSweep)*time.Second + 24*time.Hour

var liveStoreSvc ports.LiveStore
var err error
switch c.LiveStoreType {
Expand All @@ -939,7 +947,7 @@
return fmt.Errorf("invalid REDIS_URL: %w", err)
}
rdb := redis.NewClient(redisOpts)
liveStoreSvc = redislivestore.NewLiveStore(rdb, c.txBuilder, c.RedisTxNumOfRetries)
liveStoreSvc = redislivestore.NewLiveStore(rdb, c.txBuilder, c.RedisTxNumOfRetries, scheduledTaskTTL)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
default:
err = fmt.Errorf("unknown liveStore type")
}
Expand All @@ -948,7 +956,7 @@
return err
}

c.liveStore = liveStoreSvc

Check failure on line 959 in internal/config/config.go

View workflow job for this annotation

GitHub Actions / unit tests

File is not properly formatted (golines)
return nil
}

Expand Down
11 changes: 8 additions & 3 deletions internal/core/application/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func NewService(
cache: cache,
scanner: scanner,
sweeper: newSweeper(
wallet, repoManager, builder, scheduler, noteUriPrefix,
wallet, repoManager, builder, scheduler, noteUriPrefix, cache,
),
boardingExitDelay: boardingExitDelay,
operatorPrvkey: operatorSigningKey,
Expand Down Expand Up @@ -3376,8 +3376,13 @@ func (s *service) listenToScannerNotifications() {
// remove sweeper task for the associated checkpoint outputs
for _, in := range ptx.UnsignedTx.TxIn {
taskId := in.PreviousOutPoint.Hash.String()
s.sweeper.removeTask(taskId)
log.Debugf("sweeper: unscheduled task for tx %s", taskId)
if err := s.sweeper.removeTask(taskId); err != nil {
log.WithError(err).Warnf(
"sweeper: failed to unschedule task for tx %s", taskId,
)
} else {
log.Debugf("sweeper: unscheduled task for tx %s", taskId)
}
}
}()
}
Expand Down
86 changes: 51 additions & 35 deletions internal/core/application/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,25 @@ type sweeper struct {
scheduler ports.SchedulerService

noteUriPrefix string

// cache of scheduled tasks, avoid scheduling the same sweep event multiple times
locker *sync.Mutex
// TODO move the scheduled task map to LiveStore port
scheduledTasks map[string]struct{}
ctx context.Context
cache ports.LiveStore
ctx context.Context
}

func newSweeper(
wallet ports.WalletService, repoManager ports.RepoManager, builder ports.TxBuilder,
scheduler ports.SchedulerService, noteUriPrefix string,
scheduler ports.SchedulerService, noteUriPrefix string, cache ports.LiveStore,
) *sweeper {
return &sweeper{
wallet, repoManager, builder, scheduler,
noteUriPrefix, &sync.Mutex{}, make(map[string]struct{}), nil,
noteUriPrefix, cache, nil,
}
}

func (s *sweeper) start(ctx context.Context) error {
s.scheduledTasks = make(map[string]struct{})
if err := s.cache.ScheduledTasks().Clear(ctx); err != nil {
return fmt.Errorf("sweeper: failed to clear stale scheduled task claims: %w", err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

s.scheduler.Start()

s.ctx = ctx
Expand Down Expand Up @@ -209,11 +208,11 @@ func (s *sweeper) stop() {
s.scheduler.Stop()
}

// removeTask update the cached map of scheduled tasks
func (s *sweeper) removeTask(id string) {
s.locker.Lock()
defer s.locker.Unlock()
delete(s.scheduledTasks, id)
// removeTask releases the claim on a scheduled task id. Callers use this
// to cancel a pending sweep (e.g. when the underlying tx is observed
// spent before the sweep fires).
//
// The returned error is whatever the ScheduledTasks store reports from
// Remove; it is passed through unwrapped.
// NOTE(review): the call uses s.ctx, which appears to stay nil until
// start() assigns it — confirm this is never reached before start().
func (s *sweeper) removeTask(id string) error {
return s.cache.ScheduledTasks().Remove(s.ctx, id)
}

func (s *sweeper) scheduleCheckpointSweep(
Expand Down Expand Up @@ -321,11 +320,9 @@ func (s *sweeper) scheduleCheckpointSweep(
vtxo,
)

// if the sweep checkpoint tapscript is available, execute the task immediately
if !s.scheduler.AfterNow(sweepAt) {
return execute()
}

// scheduleTask handles the in-past case internally — calling it
// unconditionally keeps both overdue and future checkpoint sweeps
// going through the same claim path.
if err := s.scheduleTask(sweeperTask{
id: checkpointTxid.String(),
at: sweepAt,
Expand Down Expand Up @@ -379,42 +376,61 @@ func (s *sweeper) scheduleTask(task sweeperTask) error {
return nil
}

// Claim the slot first so no other caller can pick up the same id.
claimed, err := s.cache.ScheduledTasks().AddIfAbsent(s.ctx, task.id)
if err != nil {
return fmt.Errorf("failed to claim scheduled task %s: %w", task.id, err)
}
if !claimed {
return nil
}

// Release only after execute() finishes, so the slot stays held while the sweep runs.
releaseClaim := func() {
if err := s.removeTask(task.id); err != nil {
log.WithError(err).Errorf(
"sweeper: failed to release scheduled task %s", task.id,
)
}
}

if !s.scheduler.AfterNow(task.at) {
log.Debugf(
"sweeper: trying to schedule task in the past for tx %s, executing it immediately",
task.id,
)
defer releaseClaim()
return task.execute()
}

s.locker.Lock()
defer s.locker.Unlock()

if _, scheduled := s.scheduledTasks[task.id]; scheduled {
return nil
}

s.scheduledTasks[task.id] = struct{}{}

return s.scheduler.ScheduleTaskOnce(task.at, func() {
// check if the task is still scheduled before executing it
s.locker.Lock()
if _, scheduled := s.scheduledTasks[task.id]; !scheduled {
err = s.scheduler.ScheduleTaskOnce(task.at, func() {
// Skip if the claim was released externally (e.g. the tx was already spent).
has, err := s.cache.ScheduledTasks().Has(s.ctx, task.id)
if err != nil {
log.WithError(err).Errorf(
"sweeper: failed to check scheduled task %s, proceeding anyway", task.id,
)
} else if !has {
log.Debugf(
"sweeper: task for sweeping tx %s has been unscheduled, nothing left to do",
task.id,
)
s.locker.Unlock()
return
}
s.locker.Unlock()

s.removeTask(task.id)
defer releaseClaim()

if err := task.execute(); err != nil {
log.WithError(err).Errorf("failed to execute sweep of tx %s", task.id)
}
})

if err != nil {
releaseClaim()
return err
}

return nil
}

// createBatchSweepTask returns a function passed as handler in the scheduler
Expand Down
8 changes: 8 additions & 0 deletions internal/core/ports/live_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type LiveStore interface {
ConfirmationSessions() ConfirmationSessionsStore
TreeSigingSessions() TreeSigningSessionsStore
BoardingInputs() BoardingInputsStore
ScheduledTasks() ScheduledTasksStore
}

type IntentStore interface {
Expand Down Expand Up @@ -89,6 +90,13 @@ type BoardingInputsStore interface {
DeleteSignatures(ctx context.Context, batchId string) error
}

// ScheduledTasksStore tracks the ids of tasks that have been claimed for
// scheduling, so that the same id is not scheduled more than once.
type ScheduledTasksStore interface {
// AddIfAbsent claims id. It reports true when the id was not yet present
// and has now been recorded, and false (with a nil error) when the id was
// already claimed.
AddIfAbsent(ctx context.Context, id string) (bool, error)
// Remove releases the claim on id. Removing an id that was never added is
// expected to be a no-op rather than an error.
Remove(ctx context.Context, id string) error
// Has reports whether id is currently claimed.
Has(ctx context.Context, id string) (bool, error)
// Clear drops every claim, after which any id can be claimed again.
Clear(ctx context.Context) error
}

type TimedIntent struct {
domain.Intent
BoardingInputs []BoardingInput
Expand Down
50 changes: 50 additions & 0 deletions internal/infrastructure/live-store/inmemory/scheduled_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package inmemorylivestore

import (
"context"
"sync"

"github.com/arkade-os/arkd/internal/core/ports"
)

// scheduledTasksStore is the in-memory ports.ScheduledTasksStore: a set of
// claimed task ids guarded by a mutex.
type scheduledTasksStore struct {
// lock is held by every method while it reads or writes ids.
lock sync.Mutex
// ids is the set of currently claimed task ids; only membership matters.
ids map[string]struct{}
}

// NewScheduledTasksStore returns an empty in-memory store of scheduled task
// ids, ready for use.
func NewScheduledTasksStore() ports.ScheduledTasksStore {
	store := new(scheduledTasksStore)
	// The set must be allocated up front: writing to a nil map panics.
	store.ids = map[string]struct{}{}
	return store
}

// AddIfAbsent records id as claimed unless it already is. It reports true
// when this call added the id and false when the id was already present.
// The error is always nil; the context is unused.
func (s *scheduledTasksStore) AddIfAbsent(_ context.Context, id string) (bool, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	_, alreadyClaimed := s.ids[id]
	if !alreadyClaimed {
		s.ids[id] = struct{}{}
	}
	return !alreadyClaimed, nil
}

// Remove drops id from the set of claimed ids. Removing an id that is not
// present is a no-op. The error is always nil; the context is unused.
func (s *scheduledTasksStore) Remove(_ context.Context, id string) error {
	s.lock.Lock()
	// delete never panics (even on a missing key), so the lock can be
	// released explicitly instead of through defer.
	delete(s.ids, id)
	s.lock.Unlock()

	return nil
}

// Has reports whether id is currently in the set of claimed ids. The error
// is always nil; the context is unused.
func (s *scheduledTasksStore) Has(_ context.Context, id string) (bool, error) {
	s.lock.Lock()
	// A map lookup cannot panic, so the lock is released explicitly.
	_, present := s.ids[id]
	s.lock.Unlock()

	return present, nil
}

// Clear forgets every claimed id by swapping in a fresh, empty set. The
// error is always nil; the context is unused.
func (s *scheduledTasksStore) Clear(_ context.Context) error {
	// Allocate the replacement before taking the lock; only the swap itself
	// needs to be guarded.
	fresh := map[string]struct{}{}

	s.lock.Lock()
	s.ids = fresh
	s.lock.Unlock()

	return nil
}
5 changes: 5 additions & 0 deletions internal/infrastructure/live-store/inmemory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type inMemoryLiveStore struct {
confirmationSessionsStore ports.ConfirmationSessionsStore
treeSigningSessions ports.TreeSigningSessionsStore
boardingInputsStore ports.BoardingInputsStore
scheduledTasksStore ports.ScheduledTasksStore
}

func NewLiveStore(txBuilder ports.TxBuilder) ports.LiveStore {
Expand All @@ -23,6 +24,7 @@ func NewLiveStore(txBuilder ports.TxBuilder) ports.LiveStore {
confirmationSessionsStore: NewConfirmationSessionsStore(),
treeSigningSessions: NewTreeSigningSessionsStore(),
boardingInputsStore: NewBoardingInputsStore(),
scheduledTasksStore: NewScheduledTasksStore(),
}
}

Expand All @@ -47,3 +49,6 @@ func (s *inMemoryLiveStore) TreeSigingSessions() ports.TreeSigningSessionsStore
// BoardingInputs returns the boarding inputs store held by this live store.
func (s *inMemoryLiveStore) BoardingInputs() ports.BoardingInputsStore {
return s.boardingInputsStore
}
// ScheduledTasks returns the scheduled tasks store held by this live store.
func (s *inMemoryLiveStore) ScheduledTasks() ports.ScheduledTasksStore {
return s.scheduledTasksStore
}
68 changes: 67 additions & 1 deletion internal/infrastructure/live-store/live_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestLiveStoreImplementations(t *testing.T) {
store ports.LiveStore
}{
{"inmemory", inmemory.NewLiveStore(txBuilder)},
{"redis", redislivestore.NewLiveStore(rdb, txBuilder, 5)},
{"redis", redislivestore.NewLiveStore(rdb, txBuilder, 5, 30*24*time.Hour)},
}

for _, tt := range stores {
Expand Down Expand Up @@ -764,6 +764,72 @@ func runLiveStoreTests(t *testing.T, store ports.LiveStore) {
require.NoError(t, err)
require.Empty(t, gotSigs)
})
t.Run("ScheduledTasksStore", func(t *testing.T) {
ctx := t.Context()

// AddIfAbsent: first call claims the id.
claimed, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc")
require.NoError(t, err)
require.True(t, claimed, "first AddIfAbsent should succeed and return true")

has, err := store.ScheduledTasks().Has(ctx, "tx-abc")
require.NoError(t, err)
require.True(t, has)

// AddIfAbsent: second call with the same id loses the race
secondClaim, err := store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc")
require.NoError(t, err)
require.False(t, secondClaim, "second AddIfAbsent for the same id must return false, not an error")

// Has: unknown id is false.
has, err = store.ScheduledTasks().Has(ctx, "never-added")
require.NoError(t, err)
require.False(t, has)

// Remove: releases the claim.
err = store.ScheduledTasks().Remove(ctx, "tx-abc")
require.NoError(t, err)

has, err = store.ScheduledTasks().Has(ctx, "tx-abc")
require.NoError(t, err)
require.False(t, has, "Has must reflect Remove")

// Remove: idempotent — unknown id is a no-op.
err = store.ScheduledTasks().Remove(ctx, "never-added")
require.NoError(t, err)

// After Remove, the same id can be re-claimed.
claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-abc")
require.NoError(t, err)
require.True(t, claimed)

require.NoError(t, store.ScheduledTasks().Remove(ctx, "tx-abc"))

// Clear: wipes every claim so the ids can be claimed again.
_, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-1")
require.NoError(t, err)
_, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-2")
require.NoError(t, err)

require.NoError(t, store.ScheduledTasks().Clear(ctx))

has, err = store.ScheduledTasks().Has(ctx, "tx-clear-1")
require.NoError(t, err)
require.False(t, has, "Clear must remove every claim")
has, err = store.ScheduledTasks().Has(ctx, "tx-clear-2")
require.NoError(t, err)
require.False(t, has, "Clear must remove every claim")

// Re-claim proves the slot was actually freed, not just hidden.
claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-1")
require.NoError(t, err)
require.True(t, claimed)
claimed, err = store.ScheduledTasks().AddIfAbsent(ctx, "tx-clear-2")
require.NoError(t, err)
require.True(t, claimed)

require.NoError(t, store.ScheduledTasks().Clear(ctx))
})
}

type intentPushFixture struct {
Expand Down
7 changes: 4 additions & 3 deletions internal/infrastructure/live-store/redis/round.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
)

const (
currentRoundKey = "currentRoundStore:round"
boardingInputsKey = "boardingInputsStore:numOfInputs"
boardingInputSigsKey = "boardingInputsStore:signatures"
currentRoundKey = "currentRoundStore:round"
boardingInputsKey = "boardingInputsStore:numOfInputs"
boardingInputSigsKey = "boardingInputsStore:signatures"
scheduledTaskKeyPrefix = "scheduledTasksStore:task"
)

type currentRoundStore struct {
Expand Down
Loading
Loading