diff --git a/README.md b/README.md index 7cab0227a..ca9cb497e 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,7 @@ The `arkd` server can be configured using environment variables. | `ARKD_INDEXER_EXPOSURE`. | Require intent for getting vtxo chain (public, private, withheld) | `public` | | `ARKD_INDEXER_SIGNING_PRIVKEY` | Hex-encoded private key for indexer auth token signing (sensitive) | - | | `ARKD_INDEXER_AUTH_TOKEN_EXPIRY` | Auth token TTL in seconds | `300` (5 minutes) | +| `ARKD_BATCH_TRIGGER` | Optional CEL formula returning `bool`. When set, the server only starts a new batch round when the formula evaluates to `true`. See [`pkg/ark-lib/batchtrigger/README.md`](pkg/ark-lib/batchtrigger/README.md) for the available variables and examples. | - (always start) | ## Provisioning diff --git a/envs/arkd.dev.env b/envs/arkd.dev.env index 967e14b6b..577e4cc50 100644 --- a/envs/arkd.dev.env +++ b/envs/arkd.dev.env @@ -21,4 +21,8 @@ ARKD_BAN_THRESHOLD=1 ARKD_ONCHAIN_OUTPUT_FEE=100 ARKD_ENABLE_PPROF=true ARKD_UNROLLED_VTXO_MIN_EXPIRY_MARGIN=10 -ARKD_SESSION_DURATION=10 \ No newline at end of file +ARKD_SESSION_DURATION=10 +# Optional CEL gate: only start a batch round when the formula returns true. +# Empty/unset = always start (legacy behaviour). +# See pkg/ark-lib/batchtrigger/README.md for the variable reference. +ARKD_BATCH_TRIGGER= diff --git a/envs/arkd.light.env b/envs/arkd.light.env index 35c74d3d3..0b89b2838 100644 --- a/envs/arkd.light.env +++ b/envs/arkd.light.env @@ -16,3 +16,7 @@ ARKD_ONCHAIN_OUTPUT_FEE=100 ARKD_VTXO_TREE_EXPIRY=40 ARKD_CHECKPOINT_EXIT_DELAY=10 ARKD_UNROLLED_VTXO_MIN_EXPIRY_MARGIN=10 +# Optional CEL gate: only start a batch round when the formula returns true. +# Empty/unset = always start (legacy behaviour). +# See pkg/ark-lib/batchtrigger/README.md for the variable reference. +ARKD_BATCH_TRIGGER= diff --git a/internal/config/config.go b/internal/config/config.go index 911ba7c1e..b13879fce 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -27,6 +27,7 @@ import ( fileunlocker "github.com/arkade-os/arkd/internal/infrastructure/unlocker/file" walletclient "github.com/arkade-os/arkd/internal/infrastructure/wallet" arklib "github.com/arkade-os/arkd/pkg/ark-lib" + "github.com/arkade-os/arkd/pkg/ark-lib/batchtrigger" "github.com/btcsuite/btcd/btcec/v2" "github.com/redis/go-redis/v9" log "github.com/sirupsen/logrus" @@ -149,6 +150,11 @@ type Config struct { MaxConcurrentStreams uint32 StreamConnPoolSize uint32 + // BatchTrigger is an optional CEL formula. When set, the server only + // starts a new batch round when the formula evaluates to true. When + // empty, every session starts a round (legacy behaviour). + BatchTrigger string + fee ports.FeeManager repo ports.RepoManager svc application.Service @@ -254,6 +260,9 @@ var ( IndexerSigningKey = "INDEXER_SIGNING_PRIVKEY" // #nosec G101 MaxConcurrentStreams = "MAX_CONCURRENT_STREAMS" StreamConnPoolSize = "STREAM_CONN_POOL_SIZE" + // BatchTrigger is a CEL formula evaluated before every round to decide + // whether the server should start a new batch. Empty = always start. + BatchTrigger = "BATCH_TRIGGER" defaultDatadir = arklib.AppDataDir("arkd", false) defaultSessionDuration = 30 @@ -469,6 +478,7 @@ func LoadConfig() (*Config, error) { ), // Default to 1 if set to 0 MaxOpReturnOutputs: max(1, viper.GetUint32(MaxOpReturnOutputs)), + BatchTrigger: viper.GetString(BatchTrigger), }, nil } @@ -677,6 +687,10 @@ func (c *Config) Validate() error { return fmt.Errorf("max concurrent streams must be greater than 0") } + if _, err := batchtrigger.New(c.BatchTrigger); err != nil { + return fmt.Errorf("invalid batch trigger program: %w", err) + } + if err := c.repoManager(); err != nil { return err } @@ -1019,6 +1033,7 @@ func (c *Config) appService() error { c.SettlementMinExpiryGap, c.UnrolledVtxoMinExpiryMargin, time.Unix(c.VtxoNoCsvValidationCutoffDate, 0), c.MaxOpReturnOutputs, + c.BatchTrigger, ) if err != nil { return err diff --git a/internal/core/application/batch_trigger_test.go b/internal/core/application/batch_trigger_test.go new file mode 100644 index 000000000..e57cacad5 --- /dev/null +++ b/internal/core/application/batch_trigger_test.go @@ -0,0 +1,222 @@ +package application + +import ( + "testing" + "time" + + "github.com/arkade-os/arkd/internal/core/domain" + "github.com/arkade-os/arkd/internal/core/ports" + "github.com/arkade-os/arkd/pkg/ark-lib/batchtrigger" + "github.com/stretchr/testify/require" +) + +func TestAggregateIntentTriggerData(t *testing.T) { + tests := []struct { + name string + intents []ports.TimedIntent + wantBoardingInputsCount int64 + wantTotalBoardingAmount uint64 + wantTotalIntentFees uint64 + }{ + { + name: "empty intents", + intents: nil, + wantBoardingInputsCount: 0, + wantTotalBoardingAmount: 0, + wantTotalIntentFees: 0, + }, + { + name: "single intent with boarding inputs and positive fee", + intents: []ports.TimedIntent{ + { + Intent: domain.Intent{ + Inputs: []domain.Vtxo{ + {Amount: 1000}, + {Amount: 500}, + }, + Receivers: []domain.Receiver{ + {Amount: 800}, + {Amount: 600}, + }, + }, + BoardingInputs: []ports.BoardingInput{ + {Amount: 200}, + {Amount: 300}, + }, + }, + }, + wantBoardingInputsCount: 2, + wantTotalBoardingAmount: 500, + // inputs: 1500 vtxo + 500 boarding = 2000; outputs: 1400; fee = 600 + wantTotalIntentFees: 600, + }, + { + name: "intent with no boarding and no fee (inputs == outputs)", + intents: []ports.TimedIntent{ + { + Intent: domain.Intent{ + Inputs: []domain.Vtxo{{Amount: 1000}}, + Receivers: []domain.Receiver{{Amount: 1000}}, + }, + }, + }, + wantBoardingInputsCount: 0, + wantTotalBoardingAmount: 0, + wantTotalIntentFees: 0, + }, + { + name: "intent where outputs exceed inputs is treated as zero fee", + intents: []ports.TimedIntent{ + { + Intent: domain.Intent{ + Inputs: []domain.Vtxo{{Amount: 100}}, + Receivers: []domain.Receiver{{Amount: 200}}, + }, + }, + }, + wantBoardingInputsCount: 0, + wantTotalBoardingAmount: 0, + wantTotalIntentFees: 0, + }, + { + name: "multiple intents are summed", + intents: []ports.TimedIntent{ + { + Intent: domain.Intent{ + Inputs: []domain.Vtxo{{Amount: 1000}}, + Receivers: []domain.Receiver{{Amount: 900}}, + }, + BoardingInputs: []ports.BoardingInput{{Amount: 50}}, + }, + { + Intent: domain.Intent{ + Inputs: []domain.Vtxo{{Amount: 2000}}, + Receivers: []domain.Receiver{{Amount: 1800}}, + }, + BoardingInputs: []ports.BoardingInput{ + {Amount: 100}, + {Amount: 100}, + }, + }, + }, + wantBoardingInputsCount: 3, + wantTotalBoardingAmount: 250, + // intent 1: 1000+50 - 900 = 150 + // intent 2: 2000+200 - 1800 = 400 + wantTotalIntentFees: 550, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotCount, gotBoarding, gotFees := aggregateIntentTriggerData(tt.intents) + require.Equal(t, tt.wantBoardingInputsCount, gotCount, "boarding inputs count") + require.Equal(t, tt.wantTotalBoardingAmount, gotBoarding, "total boarding amount") + require.Equal(t, tt.wantTotalIntentFees, gotFees, "total intent fees") + }) + } +} + +func TestEvalBatchTrigger(t *testing.T) { + tests := []struct { + name string + program string + ctx batchtrigger.Context + want bool + }{ + { + name: "nil trigger always permits", + program: "", + want: true, + }, + { + name: "true literal permits", + program: "true", + want: true, + }, + { + name: "false literal denies", + program: "false", + want: false, + }, + { + name: "intent count gate satisfied", + program: "intents_count >= 2.0", + ctx: batchtrigger.Context{IntentsCount: 5}, + want: true, + }, + { + name: "intent count gate unsatisfied", + program: "intents_count >= 2.0", + ctx: batchtrigger.Context{IntentsCount: 1}, + want: false, + }, + { + name: "fee revenue gate satisfied", + program: "total_intent_fees >= 500.0", + ctx: batchtrigger.Context{TotalIntentFees: 1000}, + want: true, + }, + { + name: "issue 1045 example: many intents, low fees", + program: "intents_count > 1.0 && " + + "(current_feerate <= 2.0 || time_since_last_batch >= 3600.0)", + ctx: batchtrigger.Context{ + IntentsCount: 3, + CurrentFeerate: 1, + }, + want: true, + }, + { + name: "issue 1045 example: many intents, high fees, but stale", + program: "intents_count > 1.0 && " + + "(current_feerate <= 2.0 || time_since_last_batch >= 3600.0)", + ctx: batchtrigger.Context{ + IntentsCount: 3, + CurrentFeerate: 50, + TimeSinceLastBatch: 7200, + }, + want: true, + }, + { + name: "issue 1045 example: many intents, high fees, recent", + program: "intents_count > 1.0 && " + + "(current_feerate <= 2.0 || time_since_last_batch >= 3600.0)", + ctx: batchtrigger.Context{ + IntentsCount: 3, + CurrentFeerate: 50, + TimeSinceLastBatch: 60, + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr, err := batchtrigger.New(tt.program) + require.NoError(t, err) + + s := &service{batchTrigger: tr} + require.Equal(t, tt.want, s.evalBatchTrigger(tt.ctx)) + }) + } +} + +func TestEvalBatchTriggerNilFailsOpen(t *testing.T) { + // A nil service.batchTrigger must permit; the context value is ignored. + s := &service{} + require.True(t, s.evalBatchTrigger(batchtrigger.Context{})) + require.True(t, s.evalBatchTrigger(batchtrigger.Context{IntentsCount: 0})) +} + +func TestLastBatchAtRoundtrip(t *testing.T) { + // Sanity check that the atomic counter we use to derive + // time_since_last_batch behaves as expected: zero until set, monotonic + // once written. + s := &service{} + require.Equal(t, int64(0), s.lastBatchAt.Load()) + + now := time.Now().Unix() + s.lastBatchAt.Store(now) + require.Equal(t, now, s.lastBatchAt.Load()) +} diff --git a/internal/core/application/service.go b/internal/core/application/service.go index 7959fa386..f715ea3a9 100644 --- a/internal/core/application/service.go +++ b/internal/core/application/service.go @@ -17,6 +17,7 @@ import ( "github.com/arkade-os/arkd/internal/core/ports" arklib "github.com/arkade-os/arkd/pkg/ark-lib" "github.com/arkade-os/arkd/pkg/ark-lib/asset" + "github.com/arkade-os/arkd/pkg/ark-lib/batchtrigger" "github.com/arkade-os/arkd/pkg/ark-lib/extension" "github.com/arkade-os/arkd/pkg/ark-lib/intent" "github.com/arkade-os/arkd/pkg/ark-lib/offchain" @@ -80,6 +81,14 @@ type service struct { // fees feeManager ports.FeeManager + // batchTrigger is an optional CEL gate evaluated at the top of every + // startRound() call. A nil trigger always permits the round. + batchTrigger *batchtrigger.Trigger + // lastBatchAt is the Unix timestamp of the most recently finalized + // batch. Set after a successful EndFinalization. Zero until the first + // batch is finalized after server start. + lastBatchAt atomic.Int64 + // cutoff date (unix timestamp) before which CSV validation is skipped for VTXOs vtxoNoCsvValidationCutoffTime time.Time @@ -127,6 +136,7 @@ func NewService( unrolledVtxoMinExpiryMargin int64, vtxoNoCsvValidationCutoffTime time.Time, maxOpReturnOutputs uint32, + batchTriggerProgram string, ) (Service, error) { ctx := context.Background() @@ -135,6 +145,11 @@ func NewService( return nil, fmt.Errorf("failed to fetch signer pubkey: %s", err) } + batchTrigger, err := batchtrigger.New(batchTriggerProgram) + if err != nil { + return nil, fmt.Errorf("failed to compile batch trigger: %w", err) + } + // Try to load scheduled session from DB first scheduledSession, err := repoManager.ScheduledSession().Get(ctx) if err != nil { @@ -222,6 +237,7 @@ func NewService( unrolledVtxoMinExpiryMargin: time.Duration(unrolledVtxoMinExpiryMargin) * time.Second, vtxoNoCsvValidationCutoffTime: vtxoNoCsvValidationCutoffTime, feeManager: feeManager, + batchTrigger: batchTrigger, } svc.infoCache = newInfoCache(svc.loadInfo) return svc, nil @@ -2362,6 +2378,96 @@ func (s *service) start() { s.startRound() } +// aggregateIntentTriggerData computes the boarding/fee aggregates exposed to +// the batch_trigger CEL program from a list of pending intents. The implicit +// fee per intent is (input amounts) - (output amounts), where inputs include +// both vtxo inputs and boarding inputs. Mirrors the formula used to compute +// round-level CollectedFees in alert.go. +func aggregateIntentTriggerData( + intents []ports.TimedIntent, +) (boardingInputsCount int64, totalBoardingAmount, totalIntentFees uint64) { + for _, it := range intents { + var boardingAmount uint64 + for _, bi := range it.BoardingInputs { + boardingAmount += bi.Amount + boardingInputsCount++ + } + totalBoardingAmount += boardingAmount + + inputAmount := it.TotalInputAmount() + boardingAmount + outputAmount := it.TotalOutputAmount() + if inputAmount > outputAmount { + totalIntentFees += inputAmount - outputAmount + } + } + return +} + +// collectTriggerContext gathers a snapshot of the variables exposed to the +// batch_trigger CEL program. Errors are logged and surfaced as zero values so +// that a transient failure (e.g. a wallet RPC blip) cannot wedge the round +// scheduler — the gate then falls through to the worst-case interpretation +// (no fee/no intent/no boarding) which a sensible formula will reject. +func (s *service) collectTriggerContext(ctx context.Context) batchtrigger.Context { + tc := batchtrigger.Context{} + + if feerate, err := s.wallet.FeeRate(ctx); err != nil { + log.WithError(err).Warn("batch_trigger: failed to read fee rate") + } else { + tc.CurrentFeerate = feerate + } + + if last := s.lastBatchAt.Load(); last > 0 { + now := time.Now().Unix() + if now > last { + tc.TimeSinceLastBatch = now - last + } + } + + // Read intents once so IntentsCount and the boarding/fee aggregates all + // derive from the same snapshot — using a separate Len() call would race + // with concurrent intent registrations. + intents, err := s.cache.Intents().ViewAll(ctx, nil) + if err != nil { + log.WithError(err).Warn("batch_trigger: failed to view pending intents") + return tc + } + tc.IntentsCount = int64(len(intents)) + tc.BoardingInputsCount, tc.TotalBoardingAmount, tc.TotalIntentFees = + aggregateIntentTriggerData(intents) + + return tc +} + +// evalBatchTrigger evaluates the configured trigger against the supplied +// context. Returns true when no trigger is configured, when the program +// returns true, or when evaluation fails (failing open is intentional — we +// never want a buggy formula to halt rounds permanently; the misconfiguration +// is logged loudly). +func (s *service) evalBatchTrigger(tc batchtrigger.Context) bool { + if s.batchTrigger == nil { + return true + } + ok, err := s.batchTrigger.Eval(tc) + if err != nil { + log.WithError(err).Warnf( + "batch_trigger: evaluation failed for program %q, starting batch as fallback", + s.batchTrigger.Source(), + ) + return true + } + return ok +} + +// shouldStartBatch evaluates the batch_trigger gate using a freshly collected +// context. +func (s *service) shouldStartBatch(ctx context.Context) bool { + if s.batchTrigger == nil { + return true + } + return s.evalBatchTrigger(s.collectTriggerContext(ctx)) +} + func (s *service) startRound() { defer s.wg.Done() @@ -2415,6 +2521,21 @@ func (s *service) startRound() { } } + if !s.shouldStartBatch(ctx) { + // Gate denied the round. Wait one registration window then re-check + // without creating any round state. + backoff := newRoundTiming(s.sessionDuration).registrationDuration() + log.Debugf("batch_trigger denied round, waiting %s before re-check", backoff) + select { + case <-s.ctx.Done(): + return + case <-time.After(backoff): + } + s.wg.Add(1) + go s.startRound() + return + } + round := domain.NewRound() // nolint round.StartRegistration() @@ -3318,6 +3439,8 @@ func (s *service) finalizeRound(roundId string, roundTiming roundTiming) { return } + s.lastBatchAt.Store(time.Now().Unix()) + totalOutputVtxos := len(round.VtxoTree.Leaves()) numOfTreeNodes := len(round.VtxoTree) diff --git a/pkg/ark-lib/batchtrigger/README.md b/pkg/ark-lib/batchtrigger/README.md new file mode 100644 index 000000000..7f71c6e3c --- /dev/null +++ b/pkg/ark-lib/batchtrigger/README.md @@ -0,0 +1,80 @@ +# batchtrigger + +The `batchtrigger` package compiles and evaluates an operator-supplied +[CEL](https://github.com/google/cel-spec) formula that decides whether the +server should start a new batch round. It mirrors the design of +[`pkg/ark-lib/arkfee`](../arkfee/README.md) — programs are compiled once and +reused on every evaluation. + +## When the gate runs + +The compiled program is evaluated at the top of `startRound()`, before any +new round state is created. If it returns `true`, the round proceeds as +before. If it returns `false`, the server logs a debug message, waits one +sixth of `ARKD_SESSION_DURATION` (the same cadence as the registration +window), and re-checks. + +A nil/empty program is permissive: the gate always allows the round, which +preserves the legacy "start every session" behaviour for deployments that do +not configure `ARKD_BATCH_TRIGGER`. + +## CEL Environment + +A program must return `bool`. The following variables are exposed: + +| Variable | Type | Description | +|----------|------|-------------| +| `intents_count` | `double` | Number of pending intents queued | +| `current_feerate` | `double` | Current mempool fee rate in sat/kvbyte (as reported by the wallet) | +| `time_since_last_batch` | `double` | Seconds elapsed since the last finalized batch (`0` if no batch finalized since boot) | +| `boarding_inputs_count` | `double` | Total number of pending boarding UTXOs across all queued intents | +| `total_boarding_amount` | `double` | Total satoshis across all pending boarding UTXOs | +| `total_intent_fees` | `double` | Total implicit fees in satoshis across all pending intents (sum of `(input amounts) - (output amounts)` per intent) | + +The `now() -> double` helper is also available and returns the current Unix +timestamp in seconds. + +## Examples + +All variables are typed as `double`, so numeric literals must use the `.0` +form (e.g. `1.0`, not `1`). CEL is strictly typed and will refuse `>` between +a `double` and an `int`. + +**Original example from the issue — only batch when there's more than one +intent and either fees are low or an hour has passed:** + +```cel +intents_count > 1.0 && (current_feerate <= 2.0 || time_since_last_batch >= 3600.0) +``` + +**Pure boarding-driven settlement (settle whenever a non-trivial amount of +sats is queued in boarding):** + +```cel +boarding_inputs_count > 0.0 && total_boarding_amount >= 100000.0 +``` + +**Combined revenue and time gate (settle when at least 500 sats of intent +fees are on the table or it's been more than 30 minutes):** + +```cel +total_intent_fees >= 500.0 || time_since_last_batch >= 1800.0 +``` + +**Always-on (default behaviour when `ARKD_BATCH_TRIGGER` is unset):** + +```cel +true +``` + +## Validation + +The program is compiled once at server startup. Any of the following errors +are surfaced before the server begins serving traffic: + +* CEL syntax errors +* a return type other than `bool` +* references to variables that are not in the table above +* type mismatches in operators or function calls + +This means a misconfigured program never makes it past `arkd Validate()`. diff --git a/pkg/ark-lib/batchtrigger/celenv/env.go b/pkg/ark-lib/batchtrigger/celenv/env.go new file mode 100644 index 000000000..5dcb9bba9 --- /dev/null +++ b/pkg/ark-lib/batchtrigger/celenv/env.go @@ -0,0 +1,31 @@ +// Package celenv defines the CEL environment used by batch_trigger programs. +// +// The environment exposes a fixed set of variables describing the state of the +// pending batch (queued intents, fee market, time since last batch, etc.) and a +// `now()` helper that returns the current unix timestamp in seconds. +package celenv + +import ( + "github.com/google/cel-go/cel" +) + +// BatchTriggerEnv is the CEL environment used to compile batch_trigger programs. +var BatchTriggerEnv *cel.Env + +func init() { + var err error + BatchTriggerEnv, err = cel.NewEnv( + // variables + intentsCount, + currentFeerate, + timeSinceLastBatch, + boardingInputsCount, + totalBoardingAmount, + totalIntentFees, + // functions + nowFunction, + ) + if err != nil { + panic(err) + } +} diff --git a/pkg/ark-lib/batchtrigger/celenv/functions.go b/pkg/ark-lib/batchtrigger/celenv/functions.go new file mode 100644 index 000000000..a978b984a --- /dev/null +++ b/pkg/ark-lib/batchtrigger/celenv/functions.go @@ -0,0 +1,22 @@ +package celenv + +import ( + "time" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" +) + +// now() returns the current Unix timestamp in seconds. The binding is +// evaluated on every call: BatchTriggerEnv does not opt into the constant +// folding optimizer (cel.OptOptimize), so the value is never folded. +var nowFunction = cel.Function("now", + cel.Overload("now_double", + []*cel.Type{}, + cel.DoubleType, + cel.FunctionBinding(func(_ ...ref.Val) ref.Val { + return types.Double(time.Now().Unix()) + }), + ), +) diff --git a/pkg/ark-lib/batchtrigger/celenv/variables.go b/pkg/ark-lib/batchtrigger/celenv/variables.go new file mode 100644 index 000000000..df9de4df4 --- /dev/null +++ b/pkg/ark-lib/batchtrigger/celenv/variables.go @@ -0,0 +1,43 @@ +package celenv + +import ( + "github.com/google/cel-go/cel" +) + +const ( + IntentsCountVariableName = "intents_count" + CurrentFeerateVariableName = "current_feerate" + TimeSinceLastBatchVariableName = "time_since_last_batch" + BoardingInputsCountVariableName = "boarding_inputs_count" + TotalBoardingAmountVariableName = "total_boarding_amount" + TotalIntentFeesVariableName = "total_intent_fees" +) + +var ( + intentsCount = cel.VariableWithDoc( + IntentsCountVariableName, cel.DoubleType, + "Number of pending intents queued", + ) + currentFeerate = cel.VariableWithDoc( + CurrentFeerateVariableName, cel.DoubleType, + "Current mempool fee rate in sat/kvbyte (as reported by the wallet)", + ) + timeSinceLastBatch = cel.VariableWithDoc( + TimeSinceLastBatchVariableName, cel.DoubleType, + "Seconds elapsed since the last batch was finalized "+ + "(0 if no batch has been finalized since the server started)", + ) + boardingInputsCount = cel.VariableWithDoc( + BoardingInputsCountVariableName, cel.DoubleType, + "Total number of pending boarding UTXOs across all queued intents", + ) + totalBoardingAmount = cel.VariableWithDoc( + TotalBoardingAmountVariableName, cel.DoubleType, + "Total amount in satoshis across all pending boarding UTXOs", + ) + totalIntentFees = cel.VariableWithDoc( + TotalIntentFeesVariableName, cel.DoubleType, + "Total implicit fees in satoshis across all pending intents "+ + "(sum of (input amounts) - (output amounts) per intent)", + ) +) diff --git a/pkg/ark-lib/batchtrigger/testdata/invalid.json b/pkg/ark-lib/batchtrigger/testdata/invalid.json new file mode 100644 index 000000000..ccc266c6d --- /dev/null +++ b/pkg/ark-lib/batchtrigger/testdata/invalid.json @@ -0,0 +1,39 @@ +{ + "invalidConfigs": [ + { + "name": "syntax error", + "program": "intents_count >>", + "err": "Syntax error" + }, + { + "name": "wrong return type - double", + "program": "intents_count + 1.0", + "err": "expected return type bool" + }, + { + "name": "wrong return type - string", + "program": "'hello'", + "err": "expected return type bool" + }, + { + "name": "wrong return type - int", + "program": "1", + "err": "expected return type bool" + }, + { + "name": "undeclared variable", + "program": "undeclared_thing > 0.0", + "err": "undeclared reference" + }, + { + "name": "type mismatch", + "program": "intents_count == 'foo'", + "err": "no matching overload" + }, + { + "name": "unknown function", + "program": "missing_func() > 0.0", + "err": "undeclared reference" + } + ] +} diff --git a/pkg/ark-lib/batchtrigger/testdata/valid.json b/pkg/ark-lib/batchtrigger/testdata/valid.json new file mode 100644 index 000000000..28e58f949 --- /dev/null +++ b/pkg/ark-lib/batchtrigger/testdata/valid.json @@ -0,0 +1,103 @@ +{ + "cases": [ + { + "name": "always true literal", + "program": "true", + "context": {}, + "expected": true + }, + { + "name": "always false literal", + "program": "false", + "context": {}, + "expected": false + }, + { + "name": "issue 1045 example: many intents or low fees or stale", + "program": "intents_count > 1.0 && (current_feerate <= 2.0 || time_since_last_batch >= 3600.0)", + "context": { + "intents_count": 5, + "current_feerate": 1, + "time_since_last_batch": 60, + "boarding_inputs_count": 0, + "total_boarding_amount": 0, + "total_intent_fees": 0 + }, + "expected": true + }, + { + "name": "issue 1045 example: not enough intents", + "program": "intents_count > 1.0 && (current_feerate <= 2.0 || time_since_last_batch >= 3600.0)", + "context": { + "intents_count": 1, + "current_feerate": 1, + "time_since_last_batch": 99999, + "boarding_inputs_count": 0, + "total_boarding_amount": 0, + "total_intent_fees": 0 + }, + "expected": false + }, + { + "name": "issue 1045 example: high fees and recent batch", + "program": "intents_count > 1.0 && (current_feerate <= 2.0 || time_since_last_batch >= 3600.0)", + "context": { + "intents_count": 10, + "current_feerate": 50, + "time_since_last_batch": 60, + "boarding_inputs_count": 0, + "total_boarding_amount": 0, + "total_intent_fees": 0 + }, + "expected": false + }, + { + "name": "boarding-driven trigger", + "program": "boarding_inputs_count > 0.0 && total_boarding_amount >= 100000.0", + "context": { + "intents_count": 0, + "current_feerate": 0, + "time_since_last_batch": 0, + "boarding_inputs_count": 1, + "total_boarding_amount": 100000, + "total_intent_fees": 0 + }, + "expected": true + }, + { + "name": "boarding under threshold", + "program": "boarding_inputs_count > 0.0 && total_boarding_amount >= 100000.0", + "context": { + "intents_count": 0, + "current_feerate": 0, + "time_since_last_batch": 0, + "boarding_inputs_count": 3, + "total_boarding_amount": 50000, + "total_intent_fees": 0 + }, + "expected": false + }, + { + "name": "fee-revenue gate", + "program": "total_intent_fees >= 500.0", + "context": { + "total_intent_fees": 500 + }, + "expected": true + }, + { + "name": "fee-revenue gate insufficient", + "program": "total_intent_fees >= 500.0", + "context": { + "total_intent_fees": 499 + }, + "expected": false + }, + { + "name": "now() helper available", + "program": "now() > 0.0", + "context": {}, + "expected": true + } + ] +} diff --git a/pkg/ark-lib/batchtrigger/trigger.go b/pkg/ark-lib/batchtrigger/trigger.go new file mode 100644 index 000000000..f1687997d --- /dev/null +++ b/pkg/ark-lib/batchtrigger/trigger.go @@ -0,0 +1,77 @@ +// Package batchtrigger evaluates an operator-supplied CEL formula that decides +// whether the server should start a new batch round. +// +// The package mirrors the pattern used by pkg/ark-lib/arkfee for fee programs: +// programs are compiled once via Parse, then evaluated against a Context +// snapshot. A nil Trigger is always permissive (returns true) so that callers +// can keep batch behaviour unchanged when no formula is configured. +package batchtrigger + +import ( + "fmt" + "reflect" + + "github.com/arkade-os/arkd/pkg/ark-lib/batchtrigger/celenv" + "github.com/google/cel-go/cel" +) + +// Trigger wraps a compiled CEL program and the original source text. +type Trigger struct { + program cel.Program + txt string +} + +// New parses the supplied program text against the batch_trigger CEL +// environment. An empty string returns (nil, nil) so callers can treat the +// gate as "always allow" without special-casing. +func New(program string) (*Trigger, error) { + if program == "" { + return nil, nil + } + return Parse(program) +} + +// Parse compiles a batch_trigger CEL program. It enforces that the program's +// output type is bool — anything else is rejected at compile time so operators +// catch mistakes before the server boots. +func Parse(txt string) (*Trigger, error) { + ast, issues := celenv.BatchTriggerEnv.Compile(txt) + if issues != nil && issues.Err() != nil { + return nil, issues.Err() + } + + if !ast.OutputType().IsExactType(cel.BoolType) { + return nil, fmt.Errorf("expected return type bool, got %v", ast.OutputType()) + } + + prg, err := celenv.BatchTriggerEnv.Program(ast) + if err != nil { + return nil, err + } + return &Trigger{program: prg, txt: txt}, nil +} + +// Source returns the original program text. +func (t *Trigger) Source() string { + if t == nil { + return "" + } + return t.txt +} + +// Eval evaluates the program against the supplied context. A nil receiver +// returns true so that an unconfigured trigger never blocks a batch. +func (t *Trigger) Eval(ctx Context) (bool, error) { + if t == nil { + return true, nil + } + result, _, err := t.program.Eval(ctx.toArgs()) + if err != nil { + return false, err + } + native, err := result.ConvertToNative(reflect.TypeOf(false)) + if err != nil { + return false, err + } + return native.(bool), nil +} diff --git a/pkg/ark-lib/batchtrigger/trigger_test.go b/pkg/ark-lib/batchtrigger/trigger_test.go new file mode 100644 index 000000000..d63a682b3 --- /dev/null +++ b/pkg/ark-lib/batchtrigger/trigger_test.go @@ -0,0 +1,130 @@ +package batchtrigger_test + +import ( + _ "embed" + "encoding/json" + "testing" + + "github.com/arkade-os/arkd/pkg/ark-lib/batchtrigger" + "github.com/stretchr/testify/require" +) + +//go:embed testdata/valid.json +var validJSON []byte + +//go:embed testdata/invalid.json +var invalidJSON []byte + +type validTestData struct { + Cases []validCase `json:"cases"` +} + +type validCase struct { + Name string `json:"name"` + Program string `json:"program"` + Context jsonContext `json:"context"` + Expected bool `json:"expected"` +} + +type jsonContext struct { + IntentsCount int64 `json:"intents_count,omitempty"` + CurrentFeerate uint64 `json:"current_feerate,omitempty"` + TimeSinceLastBatch int64 `json:"time_since_last_batch,omitempty"` + BoardingInputsCount int64 `json:"boarding_inputs_count,omitempty"` + TotalBoardingAmount uint64 `json:"total_boarding_amount,omitempty"` + TotalIntentFees uint64 `json:"total_intent_fees,omitempty"` +} + +func (j jsonContext) toContext() batchtrigger.Context { + return batchtrigger.Context{ + IntentsCount: j.IntentsCount, + CurrentFeerate: j.CurrentFeerate, + TimeSinceLastBatch: j.TimeSinceLastBatch, + BoardingInputsCount: j.BoardingInputsCount, + TotalBoardingAmount: j.TotalBoardingAmount, + TotalIntentFees: j.TotalIntentFees, + } +} + +type invalidTestData struct { + InvalidConfigs []invalidCase `json:"invalidConfigs"` +} + +type invalidCase struct { + Name string `json:"name"` + Program string `json:"program"` + Err string `json:"err"` +} + +func TestNewEmpty(t *testing.T) { + tr, err := batchtrigger.New("") + require.NoError(t, err) + require.Nil(t, tr) +} + +func TestNilTriggerAlwaysAllows(t *testing.T) { + var tr *batchtrigger.Trigger + ok, err := tr.Eval(batchtrigger.Context{}) + require.NoError(t, err) + require.True(t, ok) + require.Empty(t, tr.Source()) +} + +func TestSource(t *testing.T) { + src := "intents_count >= 1.0" + tr, err := batchtrigger.New(src) + require.NoError(t, err) + require.NotNil(t, tr) + require.Equal(t, src, tr.Source()) +} + +func TestParseInvalid(t *testing.T) { + var data invalidTestData + require.NoError(t, json.Unmarshal(invalidJSON, &data)) + require.NotEmpty(t, data.InvalidConfigs) + + for _, tc := range data.InvalidConfigs { + t.Run(tc.Name, func(t *testing.T) { + tr, err := batchtrigger.New(tc.Program) + require.Error(t, err) + require.Nil(t, tr) + require.ErrorContains(t, err, tc.Err) + }) + } +} + +func TestEvalValid(t *testing.T) { + var data validTestData + require.NoError(t, json.Unmarshal(validJSON, &data)) + require.NotEmpty(t, data.Cases) + + for _, tc := range data.Cases { + t.Run(tc.Name, func(t *testing.T) { + tr, err := batchtrigger.New(tc.Program) + require.NoError(t, err) + require.NotNil(t, tr) + + got, err := tr.Eval(tc.Context.toContext()) + require.NoError(t, err) + require.Equal(t, tc.Expected, got) + }) + } +} + +func TestEvalAllVariablesAccessible(t *testing.T) { + // Ensures every declared CEL variable can be referenced without an + // "undeclared reference" error. + prog := "intents_count >= 0.0 && " + + "current_feerate >= 0.0 && " + + "time_since_last_batch >= 0.0 && " + + "boarding_inputs_count >= 0.0 && " + + "total_boarding_amount >= 0.0 && " + + "total_intent_fees >= 0.0" + tr, err := batchtrigger.New(prog) + require.NoError(t, err) + require.NotNil(t, tr) + + ok, err := tr.Eval(batchtrigger.Context{}) + require.NoError(t, err) + require.True(t, ok) +} diff --git a/pkg/ark-lib/batchtrigger/types.go b/pkg/ark-lib/batchtrigger/types.go new file mode 100644 index 000000000..9d0498cf2 --- /dev/null +++ b/pkg/ark-lib/batchtrigger/types.go @@ -0,0 +1,40 @@ +package batchtrigger + +import ( + "github.com/arkade-os/arkd/pkg/ark-lib/batchtrigger/celenv" +) + +// Context is the snapshot of state passed to a batch_trigger program when it is +// evaluated. Every field maps 1:1 to a CEL variable declared in +// pkg/ark-lib/batchtrigger/celenv. +type Context struct { + // IntentsCount is the number of pending intents queued. + IntentsCount int64 + // CurrentFeerate is the current mempool fee rate in sat/kvbyte + // (matches ports.WalletService.FeeRate). + CurrentFeerate uint64 + // TimeSinceLastBatch is the number of seconds elapsed since the last batch + // was finalized. It is 0 when no batch has been finalized yet since the + // server started. + TimeSinceLastBatch int64 + // BoardingInputsCount is the total number of pending boarding UTXOs across + // all queued intents. + BoardingInputsCount int64 + // TotalBoardingAmount is the total amount in satoshis across all pending + // boarding UTXOs. + TotalBoardingAmount uint64 + // TotalIntentFees is the total implicit fees in satoshis across all pending + // intents (sum of (input amounts) - (output amounts) per intent). + TotalIntentFees uint64 +} + +func (c Context) toArgs() map[string]any { + return map[string]any{ + celenv.IntentsCountVariableName: float64(c.IntentsCount), + celenv.CurrentFeerateVariableName: float64(c.CurrentFeerate), + celenv.TimeSinceLastBatchVariableName: float64(c.TimeSinceLastBatch), + celenv.BoardingInputsCountVariableName: float64(c.BoardingInputsCount), + celenv.TotalBoardingAmountVariableName: float64(c.TotalBoardingAmount), + celenv.TotalIntentFeesVariableName: float64(c.TotalIntentFees), + } +}