Skip to content
Open
77 changes: 43 additions & 34 deletions internal/core/application/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,21 +426,19 @@ func (s *service) Stop() {
s.sweeper.stop()

commitmentTxIds, err := s.repoManager.Rounds().GetSweepableRounds(ctx)
if err == nil {
tapkeys := make([]string, 0)

for _, commitmentTxId := range commitmentTxIds {
keys, err := s.repoManager.Vtxos().
GetVtxoPubKeysByCommitmentTxid(ctx, commitmentTxId, 0)
if err != nil {
log.WithError(err).Warn("failed to get vtxo tap keys")
continue
}

tapkeys = append(tapkeys, keys...)
if err == nil && len(commitmentTxIds) > 0 {
tapkeys, err := s.repoManager.Vtxos().
GetVtxoPubKeysByCommitmentTxids(ctx, commitmentTxIds, 0)
if err != nil {
log.WithError(err).Warnf(
"failed to get vtxo tap keys for %d sweepable rounds; "+
"skipping UnwatchScripts on shutdown, wallet may keep "+
"watching these scripts until the next restart",
len(commitmentTxIds),
)
} else {
s.stopWatchingVtxos(tapkeys)
}

s.stopWatchingVtxos(tapkeys)
}

// nolint
Expand Down Expand Up @@ -3624,6 +3622,14 @@ func (s *service) startWatchingVtxos(vtxos []domain.Vtxo) error {
return s.scanner.WatchScripts(context.Background(), scripts)
}

// restoreWatchingVtxos re-registers every sweepable round's vtxo pubkeys
// with the chain scanner so we resume receiving notifications after a
// restart. The pubkey lookup uses the bulk repo method
// GetVtxoPubKeysByCommitmentTxids so we issue exactly two DB queries
// (one for the round list, one for all keys) regardless of how many
// sweepable rounds exist. The cross-process WatchScripts gRPC call is
// chunked by walletclient.WatchScripts to stay below the default
// 4 MiB gRPC max-message size at large script counts.
func (s *service) restoreWatchingVtxos() error {
ctx := context.Background()

Expand All @@ -3632,38 +3638,41 @@ func (s *service) restoreWatchingVtxos() error {
return err
}

total := len(commitmentTxIds)
lastMilestone := 0
scripts := make([]string, 0)
for i, commitmentTxId := range commitmentTxIds {
tapKeys, err := s.repoManager.Vtxos().GetVtxoPubKeysByCommitmentTxid(ctx, commitmentTxId, 0)
if err != nil {
return err
}
if len(commitmentTxIds) == 0 {
return nil
}

for _, key := range tapKeys {
// skip if the key is not a valid x-only hex encoded pubkey
if len(key) != 64 {
continue
}
scripts = append(scripts, fmt.Sprintf("5120%s", key))
}
tapKeys, err := s.repoManager.Vtxos().
GetVtxoPubKeysByCommitmentTxids(ctx, commitmentTxIds, 0)
if err != nil {
return err
}

if milestone := (i + 1) * 100 / total / 10; milestone > lastMilestone {
lastMilestone = milestone
log.Debugf("restore watching vtxos: %d%%...", milestone*10)
scripts := make([]string, 0, len(tapKeys))
for _, key := range tapKeys {
// Skip values that are not a 32-byte x-only pubkey encoded as 64
// hex chars. arkd writes valid keys, but defending against a
// corrupted DB row here means a single bad pubkey cannot poison
// the entire WatchScripts gRPC payload at startup recovery.
decoded, err := hex.DecodeString(key)
if err != nil || len(decoded) != 32 {
continue
}
scripts = append(scripts, fmt.Sprintf("5120%s", key))
}

if len(scripts) <= 0 {
if len(scripts) == 0 {
return nil
}

if err := s.scanner.WatchScripts(ctx, scripts); err != nil {
return err
}

log.Debugf("restored watching %d vtxo scripts", len(scripts))
log.Debugf(
"restored watching %d vtxo scripts from %d sweepable rounds",
len(scripts), len(commitmentTxIds),
)
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions internal/core/domain/vtxo_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type VtxoRepository interface {
) (
[]string, error,
)
GetVtxoPubKeysByCommitmentTxids(
ctx context.Context, commitmentTxids []string, withMinimumAmount uint64,
) (
[]string, error,
)
GetPendingSpentVtxosWithPubKeys(
ctx context.Context,
pubkeys []string,
Expand Down
85 changes: 82 additions & 3 deletions internal/infrastructure/db/badger/vtxo_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,17 @@ func (r *vtxoRepository) GetVtxoPubKeysByCommitmentTxid(
return nil, err
}

// Combine and deduplicate by pubkey
// Combine and deduplicate by pubkey. The amount comparison must be >= to
// match the WHERE v.amount >= $1 contract used by the sqlite and postgres
// backends; a VTXO with Amount equal to the filter is included.
pubkeyMap := make(map[string]bool)
for _, vtxo := range vtxos1 {
if vtxo.Amount > amountFilter {
if vtxo.Amount >= amountFilter {
pubkeyMap[vtxo.PubKey] = true
}
}
for _, vtxo := range vtxos2 {
if vtxo.Amount > amountFilter {
if vtxo.Amount >= amountFilter {
pubkeyMap[vtxo.PubKey] = true
}
}
Expand All @@ -339,6 +341,83 @@ func (r *vtxoRepository) GetVtxoPubKeysByCommitmentTxid(
return taprootKeys, nil
}

// GetVtxoPubKeysByCommitmentTxids is the bulk variant of
// GetVtxoPubKeysByCommitmentTxid. It returns the deduplicated set of vtxo
// pubkeys whose root commitment_txid is in the given list, or whose
// CommitmentTxids slice intersects the given list. badgerhold has no native
// "slice intersects set" operator, so the second scan uses a MatchFunc that
// walks the in-memory slice; the SQL backends accomplish the same with a
// JOIN against vtxo_commitment_txid in a single query.
func (r *vtxoRepository) GetVtxoPubKeysByCommitmentTxids(
ctx context.Context, commitmentTxids []string, amountFilter uint64,
) ([]string, error) {
if len(commitmentTxids) == 0 {
return nil, nil
}

idxIfaces := make([]interface{}, len(commitmentTxids))
for i, txid := range commitmentTxids {
idxIfaces[i] = txid
}

// Two scans of the vtxo store: one for vtxos whose RootCommitmentTxid is in
// the set, one for vtxos whose CommitmentTxids slice intersects the set.
// badgerhold has no Contains-In, so we fall back to a single scan with a
// matcher function for the CommitmentTxids case.
query1 := badgerhold.Where("RootCommitmentTxid").
In(idxIfaces...).
And("Amount").
Ge(amountFilter)
vtxos1, err := r.findVtxos(ctx, query1)
if err != nil {
return nil, err
}

wanted := make(map[string]struct{}, len(commitmentTxids))
for _, t := range commitmentTxids {
wanted[t] = struct{}{}
}
query2 := badgerhold.Where("CommitmentTxids").
MatchFunc(func(ra *badgerhold.RecordAccess) (bool, error) {
txids, ok := ra.Field().([]string)
if !ok {
return false, nil
}
for _, t := range txids {
if _, hit := wanted[t]; hit {
return true, nil
}
}
return false, nil
}).
And("Amount").
Ge(amountFilter)
vtxos2, err := r.findVtxos(ctx, query2)
if err != nil {
return nil, err
}

// Amount comparison is >= to match the sqlite/postgres
// WHERE v.amount >= $1 contract; including amount == amountFilter.
pubkeyMap := make(map[string]struct{})
for _, vtxo := range vtxos1 {
if vtxo.Amount >= amountFilter {
pubkeyMap[vtxo.PubKey] = struct{}{}
}
}
for _, vtxo := range vtxos2 {
if vtxo.Amount >= amountFilter {
pubkeyMap[vtxo.PubKey] = struct{}{}
}
}

taprootKeys := make([]string, 0, len(pubkeyMap))
for pubkey := range pubkeyMap {
taprootKeys = append(taprootKeys, pubkey)
}
return taprootKeys, nil
}

func (r *vtxoRepository) GetPendingSpentVtxosWithPubKeys(
ctx context.Context, pubkeys []string, after, before int64,
) ([]domain.Vtxo, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX IF EXISTS idx_vtxo_commitment_txid_commitment_txid;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE INDEX IF NOT EXISTS idx_vtxo_commitment_txid_commitment_txid
ON vtxo_commitment_txid (commitment_txid);
49 changes: 48 additions & 1 deletion internal/infrastructure/db/postgres/sqlc/queries/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 19 additions & 1 deletion internal/infrastructure/db/postgres/sqlc/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,30 @@ SELECT sqlc.embed(offchain_tx_vw) FROM offchain_tx_vw WHERE txid = @txid AND COA
SELECT * FROM scheduled_session ORDER BY updated_at DESC LIMIT 1;

-- name: SelectVtxoPubKeysByCommitmentTxid :many
SELECT DISTINCT v.pubkey
SELECT DISTINCT v.pubkey
FROM vtxo_vw v
WHERE v.amount >= @min_amount
AND (v.commitment_txid = @commitment_txid
OR (',' || COALESCE(v.commitments::text, '') || ',') LIKE '%,' || @commitment_txid || ',%');

-- Bulk variant of SelectVtxoPubKeysByCommitmentTxid: returns the
-- deduplicated set of vtxo pubkeys for any of the given commitment_txids.
-- Used at startup by restoreWatchingVtxos to collapse what was an N+1
-- per-round loop into a single SQL call. The named parameter is reused
-- in both IN/ANY clauses; postgres binds it once.
-- name: SelectVtxoPubKeysByCommitmentTxids :many
SELECT DISTINCT v.pubkey
FROM vtxo v
WHERE v.amount >= @min_amount
AND (
v.commitment_txid = ANY(@commitment_txids::text[])
OR EXISTS (
SELECT 1 FROM vtxo_commitment_txid vc
WHERE vc.vtxo_txid = v.txid AND vc.vtxo_vout = v.vout
AND vc.commitment_txid = ANY(@commitment_txids::text[])
)
);

-- name: SelectSweepableVtxoOutpointsByCommitmentTxid :many
SELECT DISTINCT v.txid AS vtxo_txid, v.vout AS vtxo_vout
FROM vtxo_vw v
Expand Down
25 changes: 25 additions & 0 deletions internal/infrastructure/db/postgres/vtxo_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,31 @@ func (v *vtxoRepository) GetVtxoPubKeysByCommitmentTxid(
return taprootKeys, nil
}

// GetVtxoPubKeysByCommitmentTxids is the bulk variant of
// GetVtxoPubKeysByCommitmentTxid. It returns the deduplicated set of vtxo
// pubkeys whose root commitment_txid is in the given list, or whose
// vtxo_commitment_txid join row references one of those commitment txids.
// This replaces a per-round loop in restoreWatchingVtxos / stopWatchingVtxos
// that previously fired one query per sweepable round (the N+1 pattern).
func (v *vtxoRepository) GetVtxoPubKeysByCommitmentTxids(
ctx context.Context, commitmentTxids []string, withMinimumAmount uint64,
) ([]string, error) {
if len(commitmentTxids) == 0 {
return nil, nil
}

taprootKeys, err := v.querier.SelectVtxoPubKeysByCommitmentTxids(ctx,
queries.SelectVtxoPubKeysByCommitmentTxidsParams{
MinAmount: int64(withMinimumAmount),
CommitmentTxids: commitmentTxids,
})
if err != nil {
return nil, err
}

return taprootKeys, nil
}

func (v *vtxoRepository) GetPendingSpentVtxosWithPubKeys(
ctx context.Context, pubkeys []string, after, before int64,
) ([]domain.Vtxo, error) {
Expand Down
Loading
Loading