diff --git a/docs/svs-v3-revision.md b/docs/svs-v3-revision.md new file mode 100644 index 00000000..f32d480f --- /dev/null +++ b/docs/svs-v3-revision.md @@ -0,0 +1,377 @@ +# State Vector Sync (SVS) v3: Revision Specification + +This document **revises** SVS v3 for large synchronization groups. It is **not** a new protocol. Existing SVS v3 semantics apply whenever the complete State Vector fits within the configured size threshold. + +--- + +## 1. Basic Protocol Design + +### 1.1 Small groups + +For most deployments, the complete State Vector fits in one Sync packet. Nodes exchange **full** State Vectors using existing SVS v3 logic (suppression, steady state, merge, `OnUpdate`). + +### 1.2 Large groups + +When the encoded State Vector exceeds **`SyncVectorThreshold`** (configurable application limit), nodes use three dissemination modes: + +| Mode | When | On the wire | +|------|------|-------------| +| **Inline FULL** | Encoded FULL fits in threshold | `mhash` + `VectorType=FULL` + complete `StateVector` in Sync Data | +| **Inline PARTIAL** | **New publication** and FULL exceeds threshold | `mhash` + `VectorType=PARTIAL` + subset `StateVector` in Sync Data | +| **Announce + pull** | **Periodic sync** (large group), or **`mhash` mismatch** | Produce full vector Data at `32=sv/`; Sync Data carries `mhash` + reference Name only | + +**MemberSetHash (`mhash`)** is always carried inside `SvsData`. It is a **membership hash**, not a full-vector hash. + +**Full state recovery** (fetch complete State Vector from a remote sync member) uses **announce + pull** when: + +1. **`mhash` differs** from the local membership hash, or +2. **Periodic sync** runs while the local FULL encoding exceeds `SyncVectorThreshold`, or +3. An inline **`VectorType = FULL`** State Vector is outdated per Section 6.2. + +Link-level **fragmentation** (NDNLPv2) is not a concern for implementers. Publishers use **existing ndnd object segmentation** APIs when retrievable full-vector Data is large. 
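The mode table and recovery triggers in Section 1.2 can be read as a small decision function. The following is a minimal Go sketch of that reading, not the ndnd implementation: `Event`, `Mode`, and `SelectMode` are hypothetical names introduced here, and the sketch assumes a positive `SyncVectorThreshold` and that an `mhash` mismatch always leads to full state recovery.

```go
package main

import "fmt"

// Event is the reason a Sync message is about to be sent.
// Hypothetical type for illustration; not part of ndnd.
type Event int

const (
	NewPublication Event = iota
	PeriodicSync
	MhashMismatch
)

// Mode is one of the three dissemination modes from Section 1.2.
type Mode int

const (
	InlineFull Mode = iota
	InlinePartial
	AnnouncePull
)

func (m Mode) String() string {
	switch m {
	case InlineFull:
		return "inline FULL"
	case InlinePartial:
		return "inline PARTIAL"
	case AnnouncePull:
		return "announce + pull"
	}
	return "unknown"
}

// SelectMode picks a dissemination mode. fullSize is the encoded size of the
// inline FULL SvsData in bytes; threshold is SyncVectorThreshold (assumed > 0).
func SelectMode(ev Event, fullSize, threshold int) Mode {
	switch {
	case ev == MhashMismatch:
		// Assumption: a membership mismatch always triggers full recovery.
		return AnnouncePull
	case fullSize <= threshold:
		// Small group: plain SVS v3 behavior.
		return InlineFull
	case ev == NewPublication:
		// Large group, new publication: send a subset inline.
		return InlinePartial
	default:
		// Large group, periodic sync: publish the full vector and announce it.
		return AnnouncePull
	}
}

func main() {
	const threshold = 1200 // bytes; illustrative value only
	fmt.Println(SelectMode(NewPublication, 800, threshold))
	fmt.Println(SelectMode(NewPublication, 5000, threshold))
	fmt.Println(SelectMode(PeriodicSync, 5000, threshold))
	fmt.Println(SelectMode(MhashMismatch, 800, threshold))
}
```

The sketch deliberately leaves out the third recovery trigger (an outdated inline FULL vector), because that decision depends on comparing vector contents rather than on size alone.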
+ +--- + +## 2. Format and Naming + +### 2.1 Sync Interest + +**Sync Interest Name:** + +``` +//v=3 +``` + +Implementations MAY append additional name components after `v=3`. The Interest **nonce** is carried in Interest packet fields, **not** as a name component. + +- Signed **Sync Data** is carried in `ApplicationParameters`. +- Interest Lifetime SHOULD be 1 second. +- Sync Interests are **not** acknowledged (unchanged). + +### 2.2 Sync Data (in ApplicationParameters) + +**Sync Data Name** (signing identity for the Sync message): + +``` +//// +``` + +- **`version`:** microsecond timestamp (default). A hash suffix component is deferred. + +**Sync Data Content:** encoded `SvsData` (Section 3) — either **inline** or **announce-only** form. + +### 2.3 Application publication Data + +``` +////seq= +``` + +Application-level naming may vary. **Sync vector Data MUST NOT share the application publication namespace.** The `32=sv` keyword (Section 2.4) separates sync state from application Data. + +### 2.4 Published full State Vector Data + +Retrievable full State Vector objects use a dedicated sync namespace: + +**Name:** + +``` +////32=sv/ +``` + +**Content:** signed `SvsData` in **inline FULL** form: `mhash` + `VectorType = FULL` + complete `StateVector`. + +**Announce + pull procedure** (periodic sync, `mhash` recovery, join when FULL exceeds threshold): + +1. **Produce** the complete full-vector Data at `////32=sv/` (use ndnd segmentation when large). +2. **Send** a Sync Interest whose AppParam Sync Data contains **announce-only** `SvsData`: `mhash` + `SvsDataRef` pointing at that published name (Section 3.1). +3. Receivers **pull** the referenced Data, validate, and merge. + +Do **not** send an inline PARTIAL vector alongside an announce-only Sync message. + +--- + +## 3. Packet Specification + +### 3.1 `SvsData` + +`SvsData` has two forms. **`mhash` is always present.** It is not a separate protocol message. 
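A receiver can tell the two `SvsData` forms apart from which fields are present. The sketch below is one hedged way to express that check in Go. It uses a simplified, hypothetical `decodedSvsData` struct rather than the generated ndnd type, it is stricter than a real implementation may want to be (it rejects messages that mix the two forms), and it ignores the legacy SVS v3 layout that carries no `mhash`.

```go
package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
)

// Form is the shape of a received SvsData message.
type Form int

const (
	FormInlineFull Form = iota
	FormInlinePartial
	FormAnnounceOnly
)

// decodedSvsData is a hypothetical, simplified view of a decoded SvsData.
// It is not the ndnd struct; field names are chosen for illustration.
type decodedSvsData struct {
	Mhash      []byte  // MemberSetHash: 32-byte SHA-256 digest
	VectorType *uint64 // nil when the field is absent
	HasVector  bool    // true when a StateVector is present
	Ref        string  // SvsDataRef as a name URI; empty when absent
}

// classify returns the form of d, or an error if the field combination
// matches neither form. Rejecting mixed forms is an assumption of this sketch.
func classify(d decodedSvsData) (Form, error) {
	if len(d.Mhash) != sha256.Size {
		return 0, errors.New("mhash must be a 32-byte SHA-256 digest")
	}
	hasRef := d.Ref != ""
	switch {
	case hasRef && !d.HasVector && d.VectorType == nil:
		return FormAnnounceOnly, nil
	case !hasRef && d.HasVector && d.VectorType != nil:
		switch *d.VectorType {
		case 0:
			return FormInlineFull, nil
		case 1:
			return FormInlinePartial, nil
		}
		return 0, fmt.Errorf("unknown VectorType %d", *d.VectorType)
	}
	return 0, errors.New("fields match neither inline nor announce-only form")
}

func main() {
	sum := sha256.Sum256([]byte("placeholder membership bytes"))
	partial := uint64(1)

	inline := decodedSvsData{Mhash: sum[:], VectorType: &partial, HasVector: true}
	// "/example/ref" is a placeholder name, not a real 32=sv name.
	announce := decodedSvsData{Mhash: sum[:], Ref: "/example/ref"}

	for _, d := range []decodedSvsData{inline, announce} {
		form, err := classify(d)
		fmt.Println(form, err)
	}
}
```

Classifying before merging keeps the receive path simple: an announce-only message leads to a pull of the referenced Data, while an inline message goes to the merge logic with its `VectorType` attached.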
+ +#### 3.1.1 Inline form (FULL or PARTIAL) + +Used when the State Vector (or a publication-time PARTIAL subset) is carried inline in Sync Data, or in published full-vector Data at `32=sv/`. + +``` +SvsData = SVS-DATA-TYPE TLV-LENGTH + MemberSetHash + VectorType + StateVector +``` + +| Field | TLV type | Value | +|-------|----------|-------| +| `MemberSetHash` | `0xCB` | 32-byte SHA-256 digest (`mhash`) | +| `VectorType` | `0xCD` | `0` = FULL, `1` = PARTIAL | +| `StateVector` | `0xC9` | See Section 3.2 | + +#### 3.1.2 Announce-only form + +Used when Sync Data only advertises a retrievable full-vector Data name (periodic sync, `mhash` recovery). **No `VectorType` or `StateVector`.** + +``` +SvsData = SVS-DATA-TYPE TLV-LENGTH + MemberSetHash + SvsDataRef +``` + +| Field | TLV type | Value | +|-------|----------|-------| +| `MemberSetHash` | `0xCB` | 32-byte SHA-256 digest (`mhash`) | +| `SvsDataRef` | `0x07` (Name) | Name of published full-vector Data: `////32=sv/` | + +> **Note:** The inline layout extends ndnd v3 `SvsData` with `MemberSetHash` and `VectorType` before `StateVector`, matching the Python strawman (`mhash` at `0xCB`, vector at `0xC9`/`0xCA`). + +### 3.2 `StateVector` + +``` +StateVector = STATE-VECTOR-TYPE TLV-LENGTH + *StateVectorEntry + +StateVectorEntry = STATE-VECTOR-ENTRY-TYPE TLV-LENGTH + Name + *SeqNoEntry + +SeqNoEntry = SEQ-NO-ENTRY-TYPE TLV-LENGTH + BootstrapTime + SeqNo +``` + +| TLV | Type (decimal) | Type (hex) | +|-----|----------------|------------| +| `STATE-VECTOR-TYPE` | 201 | `0xC9` | +| `STATE-VECTOR-ENTRY-TYPE` | 202 | `0xCA` | +| `SEQ-NO-ENTRY-TYPE` | 210 | `0xD2` | +| `BOOTSTRAP-TIME-TYPE` | 212 | `0xD4` | +| `SEQ-NO-TYPE` | 214 | `0xD6` | + +**Rules (unchanged from SVS v3):** + +- Sequence numbers are 1-indexed. +- Bootstrap time is seconds since Unix epoch. +- If an entry is absent, its sequence number is treated as 0 for comparison. 
+- If any received `BootstrapTime` is more than 86400s in the future, the entire `StateVector` SHOULD be ignored. + +### 3.3 `MemberSetHash` (`mhash`) + +**`mhash` is a membership hash.** It is **not** a hash of the full State Vector and **not** a hash of sequence numbers. + +**Membership** is the set of participants, each identified by: + +``` +(Producer Name, Bootstrap Time) +``` + +**Computation:** + +``` +members = { (Name, BootstrapTime) | node knows this member in the sync group } +sort by NDN canonical order of Name, then by BootstrapTime ascending +mhash = SHA-256( concatenation of canonical TLV bytes of each (Name, BootstrapTime) pair ) +``` + +Recompute `mhash` whenever membership changes (member added, removed, or new bootstrap time for a name). + +> **Note:** The Python strawman hashes sorted producer **names only**. This revision includes **Bootstrap Time** in each membership tuple, consistent with SVS v3 identity. + +**Membership data and State Vector data are separate concepts.** Membership is carried implicitly in the full State Vector. `mhash` summarizes membership for quick comparison. + +### 3.4 `VectorType` (inline form only) + +| Value | Name | Meaning | +|-------|------|---------| +| `0` | **FULL** | `StateVector` contains the complete advertised state (Section 4.1 ordering). | +| `1` | **PARTIAL** | `StateVector` contains a subset (Section 4.2). Used for **new publication** only when FULL exceeds threshold. | + +`mhash` is present in both inline and announce-only `SvsData` messages. + +--- + +## 4. State Vector Encoding + +### 4.1 FULL State Vector + +- Include all known members and their latest sequence numbers per bootstrap. +- Entries ordered in **NDN canonical order** of `Name` (unchanged SVS v3 rule). +- Set `VectorType = FULL`. + +### 4.2 PARTIAL State Vector + +Used **only on new publication** when `encoded_size(inline FULL SvsData) > SyncVectorThreshold`. + +- Set `VectorType = PARTIAL`. 
+- **Entry `[0]`** MUST be the **sender's** own `StateVectorEntry`. +- **Entries `[1…n]`** MUST be in **NDN canonical order** among included peers. + +An **implementation** MAY use the following selection priority: + +| Priority | Include | +|----------|---------| +| 1 | Sender (always) | +| 2 | Repair targets | +| 3 | Propagation targets | +| 4 | Random inactive producers | +| 5 | Others by recency | + +Stop adding entries when estimated inline `SvsData` size approaches `SyncVectorThreshold`. + +### 4.3 `SyncVectorThreshold` + +- Configurable implementation parameter (application packet size budget). +- Default value is implementation-defined. +- When `encoded_size(FULL) ≤ SyncVectorThreshold`, nodes use **inline FULL** only (existing SVS v3 behavior). + +--- + +## 5. State Sync + +Sections 5.1–5.4 unchanged in spirit from [SVS v3 Section 4](https://named-data.github.io/StateVectorSync/Specification.html). This revision adds Sections 5.5–5.9. + +### 5.1 Sync Interest timer + +- `PeriodicTimeout` default 30s (±10% jitter). +- `SuppressionPeriod` default 200ms. +- `SuppressionTimeout` exponential decay (unchanged formula). + +### 5.2 Send Sync Interest on new publication + +When the node generates a new publication, it immediately emits a Sync Interest and resets the timer to `PeriodicTimeout`. + +| Condition | Action | +|-----------|--------| +| `encoded_size(inline FULL) ≤ SyncVectorThreshold` | Send **inline FULL** (`mhash` + `VectorType=FULL` + `StateVector`) | +| `encoded_size(inline FULL) > SyncVectorThreshold` | Send **inline PARTIAL** (`mhash` + `VectorType=PARTIAL` + subset `StateVector`) | + +### 5.3 Sync Ack policy + +Do not acknowledge Sync Interests. + +### 5.4 Steady state and suppression (unchanged for inline FULL) + +For incoming Sync Data with inline `VectorType = FULL`, apply existing SVS v3 steady-state and suppression rules. + +### 5.5 PARTIAL State Vector processing + +When `VectorType = PARTIAL`: + +1. Parse `mhash` and `StateVector`. +2. 
**Do not** treat names **omitted** from the partial `StateVector` as producer removal, outdated sender (by omission alone), or sequence rollback. +3. For each **present** entry, merge newer sequence numbers into local state (unchanged merge rule). +4. If `mhash` differs from local `mhash`, perform **announce + pull** recovery (Section 5.6). + +This is the primary **receive-side change** in ndnd (`svs.go`). + +### 5.6 Full state recovery (announce + pull) + +**Triggers:** + +| # | Condition | Action | +|---|-----------|--------| +| 1 | `mhash` in received `SvsData` ≠ locally computed `mhash` | Announce + pull | +| 2 | Inline `VectorType = FULL` is outdated per Section 6.2 | Merge inline if complete; otherwise announce + pull | +| 3 | Periodic sync while local FULL exceeds `SyncVectorThreshold` | Sender: announce + pull (Section 5.8) | + +**There is no separate membership-only retrieval.** Recovery always fetches the **complete State Vector** from the referenced `32=sv/` Data. + +**Procedure (sender on `mhash` mismatch or periodic large-group sync):** + +1. Produce full-vector Data at `////32=sv/` with inline FULL `SvsData`. +2. Send Sync Interest with announce-only `SvsData` (`mhash` + `SvsDataRef`). + +**Procedure (receiver):** + +1. Identify sender from Sync Data signature and/or PARTIAL entry `[0]`. +2. If Sync Data is **inline FULL** and complete: merge directly. +3. If Sync Data is **announce-only**: read `SvsDataRef`; express Interest for that name; validate; merge; update local `mhash`. +4. Continue application data fetch via SvsALO (`OnUpdate`) as today. + +Use **ndnd segmentation** when fetched Data content is large. + +### 5.7 New node join + +1. Joining node **N** multicasts Sync Interest with **only itself**: `(Name=N, SeqNo=0)` and its current `mhash`. +2. Existing members receive the announcement. +3. **Suppression** limits duplicate responses; typically one member **A** provides recovery state. +4. 
If FULL fits inline: **A** responds with inline `VectorType = FULL`. +5. If FULL exceeds `SyncVectorThreshold`: **A** uses **announce + pull** (produce at `32=sv/`, then announce-only Sync Data). +6. Normal synchronization proceeds through SvsALO. + +### 5.8 Periodic sync in large groups + +| Local FULL size | Periodic Sync behavior | +|-----------------|------------------------| +| `≤ SyncVectorThreshold` | **Inline FULL** (existing SVS v3) | +| `> SyncVectorThreshold` | **Always announce + pull** (produce full-vector Data, then announce-only Sync Data) | + +Periodic sync does **not** send inline PARTIAL vectors. + +### 5.9 Summary of sync triggers + +| Event | `size ≤ threshold` | `size > threshold` | +|-------|--------------------|--------------------| +| **New publication** | Inline FULL | Inline PARTIAL | +| **Periodic sync** | Inline FULL | Announce + pull | +| **`mhash` mismatch** | Announce + pull (if recovery needed) | Announce + pull | + +--- + +## 6. Comparing and Merging State Vectors + +### 6.1 Merge rule + +For each matching `(Name, BootstrapTime)`, retain the maximum `SeqNo`. + +### 6.2 Outdated vector (inline FULL only) + +State Vector `A` is outdated relative to `B` if: + +- `A` is missing a name present in `B`, or +- `A` has a strictly smaller `SeqNo` for any entry. + +For `VectorType = PARTIAL`, the missing-name rule **does not** apply to names omitted from the partial message. + +--- + +## 7. Examples + +### 7.1 Small group + +Three nodes `A`, `B`, `C`. The full State Vector fits in one Sync packet. `A` publishes and sends an inline FULL Sync Interest `[A:11, B:15, C:25]`. Peers merge. + +### 7.2 Large group: new publication + +Group exceeds `SyncVectorThreshold`. Producer `P` publishes: + +- `P` sends inline PARTIAL `SvsData { mhash, VectorType=PARTIAL, StateVector=[P:…, A:…, …] }`. +- Receiver merges present entries only. +- If `mhash` differs, `P` (or receiver per policy) triggers announce + pull (Section 5.6). + +### 7.3 Large group: announce + pull + +- `A` produces full vector at `/group/A/boot/32=sv/`. 
+- `A` sends announce-only Sync Data `{ mhash, SvsDataRef=/group/A/boot/32=sv/ }`. +- Peers pull and merge. + +### 7.4 New node join + +- `N` sends self-only vector `[N:0]` with `mhash`. +- `A` responds with inline FULL or announce + pull. +- `N` merges and synchronizes via SvsALO. + +--- + +## 8. Open Items + +1. **Mixed-version interoperability**: how revised nodes coexist with plain SVS v3 peers in the same sync group, if at all. + +--- + + diff --git a/std/examples/svs/large-sync/main.go b/std/examples/svs/large-sync/main.go new file mode 100644 index 00000000..3437e04c --- /dev/null +++ b/std/examples/svs/large-sync/main.go @@ -0,0 +1,89 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + enc "github.com/named-data/ndnd/std/encoding" + "github.com/named-data/ndnd/std/engine" + "github.com/named-data/ndnd/std/log" + "github.com/named-data/ndnd/std/ndn" + "github.com/named-data/ndnd/std/object" + "github.com/named-data/ndnd/std/object/storage" + "github.com/named-data/ndnd/std/sync" +) + +func main() { + // Before running this example, make sure the strategy is set up correctly + // to multicast for the sync prefix. 
For example, using the following: + // + // ndnd fw strategy-set prefix=/ndn/svs/32=svs strategy=/localhost/nfd/strategy/multicast + // + + threshold := flag.Int("threshold", 1200, "SyncVectorThreshold in bytes (0 = legacy inline FULL only)") + flag.Parse() + + if flag.NArg() < 1 { + fmt.Fprintf(os.Stderr, "Usage: %s [-threshold N] \n", os.Args[0]) + os.Exit(1) + } + + name, err := enc.NameFromStr(flag.Arg(0)) + if err != nil { + log.Fatal(nil, "Invalid node ID", "name", flag.Arg(0), "err", err) + } + + app := engine.NewBasicEngine(engine.NewDefaultFace()) + err = app.Start() + if err != nil { + log.Fatal(nil, "Unable to start engine", "err", err) + } + defer app.Stop() + + store := storage.NewMemoryStore() + client := object.NewClient(app, store, nil) + err = client.Start() + if err != nil { + log.Fatal(nil, "Unable to start object client", "err", err) + } + defer client.Stop() + + group, _ := enc.NameFromStr("/ndn/svs") + boot := uint64(time.Now().Unix()) + syncDataName := name. + Append(enc.NewTimestampComponent(boot)). 
+ Append(enc.NewKeywordComponent("svs")) + + svsync := sync.NewSvSync(sync.SvSyncOpts{ + Client: client, + GroupPrefix: group, + SyncDataName: syncDataName, + BootTime: boot, + SyncVectorThreshold: *threshold, + OnUpdate: func(ssu sync.SvSyncUpdate) { + log.Info(nil, "Received update", "update", ssu) + }, + }) + + client.AnnouncePrefix(ndn.Announcement{Name: group}) + defer client.WithdrawPrefix(group, nil) + + err = svsync.Start() + if err != nil { + log.Fatal(nil, "Unable to start SvSync", "err", err) + } + defer svsync.Stop() + + log.Info(nil, "Large-group SVS started", + "name", name, + "threshold", *threshold, + "syncData", syncDataName) + + ticker := time.NewTicker(3 * time.Second) + for range ticker.C { + seq := svsync.IncrSeqNo(name) + log.Info(nil, "Published new sequence number", "seq", seq) + } +} diff --git a/std/ndn/svs/v3/definitions.go b/std/ndn/svs/v3/definitions.go index 3f7777bc..a3fcf415 100644 --- a/std/ndn/svs/v3/definitions.go +++ b/std/ndn/svs/v3/definitions.go @@ -3,9 +3,22 @@ package svs import ( enc "github.com/named-data/ndnd/std/encoding" + "github.com/named-data/ndnd/std/types/optional" +) + +// VectorType values for inline SvsData (TLV 0xCD). 
+const ( + VectorTypeFull uint64 = 0 + VectorTypePartial uint64 = 1 ) type SvsData struct { + //+field:binary:optional + MemberSetHash []byte `tlv:"0xcb"` + //+field:natural:optional + VectorType optional.Optional[uint64] `tlv:"0xcd"` + //+field:name + SvsDataRef enc.Name `tlv:"0x07"` //+field:struct:StateVector StateVector *StateVector `tlv:"0xc9"` } diff --git a/std/ndn/svs/v3/zz_generated.go b/std/ndn/svs/v3/zz_generated.go index 0fb2e7fc..138dbed3 100644 --- a/std/ndn/svs/v3/zz_generated.go +++ b/std/ndn/svs/v3/zz_generated.go @@ -11,6 +11,7 @@ import ( type SvsDataEncoder struct { Length uint + SvsDataRef_length uint StateVector_encoder StateVectorEncoder } @@ -19,11 +20,32 @@ type SvsDataParsingContext struct { } func (encoder *SvsDataEncoder) Init(value *SvsData) { + + if value.SvsDataRef != nil { + encoder.SvsDataRef_length = 0 + for _, c := range value.SvsDataRef { + encoder.SvsDataRef_length += uint(c.EncodingLength()) + } + } if value.StateVector != nil { encoder.StateVector_encoder.Init(value.StateVector) } l := uint(0) + if value.MemberSetHash != nil { + l += 1 + l += uint(enc.TLNum(len(value.MemberSetHash)).EncodingLength()) + l += uint(len(value.MemberSetHash)) + } + if optval, ok := value.VectorType.Get(); ok { + l += 1 + l += uint(1 + enc.Nat(optval).EncodingLength()) + } + if value.SvsDataRef != nil { + l += 1 + l += uint(enc.TLNum(encoder.SvsDataRef_length).EncodingLength()) + l += encoder.SvsDataRef_length + } if value.StateVector != nil { l += 1 l += uint(enc.TLNum(encoder.StateVector_encoder.Length).EncodingLength()) @@ -34,6 +56,7 @@ func (encoder *SvsDataEncoder) Init(value *SvsData) { } func (context *SvsDataParsingContext) Init() { + context.StateVector_context.Init() } @@ -41,6 +64,29 @@ func (encoder *SvsDataEncoder) EncodeInto(value *SvsData, buf []byte) { pos := uint(0) + if value.MemberSetHash != nil { + buf[pos] = byte(203) + pos += 1 + pos += uint(enc.TLNum(len(value.MemberSetHash)).EncodeInto(buf[pos:])) + copy(buf[pos:], 
value.MemberSetHash) + pos += uint(len(value.MemberSetHash)) + } + if optval, ok := value.VectorType.Get(); ok { + buf[pos] = byte(205) + pos += 1 + + buf[pos] = byte(enc.Nat(optval).EncodeInto(buf[pos+1:])) + pos += uint(1 + buf[pos]) + + } + if value.SvsDataRef != nil { + buf[pos] = byte(7) + pos += 1 + pos += uint(enc.TLNum(encoder.SvsDataRef_length).EncodeInto(buf[pos:])) + for _, c := range value.SvsDataRef { + pos += uint(c.EncodeInto(buf[pos:])) + } + } if value.StateVector != nil { buf[pos] = byte(201) pos += 1 @@ -64,6 +110,9 @@ func (encoder *SvsDataEncoder) Encode(value *SvsData) enc.Wire { func (context *SvsDataParsingContext) Parse(reader enc.WireView, ignoreCritical bool) (*SvsData, error) { + var handled_MemberSetHash bool = false + var handled_VectorType bool = false + var handled_SvsDataRef bool = false var handled_StateVector bool = false progress := -1 @@ -91,6 +140,43 @@ func (context *SvsDataParsingContext) Parse(reader enc.WireView, ignoreCritical err = nil if handled := false; true { switch typ { + case 203: + if true { + handled = true + handled_MemberSetHash = true + value.MemberSetHash = make([]byte, l) + _, err = reader.ReadFull(value.MemberSetHash) + } + case 205: + if true { + handled = true + handled_VectorType = true + { + optval := uint64(0) + optval = uint64(0) + { + for i := 0; i < int(l); i++ { + x := byte(0) + x, err = reader.ReadByte() + if err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + break + } + optval = uint64(optval<<8) | uint64(x) + } + } + value.VectorType.Set(optval) + } + } + case 7: + if true { + handled = true + handled_SvsDataRef = true + delegate := reader.Delegate(int(l)) + value.SvsDataRef, err = delegate.ReadName() + } case 201: if true { handled = true @@ -115,6 +201,15 @@ func (context *SvsDataParsingContext) Parse(reader enc.WireView, ignoreCritical startPos = reader.Pos() err = nil + if !handled_MemberSetHash && err == nil { + value.MemberSetHash = nil + } + if !handled_VectorType && err 
== nil { + value.VectorType.Unset() + } + if !handled_SvsDataRef && err == nil { + value.SvsDataRef = nil + } if !handled_StateVector && err == nil { value.StateVector = nil } diff --git a/std/sync/svs.go b/std/sync/svs.go index 60e71015..3dc7caf5 100644 --- a/std/sync/svs.go +++ b/std/sync/svs.go @@ -40,6 +40,9 @@ type SvSync struct { // Channel for incoming state vectors recvSv chan svSyncRecvSvArgs + // Prefix for published full State Vector Data (.../32=sv). + fullVectorPrefix enc.Name + // cancellation for face hook faceCancel func() } @@ -73,6 +76,10 @@ type SvSyncOpts struct { UseSignatureTime optional.Optional[bool] // IgnoreValidity ignores validity period in the validation chain IgnoreValidity optional.Optional[bool] + + // SyncVectorThreshold is the max inline SvsData size (bytes). + // 0 = legacy mode (StateVector-only wire, no mhash or announce+pull recovery). + SyncVectorThreshold int } type SvSyncUpdate struct { @@ -83,8 +90,11 @@ type SvSyncUpdate struct { } type svSyncRecvSvArgs struct { - sv *spec_svs.StateVector - data enc.Wire + sv *spec_svs.StateVector + data enc.Wire + vectorType optional.Optional[uint64] + mhash []byte + svsDataRef enc.Name } // NewSvSync creates a new SV Sync instance. 
@@ -145,6 +155,8 @@ func NewSvSync(opts SvSyncOpts) *SvSync { recvSv: make(chan svSyncRecvSvArgs, 128), + fullVectorPrefix: deriveFullVectorPrefix(opts.SyncDataName), + faceCancel: func() {}, } } @@ -180,7 +192,7 @@ func (s *SvSync) main() { // Notify everyone when we are back online s.faceCancel = s.o.Client.Engine().Face().OnUp(func() { - time.AfterFunc(100*time.Millisecond, s.sendSyncInterest) + time.AfterFunc(100*time.Millisecond, func() { s.sendSyncInterest(syncSendOther) }) }) defer s.faceCancel() @@ -190,7 +202,7 @@ func (s *SvSync) main() { go s.loadPassiveWires() } else { // Send the initial Sync Interest - go s.sendSyncInterest() + go s.sendSyncInterest(syncSendOther) } for { @@ -242,7 +254,7 @@ func (s *SvSync) SetSeqNo(name enc.Name, seqNo uint64) error { // [Spec] When the node generates a new publication, // immediately emit a Sync Interest s.state.Set(hash, s.o.BootTime, seqNo) - go s.sendSyncInterest() + go s.sendSyncInterest(syncSendPublication, name) return nil } @@ -264,7 +276,7 @@ func (s *SvSync) IncrSeqNo(name enc.Name) uint64 { // [Spec] When the node generates a new publication, // immediately emit a Sync Interest - go s.sendSyncInterest() + go s.sendSyncInterest(syncSendPublication, name) return entry } @@ -372,7 +384,14 @@ func (s *SvSync) onReceiveStateVector(args svSyncRecvSvArgs) { // The above checks each node in the incoming state vector, but // does not check if a node is missing from the incoming state vector. 
- if !isOutdated && s.state.IsNewerThan(recvSv, func(_, _ uint64) bool { return false }) { + isPartial := false + if vt, ok := args.vectorType.Get(); ok && vt == spec_svs.VectorTypePartial { + isPartial = true + } + if len(args.mhash) > 0 { + s.handleMhashMismatch(args, recvSv) + } + if !isPartial && !isOutdated && s.state.IsNewerThan(recvSv, func(_, _ uint64) bool { return false }) { isOutdated = true canDrop = false } @@ -420,11 +439,11 @@ func (s *SvSync) timerExpired() { // [Spec] On expiration of timer emit a Sync Interest // with the current local state vector. - go s.sendSyncInterest() + go s.sendSyncInterest(syncSendPeriodic) } // (AI GENERATED DESCRIPTION): Sends a sync Interest: if passive mode is enabled, it publishes all buffered state updates without duplicates; otherwise, it encodes the current state vector into a wire and transmits it, provided the sync service is running. -func (s *SvSync) sendSyncInterest() { +func (s *SvSync) sendSyncInterest(reason syncSendReason, pubName ...enc.Name) { if !s.running.Load() { return } @@ -437,8 +456,13 @@ func (s *SvSync) sendSyncInterest() { return } + var sender enc.Name + if reason == syncSendPublication && len(pubName) > 0 { + sender = pubName[0] + } + // Encode and sign the current state vector - wire := s.encodeSyncData() + wire := s.encodeSyncData(reason, sender) s.sendSyncInterestWith(wire) } @@ -466,18 +490,37 @@ func (s *SvSync) sendSyncInterestWith(dataWire enc.Wire) { } // (AI GENERATED DESCRIPTION): Builds a signed Data packet containing the current state vector for SVS v3 synchronization. 
-func (s *SvSync) encodeSyncData() enc.Wire { - // Critical section - sv := func() *spec_svs.StateVector { - s.mutex.Lock() - defer s.mutex.Unlock() - - // [Spec*] Sending always triggers Steady State - s.enterSteadyState() - - return s.state.Encode(func(s uint64) uint64 { return s }) - }() - svWire := (&spec_svs.SvsData{StateVector: sv}).Encode() +func (s *SvSync) encodeSyncData(reason syncSendReason, sender enc.Name) enc.Wire { + s.mutex.Lock() + s.enterSteadyState() + stateSnap := cloneSvMap(s.state) + mtimeSnap := make(map[string]time.Time, len(s.mtime)) + for k, v := range s.mtime { + mtimeSnap[k] = v + } + repair, propagation := s.partialTargets() + s.mutex.Unlock() + + var svsData *spec_svs.SvsData + if shouldUseAnnouncePull(reason, s.o.SyncVectorThreshold, stateSnap) { + ref, err := s.publishFullVectorData(stateSnap) + if err != nil { + log.Error(s, "publishFullVectorData failed", "err", err) + return nil + } + svsData = buildAnnounceSvsData(stateSnap, ref) + } else { + svsData = buildSvsDataForSend(svsSendInput{ + State: stateSnap, + Reason: reason, + Threshold: s.o.SyncVectorThreshold, + Sender: sender, + Repair: repair, + Propagation: propagation, + Mtime: mtimeSnap, + }) + } + svWire := svsData.Encode() // SVS v3 Sync Data name := s.o.SyncDataName.WithVersion(enc.VersionUnixMicro) @@ -536,18 +579,36 @@ func (s *SvSync) onSyncData(dataWire enc.Wire) { return } - // Decode state vector + // Decode SvsData (inline FULL, PARTIAL, announce-only, or legacy v3). svWire := data.Content().Join() params, err := spec_svs.ParseSvsData(enc.NewBufferView(svWire), false) - if err != nil || params.StateVector == nil { - log.Warn(s, "onSyncInterest failed to parse StateVec", "err", err) + if err != nil { + log.Warn(s, "onSyncInterest failed to parse SvsData", "err", err) + return + } + + // Announce-only: retrievable full vector reference. 
+ if params.StateVector == nil && len(params.SvsDataRef) > 0 { + trustPrefix := pullRefFromSyncDataWire(dataWire) + go s.pullFullVector(params.SvsDataRef, trustPrefix) + return + } + if params.StateVector == nil { + log.Warn(s, "onSyncInterest SvsData has no StateVector") return } - s.recvSv <- svSyncRecvSvArgs{ - sv: params.StateVector, - data: dataWire, + args := svSyncRecvSvArgs{ + sv: params.StateVector, + data: dataWire, + mhash: params.MemberSetHash, + svsDataRef: params.SvsDataRef, } + if vt, ok := params.VectorType.Get(); ok { + args.vectorType = optional.Some(vt) + } + + s.recvSv <- args }, }) } @@ -664,5 +725,16 @@ func (s *SvSync) loadPassiveWires() { } // This is hacky but pragmatic - wait for the state to be processed - time.AfterFunc(500*time.Millisecond, s.sendSyncInterest) + time.AfterFunc(500*time.Millisecond, func() { s.sendSyncInterest(syncSendOther) }) +} + +// partialTargets returns repair and propagation name targets from suppression merge state. +func (s *SvSync) partialTargets() (repair, propagation []enc.Name) { + if !s.suppress { + return nil, nil + } + for name := range s.merge.Iter() { + repair = append(repair, name) + } + return repair, nil } diff --git a/std/sync/svs_encode.go b/std/sync/svs_encode.go new file mode 100644 index 00000000..c3a1f343 --- /dev/null +++ b/std/sync/svs_encode.go @@ -0,0 +1,256 @@ +package sync + +import ( + "cmp" + "math/rand/v2" + "slices" + "time" + + enc "github.com/named-data/ndnd/std/encoding" + spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3" + "github.com/named-data/ndnd/std/types/optional" +) + +// syncSendReason distinguishes why a Sync Interest is being sent. +type syncSendReason int + +const ( + syncSendOther syncSendReason = iota + syncSendPublication + syncSendPeriodic + syncSendRecovery +) + +// PartialEncodeOpts configures subset selection for inline PARTIAL vectors. 
+type PartialEncodeOpts struct { + Sender enc.Name + Threshold int + Repair []enc.Name + Propagation []enc.Name + Mtime map[string]time.Time +} + +// buildLegacySvsData is the pre-revision SVS v3 wire format used when threshold is 0. +func buildLegacySvsData(state SvMap[uint64]) *spec_svs.SvsData { + sv := state.Encode(func(seq uint64) uint64 { return seq }) + return &spec_svs.SvsData{StateVector: sv} +} + +// buildInlineSvsData constructs inline SvsData (mhash + VectorType + StateVector). +func buildInlineSvsData(state SvMap[uint64], vectorType uint64, sv *spec_svs.StateVector) *spec_svs.SvsData { + return &spec_svs.SvsData{ + MemberSetHash: ComputeMhash(state), + VectorType: optional.Some(vectorType), + StateVector: sv, + } +} + +// inlineSvsDataSize returns the encoded byte length of SvsData content. +func inlineSvsDataSize(data *spec_svs.SvsData) int { + return len(data.Encode().Join()) +} + +// exceedsSyncThreshold reports whether size is over the configured inline budget. +// threshold 0 disables the large-group size limit (legacy mode). +func exceedsSyncThreshold(threshold, size int) bool { + return threshold > 0 && size > threshold +} + +// buildInlineFullFromState builds inline FULL SvsData for the complete local state. +func buildInlineFullFromState(state SvMap[uint64]) *spec_svs.SvsData { + sv := state.Encode(func(seq uint64) uint64 { return seq }) + return buildInlineSvsData(state, spec_svs.VectorTypeFull, sv) +} + +// inlineFullSize returns encoded inline FULL SvsData size for threshold checks. +func inlineFullSize(state SvMap[uint64]) int { + return inlineSvsDataSize(buildInlineFullFromState(state)) +} + +// svsSendInput carries everything needed to build inline Sync Data for send. 
+type svsSendInput struct { + State SvMap[uint64] + Reason syncSendReason + Threshold int + Sender enc.Name + Repair []enc.Name + Propagation []enc.Name + Mtime map[string]time.Time +} + +// buildSvsDataForSend picks inline FULL or PARTIAL SvsData for an outgoing Sync message. +func buildSvsDataForSend(in svsSendInput) *spec_svs.SvsData { + if in.Threshold <= 0 { + return buildLegacySvsData(in.State) + } + + fullSv := in.State.Encode(func(seq uint64) uint64 { return seq }) + fullData := buildInlineSvsData(in.State, spec_svs.VectorTypeFull, fullSv) + + if in.Reason != syncSendPublication || !exceedsSyncThreshold(in.Threshold, inlineSvsDataSize(fullData)) { + return fullData + } + + partialSv := encodePartialStateVector(in.State, PartialEncodeOpts{ + Sender: in.Sender, + Threshold: in.Threshold, + Repair: in.Repair, + Propagation: in.Propagation, + Mtime: in.Mtime, + }) + return buildInlineSvsData(in.State, spec_svs.VectorTypePartial, partialSv) +} + +// encodePartialStateVector builds a PARTIAL StateVector for new publication. +// Entry [0] is the sender; entries [1..n] are in NDN canonical order. +func encodePartialStateVector(state SvMap[uint64], opts PartialEncodeOpts) *spec_svs.StateVector { + seq := func(v uint64) uint64 { return v } + senderHash := opts.Sender.TlvStr() + + senderEntry := state.encodeNameEntry(opts.Sender, seq) + if senderEntry == nil { + senderEntry = &spec_svs.StateVectorEntry{Name: opts.Sender} + } + + if opts.Threshold <= 0 { + return &spec_svs.StateVector{Entries: []*spec_svs.StateVectorEntry{senderEntry}} + } + + // Sender-only baseline must always fit when possible. 
+	baseline := &spec_svs.StateVector{Entries: []*spec_svs.StateVectorEntry{senderEntry}}
+	if inlineSvsDataSize(buildInlineSvsData(state, spec_svs.VectorTypePartial, baseline)) > opts.Threshold {
+		return baseline
+	}
+
+	candidates := partialCandidateNames(state, senderHash, opts)
+	included := map[string]bool{senderHash: true}
+	entries := []*spec_svs.StateVectorEntry{senderEntry}
+
+	for _, name := range candidates {
+		hash := name.TlvStr()
+		if included[hash] {
+			continue
+		}
+		entry := state.encodeNameEntry(name, seq)
+		if entry == nil {
+			continue
+		}
+
+		trial := append(slices.Clone(entries), entry)
+		sortPartialTail(trial)
+		trialSv := &spec_svs.StateVector{Entries: trial}
+		if exceedsSyncThreshold(opts.Threshold, inlineSvsDataSize(buildInlineSvsData(state, spec_svs.VectorTypePartial, trialSv))) {
+			break
+		}
+
+		entries = trial
+		included[hash] = true
+	}
+
+	sortPartialTail(entries)
+	return &spec_svs.StateVector{Entries: entries}
+}
+
+func partialCandidateNames(state SvMap[uint64], senderHash string, opts PartialEncodeOpts) []enc.Name {
+	seen := map[string]bool{senderHash: true}
+	out := make([]enc.Name, 0, len(state))
+
+	appendUnique := func(names []enc.Name) {
+		for _, name := range names {
+			hash := name.TlvStr()
+			if seen[hash] {
+				continue
+			}
+			if _, ok := state[hash]; !ok {
+				continue
+			}
+			seen[hash] = true
+			out = append(out, name)
+		}
+	}
+
+	appendUnique(opts.Repair)
+	appendUnique(opts.Propagation)
+
+	inactive := make([]enc.Name, 0)
+	remaining := make([]enc.Name, 0)
+	for name, vals := range state.Iter() {
+		hash := name.TlvStr()
+		if seen[hash] {
+			continue
+		}
+		if isInactiveProducer(vals) {
+			inactive = append(inactive, name)
+			continue
+		}
+		remaining = append(remaining, name)
+	}
+
+	rand.Shuffle(len(inactive), func(i, j int) {
+		inactive[i], inactive[j] = inactive[j], inactive[i]
+	})
+	appendUnique(inactive)
+
+	slices.SortFunc(remaining, func(a, b enc.Name) int {
+		return a.Compare(b)
+	})
+	slices.SortStableFunc(remaining, func(a, b enc.Name) int { // stable: name order breaks recency ties
+		return cmp.Compare(recencyScore(opts.Mtime, b), recencyScore(opts.Mtime, a))
+	})
+	appendUnique(remaining)
+
+	return out
+}
+
+func isInactiveProducer(vals []SvMapVal[uint64]) bool {
+	for _, val := range vals {
+		if val.Value > 0 {
+			return false
+		}
+	}
+	return true
+}
+
+func recencyScore(mtime map[string]time.Time, name enc.Name) int64 {
+	if mtime == nil {
+		return 0
+	}
+	t, ok := mtime[name.TlvStr()]
+	if !ok {
+		return 0
+	}
+	return t.UnixNano()
+}
+
+// sortPartialTail keeps entry [0] fixed and sorts [1..n] in canonical name order.
+func sortPartialTail(entries []*spec_svs.StateVectorEntry) {
+	if len(entries) <= 1 {
+		return
+	}
+	slices.SortFunc(entries[1:], func(a, b *spec_svs.StateVectorEntry) int {
+		return a.Name.Compare(b.Name)
+	})
+}
+
+// encodeNameEntry encodes one producer name from the map.
+func (m SvMap[V]) encodeNameEntry(name enc.Name, seq func(V) uint64) *spec_svs.StateVectorEntry {
+	hash := name.TlvStr()
+	vals, ok := m[hash]
+	if !ok {
+		return nil
+	}
+
+	entry := &spec_svs.StateVectorEntry{
+		Name:         name,
+		SeqNoEntries: make([]*spec_svs.SeqNoEntry, 0, len(vals)),
+	}
+	for _, val := range vals {
+		if seqNo := seq(val.Value); seqNo > 0 {
+			entry.SeqNoEntries = append(entry.SeqNoEntries, &spec_svs.SeqNoEntry{
+				BootstrapTime: val.Boot,
+				SeqNo:         seqNo,
+			})
+		}
+	}
+	return entry
+}
diff --git a/std/sync/svs_encode_test.go b/std/sync/svs_encode_test.go
new file mode 100644
index 00000000..e3492c56
--- /dev/null
+++ b/std/sync/svs_encode_test.go
@@ -0,0 +1,198 @@
+package sync
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	enc "github.com/named-data/ndnd/std/encoding"
+	spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
+	"github.com/named-data/ndnd/std/types/optional"
+	tu "github.com/named-data/ndnd/std/utils/testutils"
+	"github.com/stretchr/testify/require"
+)
+
+func testSvMapAliceBob() SvMap[uint64] {
+	m := NewSvMap[uint64](0)
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 5)
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150, 3)
+	return m
+}
+
+func TestBuildInlineFullSvsData(t *testing.T) {
+	tu.SetT(t)
+
+	m := testSvMapAliceBob()
+	sv := m.Encode(func(s uint64) uint64 { return s })
+	data := buildInlineSvsData(m, spec_svs.VectorTypeFull, sv)
+
+	require.Equal(t, ComputeMhash(m), data.MemberSetHash)
+	vt, ok := data.VectorType.Get()
+	require.True(t, ok)
+	require.Equal(t, spec_svs.VectorTypeFull, vt)
+	require.NotNil(t, data.StateVector)
+	require.Len(t, data.StateVector.Entries, 2)
+}
+
+func TestExceedsSyncThreshold(t *testing.T) {
+	tu.SetT(t)
+
+	require.False(t, exceedsSyncThreshold(0, 99999))
+	require.False(t, exceedsSyncThreshold(1000, 500))
+	require.True(t, exceedsSyncThreshold(1000, 1001))
+}
+
+func TestOnReceivePartialSkipsMissingNameOutdated(t *testing.T) {
+	tu.SetT(t)
+
+	s := &SvSync{
+		o: SvSyncOpts{
+			OnUpdate:          func(SvSyncUpdate) {},
+			SuppressionPeriod: 200 * time.Millisecond,
+			PeriodicTimeout:   30 * time.Second,
+		},
+		state:    testSvMapAliceBob(),
+		mtime:    make(map[string]time.Time),
+		ticker:   time.NewTicker(30 * time.Second),
+		suppress: false,
+	}
+
+	// PARTIAL with only bob; local knows alice — must not enter suppression.
+	bobOnly := NewSvMap[uint64](0)
+	bobOnly.Set(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150, 3)
+	partialSv := bobOnly.Encode(func(s uint64) uint64 { return s })
+
+	s.onReceiveStateVector(svSyncRecvSvArgs{
+		sv:         partialSv,
+		vectorType: optional.Some(spec_svs.VectorTypePartial),
+		mhash:      ComputeMhash(bobOnly),
+	})
+
+	require.False(t, s.suppress)
+}
+
+func TestOnReceiveFullTreatsMissingNameOutdated(t *testing.T) {
+	tu.SetT(t)
+
+	s := &SvSync{
+		o: SvSyncOpts{
+			OnUpdate:          func(SvSyncUpdate) {},
+			SuppressionPeriod: 200 * time.Millisecond,
+			PeriodicTimeout:   30 * time.Second,
+		},
+		state:    testSvMapAliceBob(),
+		mtime:    make(map[string]time.Time),
+		ticker:   time.NewTicker(30 * time.Second),
+		suppress: false,
+	}
+
+	bobOnly := NewSvMap[uint64](0)
+	bobOnly.Set(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150, 3)
+	fullSv := bobOnly.Encode(func(s uint64) uint64 { return s })
+
+	s.onReceiveStateVector(svSyncRecvSvArgs{
+		sv:         fullSv,
+		vectorType: optional.Some(spec_svs.VectorTypeFull),
+		mhash:      ComputeMhash(bobOnly),
+	})
+
+	require.True(t, s.suppress)
+}
+
+func TestEncodePartialSenderFirst(t *testing.T) {
+	tu.SetT(t)
+
+	alice := tu.NoErr(enc.NameFromStr("/ndn/alice"))
+	bob := tu.NoErr(enc.NameFromStr("/ndn/bob"))
+	carol := tu.NoErr(enc.NameFromStr("/ndn/carol"))
+
+	m := NewSvMap[uint64](0)
+	m.Set(alice.TlvStr(), 100, 5)
+	m.Set(bob.TlvStr(), 150, 3)
+	m.Set(carol.TlvStr(), 150, 7)
+
+	// Threshold large enough for sender + one peer.
+	full := buildInlineSvsData(m, spec_svs.VectorTypeFull, m.Encode(func(s uint64) uint64 { return s }))
+	threshold := inlineSvsDataSize(full) - 1
+
+	partial := encodePartialStateVector(m, PartialEncodeOpts{
+		Sender:    carol,
+		Threshold: threshold,
+		Mtime: map[string]time.Time{
+			alice.TlvStr(): time.Unix(10, 0),
+			bob.TlvStr():   time.Unix(20, 0),
+		},
+	})
+
+	require.NotEmpty(t, partial.Entries)
+	require.Equal(t, carol, partial.Entries[0].Name)
+	if len(partial.Entries) > 2 {
+		require.Less(t, partial.Entries[1].Name.Compare(partial.Entries[2].Name), 0)
+	}
+}
+
+func TestBuildSvsDataForSendPublicationPartial(t *testing.T) {
+	tu.SetT(t)
+
+	alice := tu.NoErr(enc.NameFromStr("/ndn/alice"))
+	m := NewSvMap[uint64](0)
+	m.Set(alice.TlvStr(), 100, 5)
+	for i := range 20 {
+		name := tu.NoErr(enc.NameFromStr(fmt.Sprintf("/ndn/peer%d", i)))
+		m.Set(name.TlvStr(), 150, uint64(i+1))
+	}
+
+	full := buildInlineSvsData(m, spec_svs.VectorTypeFull, m.Encode(func(s uint64) uint64 { return s }))
+	threshold := inlineSvsDataSize(full) / 2
+
+	pub := buildSvsDataForSend(svsSendInput{
+		State: m, Reason: syncSendPublication, Threshold: threshold, Sender: alice,
+	})
+	vt, ok := pub.VectorType.Get()
+	require.True(t, ok)
+	require.Equal(t, spec_svs.VectorTypePartial, vt)
+	require.Less(t, len(pub.StateVector.Entries), len(full.StateVector.Entries))
+
+	periodic := buildSvsDataForSend(svsSendInput{
+		State: m, Reason: syncSendPeriodic, Threshold: threshold, Sender: alice,
+	})
+	vt, ok = periodic.VectorType.Get()
+	require.True(t, ok)
+	require.Equal(t, spec_svs.VectorTypeFull, vt)
+}
+
+func TestOnReceivePartialMergesPresentEntriesOnly(t *testing.T) {
+	tu.SetT(t)
+
+	var updates []SvSyncUpdate
+	s := &SvSync{
+		o: SvSyncOpts{
+			OnUpdate:        func(u SvSyncUpdate) { updates = append(updates, u) },
+			PeriodicTimeout: 30 * time.Second,
+		},
+		state:  NewSvMap[uint64](0),
+		mtime:  make(map[string]time.Time),
+		ticker: time.NewTicker(30 * time.Second),
+	}
+
+	alice := tu.NoErr(enc.NameFromStr("/ndn/alice"))
+	bob := tu.NoErr(enc.NameFromStr("/ndn/bob"))
+	s.state.Set(alice.TlvStr(), 100, 1)
+	s.state.Set(bob.TlvStr(), 150, 1)
+
+	bobOnly := NewSvMap[uint64](0)
+	bobOnly.Set(bob.TlvStr(), 150, 4)
+	partialSv := bobOnly.Encode(func(s uint64) uint64 { return s })
+
+	s.onReceiveStateVector(svSyncRecvSvArgs{
+		sv:         partialSv,
+		vectorType: optional.Some(spec_svs.VectorTypePartial),
+		mhash:      ComputeMhash(bobOnly),
+	})
+
+	require.Len(t, updates, 1)
+	require.Equal(t, bob, updates[0].Name)
+	require.EqualValues(t, 4, updates[0].High)
+	require.EqualValues(t, 1, s.state.Get(alice.TlvStr(), 100))
+	require.EqualValues(t, 4, s.state.Get(bob.TlvStr(), 150))
+}
diff --git a/std/sync/svs_map.go b/std/sync/svs_map.go
index be1ffd0c..daf5534b 100644
--- a/std/sync/svs_map.go
+++ b/std/sync/svs_map.go
@@ -29,6 +29,15 @@ func NewSvMap[V any](size int) SvMap[V] {
 	return make(SvMap[V], size)
 }
 
+// cloneSvMap returns a shallow copy safe for use without holding SvSync.mutex.
+func cloneSvMap[V any](m SvMap[V]) SvMap[V] {
+	out := NewSvMap[V](len(m))
+	for hash, vals := range m {
+		out[hash] = slices.Clone(vals)
+	}
+	return out
+}
+
 // Get seq entry for a bootstrap time.
 func (m SvMap[V]) Get(hash string, boot uint64) (value V) {
 	entry := SvMapVal[V]{boot, value}
diff --git a/std/sync/svs_mesh_test.go b/std/sync/svs_mesh_test.go
new file mode 100644
index 00000000..9d44463d
--- /dev/null
+++ b/std/sync/svs_mesh_test.go
@@ -0,0 +1,261 @@
+package sync
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	enc "github.com/named-data/ndnd/std/encoding"
+	basic_engine "github.com/named-data/ndnd/std/engine/basic"
+	"github.com/named-data/ndnd/std/ndn"
+	spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
+	"github.com/named-data/ndnd/std/object"
+	"github.com/named-data/ndnd/std/object/storage"
+	tu "github.com/named-data/ndnd/std/utils/testutils"
+	"github.com/stretchr/testify/require"
+)
+
+// In-process multi-node SVS mesh for integration tests (svs_test.go).
+//
+// Each node runs a real ndnd engine + SvSync instance. A test-only multicast hub
+// delivers packets between nodes so Sync Interests propagate without NFD/mininet.
+// This exercises small-group sync, PARTIAL publication, and announce+pull recovery.
+
+const testMeshBoot = uint64(1_700_000_000)
+
+type meshMulticastHub struct {
+	mu    sync.Mutex
+	faces []*meshMulticastFace
+}
+
+type meshMulticastFace struct {
+	hub      *meshMulticastHub
+	id       int
+	running  atomic.Bool
+	local    bool
+	onPkt    func(frame []byte)
+	onError  func(err error)
+	onUp     sync.Map
+	onDown   sync.Map
+	onUpID   int
+	onDownID int
+}
+
+func newMeshMulticastHub() *meshMulticastHub {
+	return &meshMulticastHub{}
+}
+
+func (h *meshMulticastHub) newFace() *meshMulticastFace {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	f := &meshMulticastFace{hub: h, id: len(h.faces), local: true}
+	h.faces = append(h.faces, f)
+	return f
+}
+
+func (f *meshMulticastFace) String() string {
+	return fmt.Sprintf("mesh-multicast-face-%d", f.id)
+}
+
+func (f *meshMulticastFace) IsRunning() bool          { return f.running.Load() }
+func (f *meshMulticastFace) IsLocal() bool            { return f.local }
+func (f *meshMulticastFace) OnPacket(fn func([]byte)) { f.onPkt = fn }
+func (f *meshMulticastFace) OnError(fn func(error))   { f.onError = fn }
+
+func (f *meshMulticastFace) OnUp(fn func()) (cancel func()) {
+	id := f.onUpID
+	f.onUpID++
+	f.onUp.Store(id, fn)
+	return func() { f.onUp.Delete(id) }
+}
+
+func (f *meshMulticastFace) OnDown(fn func()) (cancel func()) {
+	id := f.onDownID
+	f.onDownID++
+	f.onDown.Store(id, fn)
+	return func() { f.onDown.Delete(id) }
+}
+
+func (f *meshMulticastFace) Open() error {
+	if f.onError == nil || f.onPkt == nil {
+		return fmt.Errorf("face callbacks are not set")
+	}
+	if f.running.Load() {
+		return fmt.Errorf("face is already running")
+	}
+	f.running.Store(true)
+	f.onUp.Range(func(_, cb any) bool {
+		cb.(func())()
+		return true
+	})
+	return nil
+}
+
+func (f *meshMulticastFace) Close() error {
+	if !f.running.Swap(false) {
+		return fmt.Errorf("face is not running")
+	}
+	return nil
+}
+
+func (f *meshMulticastFace) Send(pkt enc.Wire) error {
+	if !f.running.Load() {
+		return fmt.Errorf("face is not running")
+	}
+	f.hub.broadcast(f.id, pkt.Join())
+	return nil
+}
+
+func (h *meshMulticastHub) broadcast(from int, frame []byte) {
+	h.mu.Lock()
+	peers := append([]*meshMulticastFace(nil), h.faces...)
+	h.mu.Unlock()
+
+	for _, peer := range peers {
+		if peer.id == from || !peer.running.Load() || peer.onPkt == nil {
+			continue
+		}
+		frameCopy := make([]byte, len(frame))
+		copy(frameCopy, frame)
+		peer.onPkt(frameCopy)
+	}
+}
+
+type testMeshNode struct {
+	id        string
+	producer  enc.Name
+	svs       *SvSync
+	client    ndn.Client
+	engine    *basic_engine.Engine
+	updates   []SvSyncUpdate
+	updatesMu sync.Mutex
+}
+
+type testMesh struct {
+	t     *testing.T
+	hub   *meshMulticastHub
+	timer *basic_engine.DummyTimer
+	group enc.Name
+	nodes map[string]*testMeshNode
+}
+
+func newTestMesh(t *testing.T, ids []string, configure func(id string, opts *SvSyncOpts)) *testMesh {
+	t.Helper()
+	tu.SetT(t)
+
+	mesh := &testMesh{
+		t:     t,
+		hub:   newMeshMulticastHub(),
+		timer: basic_engine.NewDummyTimer(),
+		group: tu.NoErr(enc.NameFromStr("/test/svs-mesh/32=svs")),
+		nodes: make(map[string]*testMeshNode, len(ids)),
+	}
+
+	for _, id := range ids {
+		producer := mesh.group.Append(enc.NewGenericComponent(id))
+		syncDataName := producer.
+			Append(enc.NewTimestampComponent(testMeshBoot)).
+			Append(enc.NewKeywordComponent("svs"))
+
+		f := mesh.hub.newFace()
+		engine := basic_engine.NewEngine(f, mesh.timer)
+		store := storage.NewMemoryStore()
+		client := object.NewClient(engine, store, nil)
+
+		opts := SvSyncOpts{
+			Client:            client,
+			GroupPrefix:       mesh.group,
+			SyncDataName:      syncDataName,
+			BootTime:          testMeshBoot,
+			PeriodicTimeout:   30 * time.Second,
+			SuppressionPeriod: 50 * time.Millisecond,
+		}
+		if configure != nil {
+			configure(id, &opts)
+		}
+
+		node := &testMeshNode{
+			id:       id,
+			producer: producer,
+			client:   client,
+			engine:   engine,
+		}
+		opts.OnUpdate = func(u SvSyncUpdate) {
+			node.updatesMu.Lock()
+			node.updates = append(node.updates, u)
+			node.updatesMu.Unlock()
+		}
+
+		node.svs = NewSvSync(opts)
+		require.NoError(t, engine.Start())
+		require.NoError(t, client.Start())
+		require.NoError(t, node.svs.Start())
+
+		mesh.nodes[id] = node
+	}
+
+	mesh.wait(80 * time.Millisecond)
+	return mesh
+}
+
+func (m *testMesh) node(id string) *testMeshNode {
+	return m.nodes[id]
+}
+
+func (m *testMesh) wait(d time.Duration) {
+	time.Sleep(d)
+}
+
+func (m *testMesh) waitUpdates(id string, n int, timeout time.Duration) {
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		m.node(id).updatesMu.Lock()
+		got := len(m.node(id).updates)
+		m.node(id).updatesMu.Unlock()
+		if got >= n {
+			return
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+	m.t.Fatalf("node %s: want %d updates, got %d", id, n, len(m.node(id).updates))
+}
+
+func (m *testMesh) stop() {
+	for _, node := range m.nodes {
+		_ = node.svs.Stop()
+		_ = node.client.Stop()
+		_ = node.engine.Stop()
+	}
+}
+
+func (n *testMeshNode) publish() uint64 {
+	return n.svs.IncrSeqNo(n.producer)
+}
+
+func (n *testMeshNode) memberCount() int {
+	return len(n.svs.GetNames())
+}
+
+func seedLargeLocalState(s *SvSync, self enc.Name, peers int) {
+	for i := range peers {
+		name := tu.NoErr(enc.NameFromStr(fmt.Sprintf("/test/svs-mesh/peer%d", i)))
+		s.state.Set(name.TlvStr(), testMeshBoot, uint64(i+1))
+	}
+	s.state.Set(self.TlvStr(), testMeshBoot, 1)
+}
+
+func selfOnlyVector(name enc.Name, boot uint64) *spec_svs.StateVector {
+	return &spec_svs.StateVector{
+		Entries: []*spec_svs.StateVectorEntry{
+			{
+				Name: name,
+				SeqNoEntries: []*spec_svs.SeqNoEntry{{
+					BootstrapTime: boot,
+					SeqNo:         0,
+				}},
+			},
+		},
+	}
+}
diff --git a/std/sync/svs_mhash.go b/std/sync/svs_mhash.go
new file mode 100644
index 00000000..510c16cc
--- /dev/null
+++ b/std/sync/svs_mhash.go
@@ -0,0 +1,54 @@
+package sync
+
+import (
+	"crypto/sha256"
+	"slices"
+
+	enc "github.com/named-data/ndnd/std/encoding"
+)
+
+// membershipTuple is one (Name, BootstrapTime) pair in the sync group.
+type membershipTuple struct {
+	name enc.Name
+	boot uint64
+}
+
+// ComputeMhash returns the membership hash over all (Name, BootstrapTime) pairs in state.
+func ComputeMhash(state SvMap[uint64]) []byte {
+	members := make([]membershipTuple, 0)
+	for name, vals := range state.Iter() {
+		for _, val := range vals {
+			members = append(members, membershipTuple{name: name, boot: val.Boot})
+		}
+	}
+
+	slices.SortFunc(members, func(a, b membershipTuple) int {
+		if c := a.name.Compare(b.name); c != 0 {
+			return c
+		}
+		if a.boot < b.boot {
+			return -1
+		}
+		if a.boot > b.boot {
+			return 1
+		}
+		return 0
+	})
+
+	h := sha256.New()
+	for _, m := range members {
+		h.Write(m.name.Bytes())
+		h.Write(bootstrapTimeTLV(m.boot))
+	}
+	return h.Sum(nil)
+}
+
+// bootstrapTimeTLV encodes BOOTSTRAP-TIME-TYPE (0xD4) matching SVS v3 SeqNoEntry layout.
+func bootstrapTimeTLV(boot uint64) []byte {
+	valLen := enc.Nat(boot).EncodingLength()
+	buf := make([]byte, 2+valLen)
+	buf[0] = 0xD4
+	buf[1] = byte(valLen)
+	enc.Nat(boot).EncodeInto(buf[2:])
+	return buf
+}
diff --git a/std/sync/svs_mhash_test.go b/std/sync/svs_mhash_test.go
new file mode 100644
index 00000000..b37597e3
--- /dev/null
+++ b/std/sync/svs_mhash_test.go
@@ -0,0 +1,118 @@
+package sync_test
+
+import (
+	"testing"
+
+	enc "github.com/named-data/ndnd/std/encoding"
+	spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
+	ndn_sync "github.com/named-data/ndnd/std/sync"
+	"github.com/named-data/ndnd/std/types/optional"
+	tu "github.com/named-data/ndnd/std/utils/testutils"
+	"github.com/stretchr/testify/require"
+)
+
+func TestComputeMhashStable(t *testing.T) {
+	tu.SetT(t)
+
+	m := ndn_sync.NewSvMap[uint64](0)
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 1)
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150, 3)
+
+	h1 := ndn_sync.ComputeMhash(m)
+	h2 := ndn_sync.ComputeMhash(m)
+	require.Equal(t, h1, h2)
+	require.Len(t, h1, 32)
+}
+
+func TestComputeMhashOrderIndependent(t *testing.T) {
+	tu.SetT(t)
+
+	m1 := ndn_sync.NewSvMap[uint64](0)
+	m1.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 1)
+	m1.Set(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150, 3)
+
+	m2 := ndn_sync.NewSvMap[uint64](0)
+	m2.Set(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150, 3)
+	m2.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 1)
+
+	require.Equal(t, ndn_sync.ComputeMhash(m1), ndn_sync.ComputeMhash(m2))
+}
+
+func TestComputeMhashChangesOnMembership(t *testing.T) {
+	tu.SetT(t)
+
+	m := ndn_sync.NewSvMap[uint64](0)
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 1)
+	before := ndn_sync.ComputeMhash(m)
+
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150, 3)
+	after := ndn_sync.ComputeMhash(m)
+	require.NotEqual(t, before, after)
+}
+
+func TestComputeMhashIgnoresSeqNo(t *testing.T) {
+	tu.SetT(t)
+
+	m1 := ndn_sync.NewSvMap[uint64](0)
+	m1.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 1)
+
+	m2 := ndn_sync.NewSvMap[uint64](0)
+	m2.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 99)
+
+	require.Equal(t, ndn_sync.ComputeMhash(m1), ndn_sync.ComputeMhash(m2))
+}
+
+func TestSvsDataInlineTLV(t *testing.T) {
+	tu.SetT(t)
+
+	m := ndn_sync.NewSvMap[uint64](0)
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 1)
+	sv := m.Encode(func(s uint64) uint64 { return s })
+
+	original := &spec_svs.SvsData{
+		MemberSetHash: ndn_sync.ComputeMhash(m),
+		VectorType:    optional.Some(spec_svs.VectorTypeFull),
+		StateVector:   sv,
+	}
+	wire := original.Encode().Join()
+
+	parsed, err := spec_svs.ParseSvsData(enc.NewBufferView(wire), false)
+	require.NoError(t, err)
+	require.Equal(t, original.MemberSetHash, parsed.MemberSetHash)
+	require.Equal(t, original.VectorType, parsed.VectorType)
+	require.Equal(t, original.StateVector.Entries[0].Name.String(), parsed.StateVector.Entries[0].Name.String())
+}
+
+func TestSvsDataAnnounceTLV(t *testing.T) {
+	tu.SetT(t)
+
+	ref := tu.NoErr(enc.NameFromStr("/ndn/svs/alice/100/32=sv/1"))
+	mhash := make([]byte, 32)
+
+	original := &spec_svs.SvsData{
+		MemberSetHash: mhash,
+		SvsDataRef:    ref,
+	}
+	wire := original.Encode().Join()
+
+	parsed, err := spec_svs.ParseSvsData(enc.NewBufferView(wire), false)
+	require.NoError(t, err)
+	require.Equal(t, mhash, parsed.MemberSetHash)
+	require.Equal(t, ref.String(), parsed.SvsDataRef.String())
+	require.Nil(t, parsed.StateVector)
+	require.False(t, parsed.VectorType.IsSet())
+}
+
+func TestSvsDataLegacyParse(t *testing.T) {
+	tu.SetT(t)
+
+	m := ndn_sync.NewSvMap[uint64](0)
+	m.Set(tu.NoErr(enc.NameFromStr("/ndn/alice")).TlvStr(), 100, 1)
+	legacy := &spec_svs.SvsData{StateVector: m.Encode(func(s uint64) uint64 { return s })}
+	wire := legacy.Encode().Join()
+
+	parsed, err := spec_svs.ParseSvsData(enc.NewBufferView(wire), false)
+	require.NoError(t, err)
+	require.Nil(t, parsed.MemberSetHash)
+	require.NotNil(t, parsed.StateVector)
+}
diff --git a/std/sync/svs_pull.go b/std/sync/svs_pull.go
new file mode 100644
index 00000000..77f9b163
--- /dev/null
+++ b/std/sync/svs_pull.go
@@ -0,0 +1,229 @@
+package sync
+
+import (
+	"bytes"
+	"fmt"
+
+	enc "github.com/named-data/ndnd/std/encoding"
+	"github.com/named-data/ndnd/std/log"
+	"github.com/named-data/ndnd/std/ndn"
+	spec "github.com/named-data/ndnd/std/ndn/spec_2022"
+	spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
+	"github.com/named-data/ndnd/std/types/optional"
+)
+
+const (
+	syncDataKeyword   = "svs"
+	fullVectorKeyword = "sv"
+)
+
+// deriveFullVectorPrefix maps SyncDataName (.../32=svs) to the published full-vector prefix (.../32=sv).
+func deriveFullVectorPrefix(syncDataName enc.Name) enc.Name {
+	if len(syncDataName) == 0 {
+		return nil
+	}
+	base := syncDataName
+	if base.At(-1).IsKeyword(syncDataKeyword) {
+		base = base.Prefix(-1)
+	}
+	return base.Append(enc.NewKeywordComponent(fullVectorKeyword))
+}
+
+func pullRefFromSyncDataWire(dataWire enc.Wire) enc.Name {
+	data, _, err := spec.Spec{}.ReadData(enc.NewWireView(dataWire))
+	if err != nil {
+		return nil
+	}
+	name := data.Name()
+	if len(name) == 0 {
+		return nil
+	}
+	if name.At(-1).IsVersion() {
+		name = name.Prefix(-1)
+	}
+	return deriveFullVectorPrefix(name)
+}
+
+func buildAnnounceSvsData(state SvMap[uint64], ref enc.Name) *spec_svs.SvsData {
+	return &spec_svs.SvsData{
+		MemberSetHash: ComputeMhash(state),
+		SvsDataRef:    ref,
+	}
+}
+
+// shouldUseAnnouncePull reports whether the sender should publish at 32=sv and announce-only Sync Data.
+// Large-group announce+pull is disabled when threshold <= 0 (legacy mode).
+func shouldUseAnnouncePull(reason syncSendReason, threshold int, state SvMap[uint64]) bool {
+	if threshold <= 0 {
+		return false
+	}
+	if reason == syncSendRecovery {
+		return true
+	}
+	if reason == syncSendPublication {
+		return false
+	}
+	return exceedsSyncThreshold(threshold, inlineFullSize(state))
+}
+
+// publishFullVectorData produces retrievable inline FULL SvsData at .../32=sv/.
+func (s *SvSync) publishFullVectorData(state SvMap[uint64]) (enc.Name, error) {
+	if len(s.fullVectorPrefix) == 0 {
+		return nil, fmt.Errorf("full vector prefix unset")
+	}
+	content := buildInlineFullFromState(state).Encode()
+	name := s.fullVectorPrefix.WithVersion(enc.VersionUnixMicro)
+	return s.o.Client.Produce(ndn.ProduceArgs{
+		Name:    name,
+		Content: content,
+	})
+}
+
+// pullFullVector fetches a published full State Vector and merges it on the main loop.
+// trustPrefix is the sender's .../32=sv prefix; ref must be equal to or below it.
+func (s *SvSync) pullFullVector(ref enc.Name, trustPrefix enc.Name) {
+	if len(ref) == 0 {
+		return
+	}
+	if !isTrustedSvsDataRef(ref, trustPrefix) {
+		log.Warn(s, "pullFullVector rejected untrusted SvsDataRef", "ref", ref, "trust", trustPrefix)
+		return
+	}
+
+	s.o.Client.ConsumeExt(ndn.ConsumeExtArgs{
+		Name:             ref.Clone(),
+		TryStore:         true,
+		UseSignatureTime: s.o.UseSignatureTime,
+		IgnoreValidity:   s.o.IgnoreValidity,
+		Callback: func(st ndn.ConsumeState) {
+			if st.Error() != nil {
+				log.Warn(s, "pullFullVector failed", "ref", ref, "err", st.Error())
+				return
+			}
+			if !st.IsComplete() {
+				return
+			}
+			s.onPulledFullVector(st.Content().Join())
+		},
+	})
+}
+
+// onPulledFullVector merges a fetched inline FULL SvsData into local state.
+// Segment signatures are validated by client.ConsumeExt during fetch.
+func (s *SvSync) onPulledFullVector(content []byte) {
+	params, err := parseFullVectorContent(content)
+	if err != nil {
+		log.Warn(s, "onPulledFullVector parse failed", "err", err)
+		return
+	}
+
+	s.recvSv <- svSyncRecvSvArgs{
+		sv:         params.StateVector,
+		vectorType: optional.Some(spec_svs.VectorTypeFull),
+		mhash:      params.MemberSetHash,
+	}
+}
+
+func parseFullVectorContent(content []byte) (*spec_svs.SvsData, error) {
+	params, err := spec_svs.ParseSvsData(enc.NewBufferView(content), false)
+	if err != nil {
+		return nil, err
+	}
+	if params.StateVector == nil {
+		return nil, fmt.Errorf("full vector content has no StateVector")
+	}
+	if vt, ok := params.VectorType.Get(); ok && vt != spec_svs.VectorTypeFull {
+		return nil, fmt.Errorf("full vector VectorType=%d, want FULL", vt)
+	}
+	if len(params.MemberSetHash) > 0 {
+		computed := ComputeMhash(stateVectorToMap(params.StateVector))
+		if !bytes.Equal(params.MemberSetHash, computed) {
+			return nil, fmt.Errorf("full vector mhash mismatch")
+		}
+	}
+	return params, nil
+}
+
+func stateVectorToMap(sv *spec_svs.StateVector) SvMap[uint64] {
+	m := NewSvMap[uint64](len(sv.Entries))
+	for _, node := range sv.Entries {
+		hash := node.Name.TlvStr()
+		for _, entry := range node.SeqNoEntries {
+			m.Set(hash, entry.BootstrapTime, entry.SeqNo)
+		}
+	}
+	return m
+}
+
+// sendRecoveryAnnounce publishes at 32=sv and emits announce-only Sync Data (mhash recovery).
+func (s *SvSync) sendRecoveryAnnounce() {
+	if !s.running.Load() || s.o.Passive {
+		return
+	}
+	wire := s.encodeSyncData(syncSendRecovery, enc.Name{})
+	s.sendSyncInterestWith(wire)
+}
+
+// handleMhashMismatch schedules announce or pull recovery on membership mismatch.
+func (s *SvSync) handleMhashMismatch(args svSyncRecvSvArgs, recvSv SvMap[uint64]) {
+	// [Spec] Legacy mode (threshold 0): inline FULL is merged above; large-group recovery is off.
+	if s.o.SyncVectorThreshold <= 0 {
+		return
+	}
+
+	localMhash := ComputeMhash(s.state)
+	if bytes.Equal(localMhash, args.mhash) {
+		return
+	}
+
+	trustPrefix := pullRefFromSyncDataWire(args.data)
+	if len(trustPrefix) == 0 {
+		trustPrefix = s.fullVectorPrefix
+	}
+
+	localTuples, remoteTuples := membershipTupleCount(s.state), membershipTupleCount(recvSv)
+	if localTuples > remoteTuples && membershipContains(s.state, recvSv) {
+		go s.sendRecoveryAnnounce()
+		return
+	}
+
+	// [Spec] Inline FULL is already merged in onReceiveStateVector.
+	// Pull only when the sender provided a retrievable SvsDataRef (announce-only sync).
+	if len(args.svsDataRef) == 0 {
+		return
+	}
+	go s.pullFullVector(args.svsDataRef, trustPrefix)
+}
+
+func membershipContains(outer, inner SvMap[uint64]) bool {
+	for hash, vals := range inner {
+		for _, v := range vals {
+			found := false
+			for _, ov := range outer[hash] {
+				if ov.Boot == v.Boot {
+					found = true
+					break
+				}
+			}
+			if !found {
+				return false
+			}
+		}
+	}
+	return true
+}
+
+func membershipTupleCount(m SvMap[uint64]) int {
+	n := 0
+	for _, vals := range m {
+		n += len(vals)
+	}
+	return n
+}
+
+func isTrustedSvsDataRef(ref, senderFullVectorPrefix enc.Name) bool {
+	if len(ref) == 0 || len(senderFullVectorPrefix) == 0 {
+		return false
+	}
+	return senderFullVectorPrefix.IsPrefix(ref)
+}
diff --git a/std/sync/svs_pull_test.go b/std/sync/svs_pull_test.go
new file mode 100644
index 00000000..56e9d4ed
--- /dev/null
+++ b/std/sync/svs_pull_test.go
@@ -0,0 +1,157 @@
+package sync
+
+import (
+	"testing"
+	"time"
+
+	enc "github.com/named-data/ndnd/std/encoding"
+	"github.com/named-data/ndnd/std/ndn"
+	spec "github.com/named-data/ndnd/std/ndn/spec_2022"
+	spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
+	sig "github.com/named-data/ndnd/std/security/signer"
+	"github.com/named-data/ndnd/std/types/optional"
+	tu "github.com/named-data/ndnd/std/utils/testutils"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDeriveFullVectorPrefix(t *testing.T) {
+	tu.SetT(t)
+
+	syncData := tu.NoErr(enc.NameFromStr("/ndn/svs/alice/1700000000/32=svs"))
+	prefix := deriveFullVectorPrefix(syncData)
+	require.Equal(t, "/ndn/svs/alice/1700000000/32=sv", prefix.String())
+}
+
+func TestPullRefFromSyncDataWire(t *testing.T) {
+	tu.SetT(t)
+
+	syncDataName := tu.NoErr(enc.NameFromStr("/ndn/svs/alice/1700000000/32=svs")).
+		Append(enc.NewVersionComponent(12345))
+	dataWire, err := spec.Spec{}.MakeData(
+		syncDataName,
+		&ndn.DataConfig{ContentType: optional.Some(ndn.ContentTypeBlob)},
+		enc.Wire{enc.Buffer{0x01}},
+		sig.NewSha256Signer(),
+	)
+	require.NoError(t, err)
+
+	ref := pullRefFromSyncDataWire(dataWire.Wire)
+	require.Equal(t, "/ndn/svs/alice/1700000000/32=sv", ref.String())
+}
+
+func TestBuildAnnounceSvsData(t *testing.T) {
+	tu.SetT(t)
+
+	m := testSvMapAliceBob()
+	ref := tu.NoErr(enc.NameFromStr("/ndn/svs/alice/1700000000/32=sv/999"))
+	data := buildAnnounceSvsData(m, ref)
+
+	require.Equal(t, ComputeMhash(m), data.MemberSetHash)
+	require.True(t, ref.Equal(data.SvsDataRef))
+	require.Nil(t, data.StateVector)
+	require.False(t, data.VectorType.IsSet())
+
+	wire := data.Encode().Join()
+	parsed, err := spec_svs.ParseSvsData(enc.NewBufferView(wire), false)
+	require.NoError(t, err)
+	require.Equal(t, data.MemberSetHash, parsed.MemberSetHash)
+	require.True(t, ref.Equal(parsed.SvsDataRef))
+	require.Nil(t, parsed.StateVector)
+}
+
+func TestShouldUseAnnouncePull(t *testing.T) {
+	tu.SetT(t)
+
+	m := testSvMapAliceBob()
+	fullSize := inlineFullSize(m)
+
+	require.False(t, shouldUseAnnouncePull(syncSendPublication, fullSize-1, m))
+	require.False(t, shouldUseAnnouncePull(syncSendPeriodic, 0, m))
+	require.False(t, shouldUseAnnouncePull(syncSendPeriodic, fullSize+1, m))
+	require.True(t, shouldUseAnnouncePull(syncSendPeriodic, fullSize-1, m))
+	require.True(t, shouldUseAnnouncePull(syncSendOther, fullSize-1, m))
+	require.False(t, shouldUseAnnouncePull(syncSendRecovery, 0, m))
+	require.True(t, shouldUseAnnouncePull(syncSendRecovery, fullSize-1, m))
+}
+
+func TestIsTrustedSvsDataRef(t *testing.T) {
+	tu.SetT(t)
+
+	trust := tu.NoErr(enc.NameFromStr("/ndn/svs/alice/1/32=sv"))
+	ref := tu.NoErr(enc.NameFromStr("/ndn/svs/alice/1/32=sv/999"))
+	bad := tu.NoErr(enc.NameFromStr("/ndn/evil/32=sv/1"))
+
+	require.True(t, isTrustedSvsDataRef(ref, trust))
+	require.True(t, isTrustedSvsDataRef(trust, trust))
+	require.False(t, isTrustedSvsDataRef(bad, trust))
+	require.False(t, isTrustedSvsDataRef(ref, nil))
+}
+
+func TestParseFullVectorContentRejectsBadMhash(t *testing.T) {
+	tu.SetT(t)
+
+	m := testSvMapAliceBob()
+	inline := buildInlineFullFromState(m)
+	inline.MemberSetHash = []byte("not-a-valid-mhash-padding-000000")
+	wire := inline.Encode().Join()
+
+	_, err := parseFullVectorContent(wire)
+	require.Error(t, err)
+}
+
+func TestParseFullVectorContent(t *testing.T) {
+	tu.SetT(t)
+
+	m := testSvMapAliceBob()
+	inline := buildInlineFullFromState(m)
+	wire := inline.Encode().Join()
+
+	parsed, err := parseFullVectorContent(wire)
+	require.NoError(t, err)
+	require.Equal(t, inline.MemberSetHash, parsed.MemberSetHash)
+	require.Len(t, parsed.StateVector.Entries, 2)
+}
+
+func TestOnPulledFullVectorMergesState(t *testing.T) {
+	tu.SetT(t)
+
+	var updates []SvSyncUpdate
+	s := &SvSync{
+		o: SvSyncOpts{
+			OnUpdate:        func(u SvSyncUpdate) { updates = append(updates, u) },
+			PeriodicTimeout: 30 * time.Second,
+		},
+		state:  NewSvMap[uint64](0),
+		mtime:  make(map[string]time.Time),
+		ticker: time.NewTicker(30 * time.Second),
+		recvSv: make(chan svSyncRecvSvArgs, 1),
+	}
+
+	alice := tu.NoErr(enc.NameFromStr("/ndn/alice"))
+	s.state.Set(alice.TlvStr(), 100, 1)
+
+	remote := testSvMapAliceBob()
+	content := buildInlineFullFromState(remote).Encode().Join()
+
+	go func() {
+		s.onPulledFullVector(content)
+	}()
+
+	s.onReceiveStateVector(<-s.recvSv)
+
+	require.Len(t, updates, 2)
+	require.EqualValues(t, 3, s.state.Get(tu.NoErr(enc.NameFromStr("/ndn/bob")).TlvStr(), 150))
+	require.EqualValues(t, 5, s.state.Get(alice.TlvStr(), 100))
+}
+
+func TestEncodeSyncDataAnnounceMode(t *testing.T) {
+	tu.SetT(t)
+
+	m := testSvMapAliceBob()
+	require.True(t, shouldUseAnnouncePull(syncSendPeriodic, inlineFullSize(m)-1, m))
+
+	announce := buildAnnounceSvsData(m, tu.NoErr(enc.NameFromStr("/ndn/svs/alice/1/32=sv/2")))
+	require.Nil(t, announce.StateVector)
+	vt, ok := announce.VectorType.Get()
+	require.False(t, ok || vt == spec_svs.VectorTypePartial)
+}
diff --git a/std/sync/svs_test.go b/std/sync/svs_test.go
new file mode 100644
index 00000000..a85de1c0
--- /dev/null
+++ b/std/sync/svs_test.go
@@ -0,0 +1,208 @@
+package sync
+
+import (
+	"testing"
+	"time"
+
+	enc "github.com/named-data/ndnd/std/encoding"
+	spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
+	"github.com/named-data/ndnd/std/types/optional"
+	tu "github.com/named-data/ndnd/std/utils/testutils"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSvSyncSmallGroup(t *testing.T) {
+	tu.SetT(t)
+
+	mesh := newTestMesh(t, []string{"alice", "bob", "carol"}, nil)
+	defer mesh.stop()
+
+	seq := mesh.node("alice").publish()
+	mesh.wait(150 * time.Millisecond)
+
+	mesh.waitUpdates("bob", 1, time.Second)
+	mesh.waitUpdates("carol", 1, time.Second)
+
+	require.EqualValues(t, seq, mesh.node("bob").svs.GetSeqNo(mesh.node("alice").producer))
+	require.EqualValues(t, seq, mesh.node("carol").svs.GetSeqNo(mesh.node("alice").producer))
+
+	// Default threshold 0: legacy wire (StateVector only, no mhash).
+ alice := mesh.node("alice") + alice.svs.mutex.Lock() + legacy := buildLegacySvsData(alice.svs.state) + alice.svs.mutex.Unlock() + require.Empty(t, legacy.MemberSetHash) + require.False(t, legacy.VectorType.IsSet()) +} + +func TestSvSyncPartialPublication(t *testing.T) { + tu.SetT(t) + + threshold := 400 + mesh := newTestMesh(t, []string{"alice", "bob"}, func(id string, opts *SvSyncOpts) { + opts.SyncVectorThreshold = threshold + }) + defer mesh.stop() + + seedLargeLocalState(mesh.node("alice").svs, mesh.node("alice").producer, 18) + mesh.node("alice").publish() + mesh.wait(200 * time.Millisecond) + + mesh.waitUpdates("bob", 1, time.Second) + require.EqualValues(t, 2, mesh.node("bob").svs.GetSeqNo(mesh.node("alice").producer)) + + // PARTIAL must not copy the full membership in one shot. + require.Less(t, mesh.node("bob").memberCount(), mesh.node("alice").memberCount()) +} + +func TestSvSyncMhashPull(t *testing.T) { + tu.SetT(t) + + threshold := 400 + mesh := newTestMesh(t, []string{"alice", "bob"}, func(id string, opts *SvSyncOpts) { + opts.SyncVectorThreshold = threshold + }) + defer mesh.stop() + + alice := mesh.node("alice").svs + seedLargeLocalState(alice, mesh.node("alice").producer, 12) + alice.state.Set(mesh.node("alice").producer.TlvStr(), testMeshBoot, 1) + + // Alice publishes full vector and sends announce-only periodic sync. 
+ go alice.sendSyncInterest(syncSendPeriodic) + mesh.wait(300 * time.Millisecond) + + mesh.waitUpdates("bob", 1, 2*time.Second) + require.Greater(t, mesh.node("bob").memberCount(), 1) +} + +func TestSvSyncPeriodicAnnounce(t *testing.T) { + tu.SetT(t) + + threshold := 400 + mesh := newTestMesh(t, []string{"alice", "bob"}, func(id string, opts *SvSyncOpts) { + opts.SyncVectorThreshold = threshold + }) + defer mesh.stop() + + seedLargeLocalState(mesh.node("alice").svs, mesh.node("alice").producer, 12) + mesh.node("alice").svs.state.Set(mesh.node("alice").producer.TlvStr(), testMeshBoot, 2) + + go mesh.node("alice").svs.sendSyncInterest(syncSendPeriodic) + mesh.wait(300 * time.Millisecond) + + mesh.waitUpdates("bob", 1, 2*time.Second) + + // Pulled FULL state should include a seeded peer. + peer0 := tu.NoErr(enc.NameFromStr("/test/svs-mesh/peer0")) + require.Greater(t, mesh.node("bob").svs.GetSeqNo(peer0), uint64(0)) +} + +func TestSvSyncJoin(t *testing.T) { + tu.SetT(t) + + mesh := newTestMesh(t, []string{"alice", "carol"}, nil) + defer mesh.stop() + + // Alice knows about herself and has seq 3. + alice := mesh.node("alice").svs + alice.state.Set(mesh.node("alice").producer.TlvStr(), testMeshBoot, 3) + + // Carol announces join with self-only [carol:0] (simulated multicast to alice). + carolName := mesh.node("carol").producer + carolSv := selfOnlyVector(carolName, testMeshBoot) + carolState := NewSvMap[uint64](0) + carolState.Set(carolName.TlvStr(), testMeshBoot, 0) + mesh.node("alice").svs.onReceiveStateVector(svSyncRecvSvArgs{ + sv: carolSv, + vectorType: optional.Some(spec_svs.VectorTypeFull), + mhash: ComputeMhash(carolState), + }) + + // Alice responds with up-to-date FULL state. 
+ go mesh.node("alice").svs.sendSyncInterest(syncSendPeriodic) + mesh.wait(200 * time.Millisecond) + + mesh.waitUpdates("carol", 1, time.Second) + require.EqualValues(t, 3, mesh.node("carol").svs.GetSeqNo(mesh.node("alice").producer)) +} + +func TestSvSyncJoinLargeGroup(t *testing.T) { + tu.SetT(t) + + threshold := 400 + mesh := newTestMesh(t, []string{"alice", "carol"}, func(id string, opts *SvSyncOpts) { + opts.SyncVectorThreshold = threshold + }) + defer mesh.stop() + + seedLargeLocalState(mesh.node("alice").svs, mesh.node("alice").producer, 12) + mesh.node("alice").svs.state.Set(mesh.node("alice").producer.TlvStr(), testMeshBoot, 3) + + carolName := mesh.node("carol").producer + carolSv := selfOnlyVector(carolName, testMeshBoot) + carolState := NewSvMap[uint64](0) + carolState.Set(carolName.TlvStr(), testMeshBoot, 0) + mesh.node("alice").svs.onReceiveStateVector(svSyncRecvSvArgs{ + sv: carolSv, + vectorType: optional.Some(spec_svs.VectorTypeFull), + mhash: ComputeMhash(carolState), + }) + + go mesh.node("alice").svs.sendSyncInterest(syncSendRecovery) + mesh.wait(400 * time.Millisecond) + + mesh.waitUpdates("carol", 1, 2*time.Second) + require.Greater(t, mesh.node("carol").memberCount(), 1) + require.EqualValues(t, 3, mesh.node("carol").svs.GetSeqNo(mesh.node("alice").producer)) +} + +func TestHandleMhashMismatchNoBlindPull(t *testing.T) { + tu.SetT(t) + + mesh := newTestMesh(t, []string{"alice", "bob"}, nil) + defer mesh.stop() + + alice := mesh.node("alice").svs + bobOnly := NewSvMap[uint64](0) + bobOnly.Set(mesh.node("bob").producer.TlvStr(), testMeshBoot, 1) + partialSv := bobOnly.Encode(func(s uint64) uint64 { return s }) + + // Bob's inline FULL with mismatched mhash; alice is not a strict superset. + alice.onReceiveStateVector(svSyncRecvSvArgs{ + sv: partialSv, + vectorType: optional.Some(spec_svs.VectorTypeFull), + mhash: ComputeMhash(bobOnly), + }) + + // Should not hang or panic from blind pull to unpublished 32=sv prefix. 
+ mesh.wait(100 * time.Millisecond) +} + +func TestSvSyncMhashMismatchSenderAnnounce(t *testing.T) { + tu.SetT(t) + + threshold := 400 + mesh := newTestMesh(t, []string{"alice", "bob"}, func(id string, opts *SvSyncOpts) { + opts.SyncVectorThreshold = threshold + }) + defer mesh.stop() + + seedLargeLocalState(mesh.node("alice").svs, mesh.node("alice").producer, 12) + alice := mesh.node("alice").svs + + bobOnly := NewSvMap[uint64](0) + bobOnly.Set(mesh.node("bob").producer.TlvStr(), testMeshBoot, 1) + partialSv := bobOnly.Encode(func(s uint64) uint64 { return s }) + + // Bob sends PARTIAL; Alice has strict superset membership and announces recovery. + alice.onReceiveStateVector(svSyncRecvSvArgs{ + sv: partialSv, + vectorType: optional.Some(spec_svs.VectorTypePartial), + mhash: ComputeMhash(bobOnly), + }) + + mesh.wait(400 * time.Millisecond) + mesh.waitUpdates("bob", 1, 2*time.Second) + require.Greater(t, mesh.node("bob").memberCount(), 1) +}