Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 141 additions & 3 deletions mirror_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@
package tessera

import (
"bytes"
"context"
"encoding/gob"
"encoding/hex"
"errors"
"fmt"
"io"
"iter"
"log/slog"

"github.com/transparency-dev/tessera/api"
"github.com/transparency-dev/tessera/api/layout"
"github.com/transparency-dev/tessera/internal/parse"
"golang.org/x/mod/sumdb/note"
)

Expand Down Expand Up @@ -133,12 +139,144 @@ type MirrorPackage struct {
// AddEntries processes a stream of entry packages, verifies subtree consistency proofs,
// and durably commits entries to the log.
//
// Returns the next required entry index, a recent pending checkpoint size, an opaque ticket for future invocations, and, optionally, a cosignature over a pending checkpoint whose size matches uploadEnd if one exists.
func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticket []byte, next func() (*MirrorPackage, error)) (uint64, uint64, []byte, []byte, error) {
return 0, 0, nil, nil, errors.New("unimplemented")
// Returns the next required entry index, a recent pending checkpoint size, an opaque
// ticket for future invocations, and, optionally, a cosignature over a pending checkpoint
// whose size matches uploadEnd if one exists.
func (mt *MirrorTarget) AddEntries(ctx context.Context, uploadStart, uploadEnd uint64, ticketBytes []byte, next func() (*MirrorPackage, error)) (nextEntry uint64, pendingSize uint64, newTicket []byte, cosigs []byte, err error) {
curIntegratedSize, err := mt.reader.IntegratedSize(ctx)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to read integrated size: %w", err)
}
var t *ticket
if t, err = mt.openTicket(ctx, ticketBytes); err != nil {
// Invalid or empty ticket, return a new one.
pendingCP, err := mt.cpSource(ctx)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to get pending checkpoint: %v", err)
}
if len(pendingCP) == 0 {
return 0, 0, nil, nil, ErrNoPendingCheckpoint
}
t = &ticket{
PendingCP: pendingCP,
}
ticketBytes, err = mt.sealTicket(ctx, t)
if err != nil {
return 0, 0, nil, nil, fmt.Errorf("failed to create ticket: %v", err)
}

// If the client didn't provide a [valid] ticket, then we don't have a pending
// checkpoint to validate against, so we return a new ticket with the
// current checkpoint.
_, pendingSize, _, err := parse.CheckpointUnsafe(t.PendingCP)
if err != nil {
slog.ErrorContext(ctx, "Invalid pending checkpoint from source", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error()))
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint while creating ticket: %v", err)
}
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
}

var pendingRoot []byte
_, pendingSize, pendingRoot, err = parse.CheckpointUnsafe(t.PendingCP)
if err != nil {
slog.ErrorContext(ctx, "Invalid pending checkpoint in ticket", slog.String("pending_checkpoint", string(t.PendingCP)), slog.String("error", err.Error()))
return 0, 0, nil, nil, fmt.Errorf("failed to parse pending checkpoint from ticket: %v", err)
}

// Handle 409 Conflicts:
// - Zero-request check: If upload_start == 0 and upload_end == 0, the client is
// requesting initial mirror information.
// - upload_end:
// * MUST be equal to the tree size of a known pending checkpoint value.
// * MUST NOT be less than the mirror checkpoint's tree size.
// - upload_start:
// * MUST NOT be greater than the mirror's next expected entry index.
// * MUST NOT be too far below the mirror's next entry index.
if (uploadStart == 0 && uploadEnd == 0) ||
(uploadEnd != pendingSize || uploadEnd < curIntegratedSize) ||
(uploadStart > curIntegratedSize) {
// TODO(al): add flexibility about re-writing some entries
return curIntegratedSize, pendingSize, ticketBytes, nil, ErrConflict
}


bi := func(yield func(api.EntryBundle) bool) {
for {
pkg, err := next()
if err != nil {
if err == io.EOF {
return
}
// TODO(al): handle this
slog.WarnContext(ctx, "NextPackage returned an error", slog.String("error", err.Error()))
return
}

// TODO(al): verify entries+proof under checkpoint (Failure -> 422 Unprocessable Entity).

if !yield(api.EntryBundle{Entries: pkg.Entries}) {
return
}
}
}

// TODO(al): Check uploadStart is aligned to EntryBundleWidth.
bundleIdx := uploadStart/layout.EntryBundleWidth

nextEntry, newRoot, err := mt.writer.IntegrateBundles(ctx, bundleIdx, bi)
switch {
case err != nil:
return 0, 0, nil, nil, err
case nextEntry == pendingSize:
if !bytes.Equal(pendingRoot, newRoot) {
slog.ErrorContext(ctx, "CORRUPTION DETECTED - pending root != calculated root", slog.String("calculated_root", hex.EncodeToString(newRoot)), slog.String("pending_checkpoint", string(t.PendingCP)))
return 0, 0, nil, nil, errors.New("internal error")
}
// This is a complete upload.
// TODO(al):
// - cosign the pending checkpoint,
// - publish it IFF we not overwriting a larger checkpoint
// - If published, then return the cosig(s) to the caller.
return nextEntry, pendingSize, nil, []byte("— test cosig\n"), nil
case nextEntry > pendingSize:
// TODO(al): ticket is stale, probably need to update the ticket?
slog.WarnContext(ctx, "nextEntry > pendingSize", slog.Uint64("nextEntry", nextEntry), slog.Uint64("pendingSize", pendingSize))
return nextEntry, pendingSize, ticketBytes, nil, nil
default:
// Incomplete upload, return an updated ticket with the current checkpoint.
return nextEntry, pendingSize, ticketBytes, nil, nil
}
}

// IntegratedSize returns the size of the current integrated log.
func (mt *MirrorTarget) IntegratedSize(ctx context.Context) (uint64, error) {
return mt.reader.IntegratedSize(ctx)
}

// ticket is the underlying structure of an add-entries ticket.
type ticket struct {
// PendingCP holds the raw pending checkpoint bytes.
PendingCP []byte
}

func (mt *MirrorTarget) sealTicket(ctx context.Context, t *ticket) ([]byte, error) {
out := bytes.Buffer{}
if err := gob.NewEncoder(&out).Encode(t); err != nil {
return nil, fmt.Errorf("ticket encoding failed: %v", err)
}
// TODO(al): harden ticket & bind to this particular log mirror.
return out.Bytes(), nil
}

func (mt *MirrorTarget) openTicket(ctx context.Context, ticketBytes []byte) (*ticket, error) {
if len(ticketBytes) == 0 {
return nil, errors.New("empty ticket")
}
// TODO(al): harden ticket & verify it's for this particular log mirror.
var t ticket
if err := gob.NewDecoder(bytes.NewReader(ticketBytes)).Decode(&t); err != nil {
return nil, fmt.Errorf("ticket decoding failed: %v", err)
}
return &t, nil
}

192 changes: 192 additions & 0 deletions mirror_lifecycle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2026 The Tessera authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tessera

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"iter"
"testing"

"github.com/transparency-dev/tessera/api"
)

type fakeMirrorWriter struct {
integrateFunc func(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error)
sizeFunc func(ctx context.Context) (uint64, error)
}

func (f *fakeMirrorWriter) IntegrateBundles(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) {
if f.integrateFunc != nil {
return f.integrateFunc(ctx, from, bundles)
}
return from, nil, nil
}

func (f *fakeMirrorWriter) IntegratedSize(ctx context.Context) (uint64, error) {
if f.sizeFunc != nil {
return f.sizeFunc(ctx)
}
return 0, nil
}

type fakeLogReader struct {
sizeFunc func(ctx context.Context) (uint64, error)
}

func (f *fakeLogReader) IntegratedSize(ctx context.Context) (uint64, error) {
if f.sizeFunc != nil {
return f.sizeFunc(ctx)
}
return 0, nil
}
func (f *fakeLogReader) ReadCheckpoint(ctx context.Context) ([]byte, error) { return nil, nil }
func (f *fakeLogReader) ReadTile(ctx context.Context, level, index uint64, p uint8) ([]byte, error) {
return nil, nil
}
func (f *fakeLogReader) ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) {
return nil, nil
}
func (f *fakeLogReader) NextIndex(ctx context.Context) (uint64, error) { return 0, nil }

const (
testPendingCPOrigin = "test-origin"
testPendingCPSize = uint64(200)
testPendingCPRoot = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="
)

var testPendingCP = fmt.Sprintf("%s\n%d\n%s\n— test-sig\n", testPendingCPOrigin, testPendingCPSize, testPendingCPRoot)

func newTestMirrorTarget(size uint64) *MirrorTarget {
return &MirrorTarget{
writer: &fakeMirrorWriter{
sizeFunc: func(ctx context.Context) (uint64, error) { return size, nil },
},
reader: &fakeLogReader{
sizeFunc: func(ctx context.Context) (uint64, error) { return size, nil },
},
cpSource: func(ctx context.Context) ([]byte, error) {
return []byte(testPendingCP), nil
},
}
}

func TestMirrorTarget_AddEntries_NoTicket(t *testing.T) {
const (
testIntegratedSize = uint64(100)
)
ctx := context.Background()
mt := newTestMirrorTarget(testIntegratedSize)

nextEntry, pendingSize, newTicket, _, err := mt.AddEntries(ctx, 0, 0, nil, func() (*MirrorPackage, error) {
return nil, io.EOF
})
if !errors.Is(err, ErrConflict) {
t.Errorf("got %v, want ErrConflict", err)
}
if got, want := nextEntry, testIntegratedSize; got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := pendingSize, testPendingCPSize; got != want {
t.Errorf("got %d, want %d", got, want)
}
if len(newTicket) == 0 {
t.Errorf("got empty ticket, want non-empty")
}
}

func TestMirrorTarget_AddEntries_RangeConflict(t *testing.T) {
const (
testUploadStart = uint64(100)
testUploadEnd = uint64(250)
testPendingSize = uint64(200)
testIntegratedSize = uint64(150)
)
ctx := context.Background()
mt := &MirrorTarget{
writer: &fakeMirrorWriter{
sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil },
},
reader: &fakeLogReader{
sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil },
},
cpSource: func(ctx context.Context) ([]byte, error) { return []byte(testPendingCP), nil },
}

validTicket, err := mt.sealTicket(ctx, &ticket{PendingCP: []byte(testPendingCP)})
if err != nil {
t.Fatalf("sealTicket failed: %v", err)
}

// testUploadEnd != pendingSize -> conflict
_, _, _, _, err = mt.AddEntries(ctx, testUploadStart, testUploadEnd, validTicket, func() (*MirrorPackage, error) {
return nil, io.EOF
})
if !errors.Is(err, ErrConflict) {
t.Errorf("want ErrConflict, got %v", err)
}
}

func TestMirrorTarget_AddEntries_CompleteUpload(t *testing.T) {
const (
testIntegratedSize = uint64(100)
testUploadStart = uint64(100)
testUploadEnd = uint64(200)
)

ctx := context.Background()
mt := &MirrorTarget{
writer: &fakeMirrorWriter{
integrateFunc: func(ctx context.Context, from uint64, bundles iter.Seq[api.EntryBundle]) (uint64, []byte, error) {
// Consume iterator
for range bundles {
}
decodedRoot, err := base64.StdEncoding.DecodeString(testPendingCPRoot)
if err != nil {
return 0, nil, fmt.Errorf("TEST ERROR: %v", err)
}
return testPendingCPSize, decodedRoot, nil
},
},
reader: &fakeLogReader{
sizeFunc: func(ctx context.Context) (uint64, error) { return testIntegratedSize, nil },
},
cpSource: func(ctx context.Context) ([]byte, error) { return []byte(testPendingCP), nil },
}

validTicket, err := mt.sealTicket(ctx, &ticket{PendingCP: []byte(testPendingCP)})
if err != nil {
t.Fatalf("sealTicket failed: %v", err)
}

nextEntry, pendingSize, _, cosigs, err := mt.AddEntries(ctx, testUploadStart, testUploadEnd, validTicket, func() (*MirrorPackage, error) {
return &MirrorPackage{Entries: [][]byte{[]byte("e1")}}, io.EOF

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to return both package and EOF error at the same time?

})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if got, want := nextEntry, testUploadEnd; got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := pendingSize, testUploadEnd; got != want {
t.Errorf("got %d, want %d", got, want)
}
if len(cosigs) == 0 {
t.Errorf("got empty cosigs, want non-empty")
}
}
Loading