diff --git a/mirror_lifecycle.go b/mirror_lifecycle.go index 41d69df7e..e358e6e05 100644 --- a/mirror_lifecycle.go +++ b/mirror_lifecycle.go @@ -15,13 +15,19 @@ package tessera import ( + "bytes" "context" + "encoding/gob" + "encoding/hex" "errors" "fmt" + "io" "iter" + "log/slog" "github.com/transparency-dev/tessera/api" "github.com/transparency-dev/tessera/api/layout" + "github.com/transparency-dev/tessera/internal/parse" "golang.org/x/mod/sumdb/note" ) @@ -133,12 +139,144 @@ type MirrorPackage struct { // AddEntries processes a stream of entry packages, verifies subtree consistency proofs, // and durably commits entries to the log. // -// Returns the next required entry index, a recent pending checkpoint size, an opaque ticket for future invocations, and, optionally, a cosignature over a pending checkpoint whose size matches uploadEnd if one exists. -func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticket []byte, next func() (*MirrorPackage, error)) (uint64, uint64, []byte, []byte, error) { - return 0, 0, nil, nil, errors.New("unimplemented") +// Returns the next required entry index, a recent pending checkpoint size, an opaque +// ticket for future invocations, and, optionally, a cosignature over a pending checkpoint +// whose size matches uploadEnd if one exists. +func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticketBytes []byte, next func() (*MirrorPackage, error)) (nextEntry uint64, pendingSize uint64, newTicket []byte, cosigs []byte, err error) { + curIntegratedSize, err := mt.reader.IntegratedSize(ctx) + if err != nil { + return 0, 0, nil, nil, fmt.Errorf("failed to read integrated size: %w", err) + } + var t *ticket + if t, err = mt.openTicket(ctx, ticketBytes); err != nil { + // Invalid or empty ticket, return a new one. + pendingCP, err := mt.cpSource(ctx) + if err != nil { + return 0, 0, nil, nil, fmt.Errorf("failed to get pending checkpoint: %v", err) + } + if len(pendingCP) == 0 { + return 0, 0, nil, nil, ErrNoPendingCheckpoint + } + t = &ticket{ + PendingCP: pendingCP, + } + ticketBytes, err = mt.sealTicket(ctx, t) + if err != nil { + return 0, 0, nil, nil, fmt.Errorf("failed to create ticket: %v", err) + } + + // If the client didn't provide a [valid] ticket, then we don't have a pending + // checkpoint to validate against, so we return a new ticket with the + // current checkpoint. + _, pendingSize, _, err := parse.CheckpointUnsafe(t.PendingCP) + if err != nil { + slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error())) + return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint while creating ticket: %v", err) + } + return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict + } + + var pendingRoot []byte + _, pendingSize, pendingRoot, err = parse.CheckpointUnsafe(t.PendingCP) + if err != nil { + slog.ErrorContext(ctx, "Invalid pending checkpoint in ticket", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error())) + return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint from ticket: %v", err) + } + + // Handle 409 Conflicts: + // - Zero-request check: If upload_start == 0 and upload_end == 0, the client is + // requesting initial mirror information. + // - upload_end: + // * MUST be equal to the tree size of a known pending checkpoint value. + // * MUST NOT be less than the mirror checkpoint's tree size. + // - upload_start: + // * MUST NOT be greater than the mirror's next expected entry index. + // * MUST NOT be too far below the mirror's next entry index. + if (uploadStart == 0 && uploadEnd == 0) || + (uploadEnd != pendingSize || uploadEnd < curIntegratedSize) || + (uploadStart > curIntegratedSize) { + // TODO(al): add flexibility about re-writing some entries + return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict + } + + + bi := func(yield func(api.EntryBundle) bool) { + for { + pkg, err := next() + if err != nil { + if err == io.EOF { + return + } + // TODO(al): handle this + slog.WarnContext(ctx, "NextPackage returned an error", slog.String("error", err.Error())) + return + } + + // TODO(al): verify entries+proof under checkpoint (Failure -> 422 Unprocessable Entity). + + if !yield(api.EntryBundle{Entries: pkg.Entries}) { + return + } + } + } + + // TODO(al): Check uploadStart is aligned to EntryBundleWidth. + bundleIdx := uploadStart/layout.EntryBundleWidth + + nextEntry, newRoot, err := mt.writer.IntegrateBundles(ctx, bundleIdx, bi) + switch { + case err != nil: + return 0, 0, nil, nil, err + case nextEntry == pendingSize: + if !bytes.Equal(pendingRoot, newRoot) { + slog.ErrorContext(ctx, "CORRUPTION DETECTED - pending root != calculated root", slog.String("calculated_root", hex.EncodeToString(newRoot)), slog.String("pending_checkpoint", string(t.PendingCP))) + return 0, 0, nil, nil, errors.New("internal error") + } + // This is a complete upload. + // TODO(al): + // - cosign the pending checkpoint, + // - publish it IFF we not overwriting a larger checkpoint + // - If published, then return the cosig(s) to the caller. + return nextEntry, pendingSize, nil, []byte("— test cosig\n"), nil + case nextEntry > pendingSize: + // TODO(al): ticket is stale, probably need to update the ticket? + slog.WarnContext(ctx, "nextEntry > pendingSize", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingSize)) + return nextEntry, pendingSize, ticketBytes, nil, nil + default: + // Incomplete upload, return an updated ticket with the current checkpoint. + return nextEntry, pendingSize, ticketBytes, nil, nil + } } // IntegratedSize returns the size of the current integrated log. func (mt *MirrorTarget) IntegratedSize(ctx context.Context) (uint64, error) { return mt.reader.IntegratedSize(ctx) } + +// ticket is the underlying structure of an add-entries ticket. +type ticket struct { + // PendingCP holds the raw pending checkpoint bytes. + PendingCP []byte +} + +func (mt *MirrorTarget) sealTicket(ctx context.Context, t *ticket) ([]byte, error) { + out := bytes.Buffer{} + if err := gob.NewEncoder(&out).Encode(t); err != nil { + return nil, fmt.Errorf("ticket encoding failed: %v", err) + } + // TODO(al): harden ticket & bind to this particular log mirror. + return out.Bytes(), nil +} + +func (mt *MirrorTarget) openTicket(ctx context.Context, ticketBytes []byte) (*ticket, error) { + if len(ticketBytes) == 0 { + return nil, errors.New("empty ticket") + } + // TODO(al): harden ticket & verify it's for this particular log mirror. + var t ticket + if err := gob.NewDecoder(bytes.NewReader(ticketBytes)).Decode(&t); err != nil { + return nil, fmt.Errorf("ticket decoding failed: %v", err) + } + return &t, nil +} + diff --git a/mirror_lifecycle_test.go b/mirror_lifecycle_test.go new file mode 100644 index 000000000..3292aa96b --- /dev/null +++ b/mirror_lifecycle_test.go @@ -0,0 +1,192 @@ +// Copyright 2026 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tessera + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "iter" + "testing" + + "github.com/transparency-dev/tessera/api" +) + +type fakeMirrorWriter struct { + integrateFunc func(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) + sizeFunc func(ctx context.Context) (uint64, error) +} + +func (f *fakeMirrorWriter) IntegrateBundles(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) { + if f.integrateFunc != nil { + return f.integrateFunc(ctx, from, bundles) + } + return from, nil, nil +} + +func (f *fakeMirrorWriter) IntegratedSize(ctx context.Context) (uint64, error) { + if f.sizeFunc != nil { + return f.sizeFunc(ctx) + } + return 0, nil +} + +type fakeLogReader struct { + sizeFunc func(ctx context.Context) (uint64, error) +} + +func (f *fakeLogReader) IntegratedSize(ctx context.Context) (uint64, error) { + if f.sizeFunc != nil { + return f.sizeFunc(ctx) + } + return 0, nil +} +func (f *fakeLogReader) ReadCheckpoint(ctx context.Context) ([]byte, error) { return nil, nil } +func (f *fakeLogReader) ReadTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) { + return nil, nil +} +func (f *fakeLogReader) ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) { + return nil, nil +} +func (f *fakeLogReader) NextIndex(ctx context.Context) (uint64, error) { return 0, nil } + +const ( + testPendingCPOrigin = "test-origin" + testPendingCPSize = uint64(200) + testPendingCPRoot = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=" +) + +var testPendingCP = fmt.Sprintf("%s\n%d\n%s\n— test-sig\n", testPendingCPOrigin, testPendingCPSize, testPendingCPRoot) + +func newTestMirrorTarget(size uint64) *MirrorTarget { + return &MirrorTarget{ + writer: &fakeMirrorWriter{ + sizeFunc: func(ctx context.Context) (uint64, error) { return size, nil }, + }, + reader: &fakeLogReader{ + sizeFunc: func(ctx context.Context) (uint64, error) { return size, nil }, + }, + cpSource: func(ctx context.Context) ([]byte, error) { + return []byte(testPendingCP), nil + }, + } +} + +func TestMirrorTarget_AddEntries_NoTicket(t *testing.T) { + const ( + testIntegratedSize = uint64(100) + ) + ctx := context.Background() + mt := newTestMirrorTarget(testIntegratedSize) + + nextEntry, pendingSize, newTicket, _, err := mt.AddEntries(ctx, 0, 0, nil, func() (*MirrorPackage, error) { + return nil, io.EOF + }) + if !errors.Is(err, ErrConflict) { + t.Errorf("got %v, want ErrConflict", err) + } + if got, want := nextEntry, testIntegratedSize; got != want { + t.Errorf("got %d, want %d", got, want) + } + if got, want := pendingSize, testPendingCPSize; got != want { + t.Errorf("got %d, want %d", got, want) + } + if len(newTicket) == 0 { + t.Errorf("got empty ticket, want non-empty") + } +} + +func TestMirrorTarget_AddEntries_RangeConflict(t *testing.T) { + const ( + testUploadStart = uint64(100) + testUploadEnd = uint64(250) + testPendingSize = uint64(200) + testIntegratedSize = uint64(150) + ) + ctx := context.Background() + mt := &MirrorTarget{ + writer: &fakeMirrorWriter{ + sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil }, + }, + reader: &fakeLogReader{ + sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil }, + }, + cpSource: func(ctx context.Context) ([]byte, error) { return []byte(testPendingCP), nil }, + } + + validTicket, err := mt.sealTicket(ctx, &ticket{PendingCP: []byte(testPendingCP)}) + if err != nil { + t.Fatalf("sealTicket failed: %v", err) + } + + // testUploadEnd != pendingSize -> conflict + _, _, _, _, err = mt.AddEntries(ctx, testUploadStart, testUploadEnd, validTicket, func() (*MirrorPackage, error) { + return nil, io.EOF + }) + if !errors.Is(err, ErrConflict) { + t.Errorf("want ErrConflict, got %v", err) + } +} + +func TestMirrorTarget_AddEntries_CompleteUpload(t *testing.T) { + const ( + testIntegratedSize = uint64(100) + testUploadStart = uint64(100) + testUploadEnd = uint64(200) + ) + + ctx := context.Background() + mt := &MirrorTarget{ + writer: &fakeMirrorWriter{ + integrateFunc: func(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) { + // Consume iterator + for range bundles { + } + decodedRoot, err := base64.StdEncoding.DecodeString(testPendingCPRoot) + if err != nil { + return 0, nil, fmt.Errorf("TEST ERROR: %v", err) + } + return testPendingCPSize, decodedRoot, nil + }, + }, + reader: &fakeLogReader{ + sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil }, + }, + cpSource: func(ctx context.Context) ([]byte, error) { return []byte(testPendingCP), nil }, + } + + validTicket, err := mt.sealTicket(ctx, &ticket{PendingCP: []byte(testPendingCP)}) + if err != nil { + t.Fatalf("sealTicket failed: %v", err) + } + + nextEntry, pendingSize, _, cosigs, err := mt.AddEntries(ctx, testUploadStart, testUploadEnd, validTicket, func() (*MirrorPackage, error) { + return nil, io.EOF + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if got, want := nextEntry, testUploadEnd; got != want { + t.Errorf("got %d, want %d", got, want) + } + if got, want := pendingSize, testUploadEnd; got != want { + t.Errorf("got %d, want %d", got, want) + } + if len(cosigs) == 0 { + t.Errorf("got empty cosigs, want non-empty") + } +}