Skip to content

fix(sweeper): move the scheduled task map to LiveStore port#1079

Open
Dunsin-cyber wants to merge 6 commits into
arkade-os:masterfrom
Dunsin-cyber:fix/sweeper-scheduled-tasks-shared-store
Open

fix(sweeper): move the scheduled task map to LiveStore port#1079
Dunsin-cyber wants to merge 6 commits into
arkade-os:masterfrom
Dunsin-cyber:fix/sweeper-scheduled-tasks-shared-store

Conversation

@Dunsin-cyber
Copy link
Copy Markdown
Contributor

@Dunsin-cyber Dunsin-cyber commented May 21, 2026

moves the scheduled-task id set out of a per-process map on the sweeper struct and into a LiveStore-backed ScheduledTasksStore, matching the rest of the live-store conventions.

Closes the TODO at internal/core/application/sweeper.go:43

please review @altafan @sekulicd

Summary by CodeRabbit

  • Refactor
    • Move scheduled-task tracking to a claim-based persistent store and unify scheduling so all tasks follow the same claim lifecycle.
  • Bug Fixes
    • Improve handling and logging when unscheduling or releasing tasks; failures are now reported.
  • Tests
    • Add tests for claim semantics, idempotency, presence checks, and clearing behavior.
  • Chores
    • Configure scheduled-task TTL and wire it into the live store setup.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 21, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8631a080-a9cc-4370-b87f-f7a3d1513f62

📥 Commits

Reviewing files that changed from the base of the PR and between c5febd5 and b6364be.

📒 Files selected for processing (2)
  • internal/config/config.go
  • internal/infrastructure/live-store/redis/store.go

Walkthrough

Sweeper scheduling now uses a LiveStore-backed ScheduledTasksStore for distributed claim-based ownership; adds ScheduledTasksStore port, in-memory and Redis implementations (with Clear), wires stores into live stores and config TTL, and updates sweeper and service to claim/verify/release scheduled-task claims.

Changes

Distributed Scheduled Tasks Store

Layer / File(s) Summary
ScheduledTasksStore port and interface
internal/core/ports/live_store.go
New ScheduledTasksStore interface (AddIfAbsent, Remove, Has, Clear) and LiveStore.ScheduledTasks() accessor.
In-memory scheduled tasks store & wiring
internal/infrastructure/live-store/inmemory/scheduled_tasks.go, internal/infrastructure/live-store/inmemory/store.go, internal/infrastructure/live-store/live_store_test.go
In-memory ScheduledTasksStore implementation (mutex-protected map) with AddIfAbsent, Remove, Has, and new Clear. Wired into inMemoryLiveStore. Tests validate claim lifecycle and re-claim after Remove/Clear.
Redis scheduled tasks store & wiring
internal/infrastructure/live-store/redis/round.go, internal/infrastructure/live-store/redis/scheduled_tasks.go, internal/infrastructure/live-store/redis/store.go, internal/infrastructure/live-store/live_store_test.go
Redis implementation using SETNX with TTL, EXISTS, DEL, Clear scanning by scheduledTaskKeyPrefix, TTL-configured constructor, and wiring into redisLiveStore. Tests updated to pass TTL and validate claim lifecycle.
Config TTL wiring
internal/config/config.go
Compute scheduledTaskTTL from sweep parameters (max sweep duration + 24h buffer) and pass it into Redis NewLiveStore call.
Sweeper distributed task scheduling
internal/core/application/sweeper.go
Sweeper now stores cache ports.LiveStore, clears stale claims on start, uses AddIfAbsent to claim tasks before scheduling, verifies claim with Has in the scheduler callback, defers Remove before executing task.execute(), and removeTask returns an error on release failure.
Service wiring and unschedule logging
internal/core/application/service.go
Service constructs sweeper with cache argument and scanner notification unscheduling now logs removeTask errors as warnings or debug on success.

Sequence Diagram

sequenceDiagram
  participant Sweeper
  participant ScheduledTasksStore
  participant Executor

  Sweeper->>ScheduledTasksStore: AddIfAbsent(taskId)
  alt Claim succeeded
    ScheduledTasksStore-->>Sweeper: true, nil
    Sweeper->>Sweeper: schedule timer/callback
    Note over Sweeper: On timer trigger
    Sweeper->>ScheduledTasksStore: Has(taskId)
    alt Still claimed
      ScheduledTasksStore-->>Sweeper: true, nil
      Sweeper->>ScheduledTasksStore: Remove(taskId)
      ScheduledTasksStore-->>Sweeper: nil
      Sweeper->>Executor: task.execute()
    else Externally unscheduled
      ScheduledTasksStore-->>Sweeper: false, nil
      Sweeper->>Sweeper: skip execution
    end
  else Claim already held
    ScheduledTasksStore-->>Sweeper: false, nil
    Sweeper->>Sweeper: skip scheduling
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • arkade-os/arkd#838: Modifies sweeper/checkpoint scheduling behavior and is connected to scheduling logic changes in this PR.
  • arkade-os/arkd#763: Overlaps at sweeper start/shutdown integration points and related lifecycle changes.

Suggested reviewers

  • altafan
  • sekulicd
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately describes the main change: moving scheduled task management from a per-process map to the LiveStore port, which is the central architectural change across all modified files.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@arkanaai arkanaai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — #1079

Solid direction: moving the scheduled-task map into LiveStore is the right call for multi-instance arkd. The interface is clean, tests are thorough (including the concurrency race test — nice), and the Redis SETNX approach is correct for atomic claiming. However, there is one critical bug and a few issues that need fixing before merge.


🔴 CRITICAL — Stale claims cause permanently missed sweeps after a crash

Files: sweeper.go, redis/scheduled_tasks.go

The old code reset the task map on every restart (sweeper.go:59, master):

func (s *sweeper) start(ctx context.Context) error {
    s.scheduledTasks = make(map[string]struct{})  // ← wiped on restart

The PR removes this line. Now claims live in Redis with TTL 0 (forever). If an arkd instance:

  1. Claims a task via AddIfAbsent → key is set in Redis
  2. Crashes before the scheduler fires the callback
  3. Restarts and calls start() → re-discovers the sweepable batch → calls scheduleTaskAddIfAbsent returns falsetask is silently skipped

No instance will ever schedule that sweep again. The operator's funds remain locked in the VTXO tree past expiry. This is a funds-at-risk bug.

Fix options (pick one):

  • (A) Add a Clear(ctx) method to ScheduledTasksStore and call it at the top of sweeper.start(), same as the old make(map[string]struct{}). This preserves the original restart-recovery semantics.
  • (B) Set a TTL on the Redis key equal to sweepAt - now + margin. This naturally expires stale claims, but adds complexity.
  • (C) Use a Redis key prefix that includes a boot/instance ID, so each restart gets a fresh namespace. (Defeats the purpose of cross-instance dedup, though.)

I'd recommend (A) — it's the simplest and matches the existing semantics exactly. In a multi-instance setup, only one instance should be running sweeper.start() anyway (the leader), so clearing the shared set on startup is safe.


🟡 HIGH — Redis key leak on long-running instances

File: redis/scheduled_tasks.go:24

return s.rdb.SetNX(ctx, scheduledTaskKey(id), "1", 0).Result()
//                                                   ^ TTL = 0 = forever

Even if the crash scenario above is fixed, any key whose Remove call fails (network blip, Redis timeout) will persist forever. Over months of operation this is unbounded growth. Consider setting a generous TTL (e.g., 30 days — well beyond any realistic sweep window) as a safety net.


🟡 MEDIUM — removeTask error swallowed in goroutine

File: service.go:3379-3384

The error is now logged but execution continues in the goroutine regardless. If Remove fails, the task remains claimed in the shared store, which means a future restart could skip it (same crash scenario, different trigger). The logging is good, but consider whether the goroutine should retry or whether this is acceptable.


🟢 Minor nits

  1. redis/round.go:17-20: The scheduledTaskKeyPrefix constant is declared in round.go but only used by scheduled_tasks.go. Move it to scheduled_tasks.go for locality.

  2. Blank line: redis/scheduled_tasks.go:11 — extra blank line between import block and type declaration. Cosmetic only.

  3. PR title: Has leading whitespace: " fix(sweeper):..." — minor but worth cleaning up.


Summary

The core approach is sound and the test coverage (including the 100-goroutine race test) is well done. The critical issue is the missing cleanup on restart — without it, a crash permanently orphans sweep tasks. Add a Clear method, call it on startup, and this is good to go.

⚠️ Protocol-critical: This PR touches sweep scheduling, which controls when the operator reclaims expired VTXO tree outputs. Requesting human review from @altafan or @sekulicd per protocol-critical policy.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/core/application/sweeper.go (1)

385-420: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Task claims can be orphaned permanently.

Once AddIfAbsent succeeds, cleanup depends on the callback eventually calling Remove. If ScheduleTaskOnce returns an error, or the process exits after claiming and before the callback runs, the claim is never cleared. With the Redis-backed store in this stack using persistent SETNX/DEL/EXISTS semantics, that leaves the task ID blocked indefinitely across the cluster. This needs an expiring/owned claim, and at minimum a rollback when scheduler registration fails.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/core/application/sweeper.go` around lines 385 - 420, After
successfully claiming a scheduled task with
s.cache.ScheduledTasks().AddIfAbsent, the claim can be orphaned if
s.scheduler.ScheduleTaskOnce fails or the process dies before the callback runs;
ensure you rollback the claim on scheduler registration failure and move to an
expiring/owned claim model. Specifically: after AddIfAbsent succeeds, call
s.scheduler.ScheduleTaskOnce and if it returns an error immediately call
s.removeTask(task.id) to release the claim; additionally replace or augment
AddIfAbsent with a TTL/owner-aware variant (e.g., AddIfAbsentWithTTL or using a
SetNX+EXPIRE pattern and include a unique owner ID and optional heartbeat) so
that claims automatically expire and the callback still calls
s.removeTask(task.id) when it completes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/core/application/sweeper.go`:
- Around line 385-390: The restore path currently executes overdue
batches/checkpoints before attempting the distributed claim, causing duplicate
sweeps; change the flow in the sweeper where you call
s.cache.ScheduledTasks().AddIfAbsent (the distributed claim) so that you claim
the task first for every scheduled item, then, only if claimed, apply the
existing AfterNow fast-path logic to decide whether to run immediately or hand
off to the local scheduler; specifically move the AddIfAbsent call to precede
the AfterNow checks in the restore/restore-related handling, and ensure you skip
executing the sweep when AddIfAbsent returns false.

In `@internal/infrastructure/live-store/live_store_test.go`:
- Around line 820-825: The test uses require.NoError(t, err) inside the worker
goroutine that calls store.ScheduledTasks().AddIfAbsent, which can call FailNow
from a non-test goroutine; instead capture the error and claimed result into
outer-scoped variables (e.g., errCh or slices/vars like workerErr,
workerClaimed) inside the goroutine and perform the require/assert checks after
wg.Wait() in the main test goroutine; update the goroutine that calls
AddIfAbsent to set those variables (or send to a channel) and replace the
in-goroutine require.NoError(t, err) with post-wait assertions that call
require.NoError(t, workerErr) and require.True/False(t, workerClaimed) as
needed.

---

Outside diff comments:
In `@internal/core/application/sweeper.go`:
- Around line 385-420: After successfully claiming a scheduled task with
s.cache.ScheduledTasks().AddIfAbsent, the claim can be orphaned if
s.scheduler.ScheduleTaskOnce fails or the process dies before the callback runs;
ensure you rollback the claim on scheduler registration failure and move to an
expiring/owned claim model. Specifically: after AddIfAbsent succeeds, call
s.scheduler.ScheduleTaskOnce and if it returns an error immediately call
s.removeTask(task.id) to release the claim; additionally replace or augment
AddIfAbsent with a TTL/owner-aware variant (e.g., AddIfAbsentWithTTL or using a
SetNX+EXPIRE pattern and include a unique owner ID and optional heartbeat) so
that claims automatically expire and the callback still calls
s.removeTask(task.id) when it completes.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6d300f08-0798-4f5b-9160-e0f473299a84

📥 Commits

Reviewing files that changed from the base of the PR and between 2c9612a and 68d0946.

📒 Files selected for processing (9)
  • internal/core/application/service.go
  • internal/core/application/sweeper.go
  • internal/core/ports/live_store.go
  • internal/infrastructure/live-store/inmemory/scheduled_tasks.go
  • internal/infrastructure/live-store/inmemory/store.go
  • internal/infrastructure/live-store/live_store_test.go
  • internal/infrastructure/live-store/redis/round.go
  • internal/infrastructure/live-store/redis/scheduled_tasks.go
  • internal/infrastructure/live-store/redis/store.go

Comment thread internal/core/application/sweeper.go Outdated
Comment thread internal/infrastructure/live-store/live_store_test.go Outdated
@Dunsin-cyber Dunsin-cyber changed the title fix(sweeper): move the scheduled task map to LiveStore port fix(sweeper): move the scheduled task map to LiveStore port May 21, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/core/application/sweeper.go`:
- Around line 391-401: The current code calls s.removeTask(task.id) before
task.execute(), releasing the shared claim too early and allowing another
instance to claim and execute the same task concurrently; change the flow to
keep the claim until after execution by deferring the removal: after
successfully acquiring/keeping the claim for a task (the branch that currently
calls s.removeTask(...) before task.execute()), add a deferred cleanup (e.g.,
defer func() { if err := s.removeTask(task.id); if err != nil {
log.WithError(err).Errorf("sweeper: failed to release scheduled task %s",
task.id) } }()) so the claim is only released after task.execute() completes
(and still released on panics/errors), and apply the same deferred-release fix
to the other path referenced (the block around task.execute() at lines 421-427).
- Around line 55-57: The current call s.cache.ScheduledTasks().Clear(ctx) wipes
claims for all instances and must be replaced with owner-scoped cleanup: instead
of Clear, call a method that deletes only claims owned by this node (e.g.
ScheduledTasks().DeleteByOwner(ctx, ownerID) or
ScheduledTasks().ClearOwnedClaims(ctx, leaseID)), or remove the global clear and
rely on TTL/lease expiry; ensure the sweeper has access to this instance's
identifier (ownerID/leaseID) via the Sweeper struct (s) and use that to scope
the deletion so you never remove other instances' live claims.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d9bb2604-0ebe-4187-8962-bb5ed667c1c8

📥 Commits

Reviewing files that changed from the base of the PR and between 68d0946 and 1492090.

📒 Files selected for processing (5)
  • internal/core/application/sweeper.go
  • internal/core/ports/live_store.go
  • internal/infrastructure/live-store/inmemory/scheduled_tasks.go
  • internal/infrastructure/live-store/live_store_test.go
  • internal/infrastructure/live-store/redis/scheduled_tasks.go

Comment thread internal/core/application/sweeper.go
Comment thread internal/core/application/sweeper.go Outdated
@Dunsin-cyber Dunsin-cyber marked this pull request as draft May 21, 2026 14:15
Copy link
Copy Markdown
Contributor

@arkanaai arkanaai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up Review — #1079 (new commits)

All three issues from my previous review are resolved:

  • CRITICAL (stale claims after crash): Clear(ctx) added to ScheduledTasksStore interface and called at top of sweeper.start() — matches the old make(map[string]struct{}) semantics exactly. Good.
  • HIGH (Redis key leak): scheduledTaskTTL = 30 * 24 * time.Hour applied to SetNX. Safety net in place.
  • MEDIUM (removeTask error swallowed): removeTask now returns error, callers in service.go:3379 log warnings appropriately.

The checkpoint sweep path now goes through scheduleTask unconditionally (removed the early execute() shortcut), which is cleaner — all paths go through the same claim→execute→release flow.


One remaining minor issue

sweeper.go:406 — orphaned claim if ScheduleTaskOnce fails

return s.scheduler.ScheduleTaskOnce(task.at, func() { ... })

The gocron implementation (gocron/service.go:43) can return an error from s.scheduler.Every(...).Do(task). If it does, AddIfAbsent already claimed the slot but releaseClaim is never called — the claim stays until Clear() on next restart (or 30-day TTL). Not critical thanks to those safety nets, but a one-liner fix:

err := s.scheduler.ScheduleTaskOnce(task.at, func() { ... })
if err != nil {
    releaseClaim()
}
return err

Previous minor nit (still open)

scheduledTaskKeyPrefix is declared in redis/round.go but only used by redis/scheduled_tasks.go. Worth moving for locality.


Overall: the critical and high-severity issues are all fixed. The code is clean, the test coverage is solid, and the claim semantics are correct. Approving.

⚠️ Protocol-critical reminder: This PR touches sweep scheduling (operator fund recovery from expired VTXO trees). Human approval still required from @altafan or @sekulicd before merge.

@Dunsin-cyber Dunsin-cyber marked this pull request as ready for review May 21, 2026 21:50
Copy link
Copy Markdown
Contributor

@arkanaai arkanaai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — #1079

Reviewed full diff + full file context for all 9 changed files. Cross-referenced LiveStore consumers (ports, inmemory, redis, config, service, admin). Checked git history on sweeper.go (20 commits).

Overall: Good refactor direction — the TODO was well-placed and this is the right move for multi-instance deployments. The dedup-via-claim pattern is cleaner than the old mutex+map. But there are two bugs I'd like addressed before merge.


🔴 BUG: Claim leak when ScheduleTaskOnce fails

internal/core/application/sweeper.goscheduleTask(), around the final return

If s.scheduler.ScheduleTaskOnce(task.at, ...) returns an error, the claim acquired via AddIfAbsent is never released. The releaseClaim closure is only invoked inside the scheduler callback or in the immediate-execution path. A scheduler error leaves the task ID permanently claimed (until TTL expiry in Redis or until restart calls Clear()).

This means the sweep for that batch/checkpoint can never be re-attempted by any instance until the claim is manually cleared.

Fix: Release the claim if ScheduleTaskOnce returns an error:

err := s.scheduler.ScheduleTaskOnce(task.at, func() { ... })
if err != nil {
    releaseClaim()
    return err
}
return nil

🟡 CONCERN: Redis TTL of 30 days may be too short

internal/infrastructure/live-store/redis/scheduled_tasks.go:13

const scheduledTaskTTL = 30 * 24 * time.Hour

The SetNX TTL of 30 days starts when the claim is created. Sweep tasks are scheduled at blockTimestamp + CSV_locktime. Ark's typical VTXO lifetime is ~4 weeks (28 days), so this barely covers it. Any deployment with a longer locktime (or if confirmation delay pushes the schedule further out) will hit this:

  1. Claim created → 30-day TTL starts
  2. Scheduled callback fires after >30 days → claim expired → Has() returns false → sweep silently skipped

The sweep would eventually recover on restart (since start() re-discovers sweepable batches), but between TTL expiry and restart, sweeps are stalled.

Suggestion: Either:

  • Set TTL to something generous like 90 days, or
  • Tie the TTL to the actual task.at timestamp (e.g., time.Until(sweepAt) + 24h buffer), or
  • Document the assumption that locktimes must be < 30 days

🟢 Minor / Nits

  1. No concurrent AddIfAbsent test. The test validates basic claim semantics well, but the PR description and CodeRabbit summary mention "concurrency" testing. Consider adding a test that fires N goroutines calling AddIfAbsent with the same ID and asserts exactly one returns true. This is especially important for the Redis SetNX path where network timing matters.

  2. sync import still used. Confirmed sync.WaitGroup in start() keeps the import alive — no issue.

  3. removeTask signature change is good. Propagating the error to callers (service.go:3378) with proper warn logging is the right call.

  4. Removing the early-return fast-path in scheduleCheckpointSweep (lines 323-327 deleted) is correct. Routing everything through scheduleTask ensures dedup applies uniformly to both overdue and future tasks. Clean simplification.


Protocol-Critical Assessment

This PR touches the sweeper — responsible for reclaiming expired batch/checkpoint outputs. The sweeper is protocol-critical infrastructure: if it fails to sweep, funds remain locked in expired outputs. The claim-leak bug (🔴) could cause exactly that scenario. The TTL concern (🟡) is a ticking time bomb for non-default configurations.

Requesting human review per protocol-critical rules.

cc @altafan @sekulicd — the claim leak on ScheduleTaskOnce error is the blocking issue. The TTL concern should also be discussed before merge.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
internal/infrastructure/live-store/redis/store.go (1)

21-31: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix golines formatting in constructor block.

Current wrapping in this hunk is failing lint and blocking CI.

Formatting fix
-func NewLiveStore(rdb *redis.Client, builder ports.TxBuilder, numOfRetries int, scheduledTaskTTL time.Duration) ports.LiveStore {
+func NewLiveStore(
+	rdb *redis.Client,
+	builder ports.TxBuilder,
+	numOfRetries int,
+	scheduledTaskTTL time.Duration,
+) ports.LiveStore {
 	return &redisLiveStore{
@@
-		scheduledTasksStore:       NewScheduledTasksStore(rdb, scheduledTaskTTL),
+		scheduledTasksStore: NewScheduledTasksStore(rdb, scheduledTaskTTL),
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/infrastructure/live-store/redis/store.go` around lines 21 - 31, The
NewLiveStore constructor formatting violates golines/line-length rules; reflow
the argument list in the NewLiveStore function so each field initialization in
the returned &redisLiveStore{...} is on its own line and lines do not exceed the
project's max width (e.g., one field per line: intentStore: NewIntentStore(...),
forfeitTxsStore: NewForfeitTxsStore(...), offChainTxStore: ...,
currentRoundStore: ..., confirmationSessionsStore: ..., treeSigningSessions:
..., boardingInputsStore: ..., scheduledTasksStore: ...), preserving the same
function names and parameters (NewIntentStore, NewForfeitTxsStore,
NewOffChainTxStore, NewCurrentRoundStore, NewConfirmationSessionsStore,
NewTreeSigningSessionsStore, NewBoardingInputsStore, NewScheduledTasksStore) so
golines/CI no longer fails.
internal/infrastructure/live-store/redis/scheduled_tasks.go (2)

38-57: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Clear is unsafe for shared-runtime claims.

This deletes all scheduled-task keys by prefix, including live claims from other nodes. In multi-instance mode, that can re-enable duplicate task execution.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/infrastructure/live-store/redis/scheduled_tasks.go` around lines 38
- 57, The Clear method on scheduledTasksStore deletes all keys matching
scheduledTaskKeyPrefix which removes scheduled tasks for other nodes; change it
to only remove keys owned by this instance by filtering on instance-specific
ownership (e.g., use a namespaced prefix or embed s.instanceID in the key and
scan for scheduledTaskKeyPrefix+":"+s.instanceID+":*"), or alternatively check
each key's owner/claim metadata before deleting (use a Redis GET/HGET or a Lua
script to atomically verify ownership and delete). Update
scheduledTasksStore.Clear, the scan pattern construction (currently pattern :=
scheduledTaskKeyPrefix + ":*"), and the deletion logic (currently calling
s.rdb.Del) to perform instance-scoped deletes only.

25-27: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Add claim ownership semantics for scheduled task release; avoid wiping other instances’ active claims via Clear.

  • Claim release isn’t safe: AddIfAbsent stores constant value "1" and Remove unconditionally DELs the key. ScheduledTasksStore.Remove(ctx, id) has no ownership/token parameter, so if a claim expires and another instance re-claims the same id, an older instance’s deferred releaseClaim() can delete the newer claim.
  • Global cleanup isn’t scoped: sweeper.start() unconditionally calls s.cache.ScheduledTasks().Clear(ctx), and the Redis Clear implementation SCANs scheduledTaskKeyPrefix + ":*" and DELs every match—so another instance’s active claims can be wiped on startup.
Suggested direction
- func (s *scheduledTasksStore) AddIfAbsent(ctx context.Context, id string) (bool, error)
- func (s *scheduledTasksStore) Remove(ctx context.Context, id string) error
+ func (s *scheduledTasksStore) AddIfAbsent(ctx context.Context, id string, ownerToken string) (bool, error)
+ func (s *scheduledTasksStore) Remove(ctx context.Context, id string, ownerToken string) error // Lua compare-and-del
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/infrastructure/live-store/redis/scheduled_tasks.go` around lines 25
- 27, The current claim semantics let a stale/released caller delete another
instance's active claim because AddIfAbsent stores a constant "1" and
scheduledTasksStore.Remove(ctx, id) unconditionally DELs keys; likewise
sweeper.start() calls cache.ScheduledTasks().Clear(ctx) which scans and deletes
all scheduledTaskKeyPrefix keys. Fix by introducing per-claim ownership tokens:
make the claim creation function (AddIfAbsent or the method that sets the key)
store a unique claim token (e.g., instanceID/UUID) instead of "1" and return
that token to the caller; change scheduledTasksStore.Remove(ctx, id) (and any
releaseClaim()) to accept the token and perform an atomic compare-and-delete
(use a Redis EVAL script that checks GET(key) == token then DEL) so only the
owner can delete its claim; update ScheduledTasksStore.Clear (and
sweeper.start() usage) to only remove keys owned by this instance by scanning
keys and deleting only when the stored token matches our instance token (or run
a Lua script to SCAN+conditional DEL), avoiding global wipes of other instances'
claims.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/config/config.go`:
- Line 950: The long constructor call to redislivestore.NewLiveStore assigned to
liveStoreSvc exceeds golines; to fix, break the call across multiple lines or
introduce intermediate variables for arguments (e.g., assign rdb, c.txBuilder,
c.RedisTxNumOfRetries, scheduledTaskTTL to local variables) and then call
redislivestore.NewLiveStore with each argument on its own line so the expression
fits line-length rules; update the assignment to liveStoreSvc =
redislivestore.NewLiveStore( rdb, c.txBuilder, c.RedisTxNumOfRetries,
scheduledTaskTTL ) with proper line breaks to satisfy the linter.

---

Outside diff comments:
In `@internal/infrastructure/live-store/redis/scheduled_tasks.go`:
- Around line 38-57: The Clear method on scheduledTasksStore deletes all keys
matching scheduledTaskKeyPrefix which removes scheduled tasks for other nodes;
change it to only remove keys owned by this instance by filtering on
instance-specific ownership (e.g., use a namespaced prefix or embed s.instanceID
in the key and scan for scheduledTaskKeyPrefix+":"+s.instanceID+":*"), or
alternatively check each key's owner/claim metadata before deleting (use a Redis
GET/HGET or a Lua script to atomically verify ownership and delete). Update
scheduledTasksStore.Clear, the scan pattern construction (currently pattern :=
scheduledTaskKeyPrefix + ":*"), and the deletion logic (currently calling
s.rdb.Del) to perform instance-scoped deletes only.
- Around line 25-27: The current claim semantics let a stale/released caller
delete another instance's active claim because AddIfAbsent stores a constant "1"
and scheduledTasksStore.Remove(ctx, id) unconditionally DELs keys; likewise
sweeper.start() calls cache.ScheduledTasks().Clear(ctx) which scans and deletes
all scheduledTaskKeyPrefix keys. Fix by introducing per-claim ownership tokens:
make the claim creation function (AddIfAbsent or the method that sets the key)
store a unique claim token (e.g., instanceID/UUID) instead of "1" and return
that token to the caller; change scheduledTasksStore.Remove(ctx, id) (and any
releaseClaim()) to accept the token and perform an atomic compare-and-delete
(use a Redis EVAL script that checks GET(key) == token then DEL) so only the
owner can delete its claim; update ScheduledTasksStore.Clear (and
sweeper.start() usage) to only remove keys owned by this instance by scanning
keys and deleting only when the stored token matches our instance token (or run
a Lua script to SCAN+conditional DEL), avoiding global wipes of other instances'
claims.

In `@internal/infrastructure/live-store/redis/store.go`:
- Around line 21-31: The NewLiveStore constructor formatting violates
golines/line-length rules; reflow the argument list in the NewLiveStore function
so each field initialization in the returned &redisLiveStore{...} is on its own
line and lines do not exceed the project's max width (e.g., one field per line:
intentStore: NewIntentStore(...), forfeitTxsStore: NewForfeitTxsStore(...),
offChainTxStore: ..., currentRoundStore: ..., confirmationSessionsStore: ...,
treeSigningSessions: ..., boardingInputsStore: ..., scheduledTasksStore: ...),
preserving the same function names and parameters (NewIntentStore,
NewForfeitTxsStore, NewOffChainTxStore, NewCurrentRoundStore,
NewConfirmationSessionsStore, NewTreeSigningSessionsStore,
NewBoardingInputsStore, NewScheduledTasksStore) so golines/CI no longer fails.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: de76ecee-b88b-496c-a127-d4462003b7b7

📥 Commits

Reviewing files that changed from the base of the PR and between ea70ffc and c5febd5.

📒 Files selected for processing (5)
  • internal/config/config.go
  • internal/core/application/sweeper.go
  • internal/infrastructure/live-store/live_store_test.go
  • internal/infrastructure/live-store/redis/scheduled_tasks.go
  • internal/infrastructure/live-store/redis/store.go

Comment thread internal/config/config.go Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant