From 14b518f70703a63a407088d1bc0a87832ceab0aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Bubacz?= Date: Wed, 15 Apr 2026 14:25:36 +0200 Subject: [PATCH] feat: implement checkpoint deletion after successful graph run --- compose/checkpoint.go | 18 ++++++ compose/checkpoint_test.go | 113 ++++++++++++++++++++++++++++++++++ compose/graph_call_options.go | 11 ++-- compose/graph_run.go | 30 +++++++-- internal/core/interrupt.go | 8 +++ 5 files changed, 171 insertions(+), 9 deletions(-) diff --git a/compose/checkpoint.go b/compose/checkpoint.go index 1d29a0732..04679ced6 100644 --- a/compose/checkpoint.go +++ b/compose/checkpoint.go @@ -50,6 +50,12 @@ func RegisterSerializableType[T any](name string) (err error) { type CheckPointStore = core.CheckPointStore +// CheckPointDeleter is an optional interface that a CheckPointStore can implement to support +// deleting checkpoints. When the store passed to WithCheckPointStore also implements +// CheckPointDeleter, eino calls Delete after a graph run that used WithDeleteCheckpointAfterRun +// completes successfully. +type CheckPointDeleter = core.CheckPointDeleter + type Serializer interface { Marshal(v any) ([]byte, error) Unmarshal(data []byte, v any) error @@ -93,6 +99,18 @@ func WithForceNewRun() Option { } } +// WithDeleteCheckpointAfterRun deletes the checkpoint when the graph run completes successfully. +// The checkpoint is only deleted on success; it is preserved when the graph is interrupted so that +// execution can be resumed later. +// +// The checkpoint store must implement CheckPointDeleter for this option to have any effect. +// If the store does not implement CheckPointDeleter, this option is silently ignored. +func WithDeleteCheckpointAfterRun() Option { + return Option{ + deleteCheckpointAfterRun: true, + } +} + // StateModifier modifies state during checkpoint operations for a given node path. type StateModifier func(ctx context.Context, path NodePath, state any) error diff --git a/compose/checkpoint_test.go b/compose/checkpoint_test.go index c24b6ce6f..9e3fcfdb3 100644 --- a/compose/checkpoint_test.go +++ b/compose/checkpoint_test.go @@ -2002,3 +2002,116 @@ func (f *checkpointTestTool[I, O]) InvokableRun(ctx context.Context, argumentsIn } return sonic.MarshalString(o) } + +// deletableStore wraps inMemoryStore and also implements CheckPointDeleter. +type deletableStore struct { + *inMemoryStore + deleted []string +} + +func newDeletableStore() *deletableStore { + return &deletableStore{inMemoryStore: newInMemoryStore()} +} + +func (d *deletableStore) Delete(_ context.Context, checkPointID string) error { + delete(d.m, checkPointID) + d.deleted = append(d.deleted, checkPointID) + return nil +} + +// TestDeleteCheckpointAfterRun verifies the behaviour of WithDeleteCheckpointAfterRun. +func TestDeleteCheckpointAfterRun(t *testing.T) { + buildGraph := func(store CheckPointStore) (Runnable[string, string], error) { + g := NewGraph[string, string]() + err := g.AddLambdaNode("node", InvokableLambda(func(ctx context.Context, input string) (string, error) { + return input + "_done", nil + })) + if err != nil { + return nil, err + } + if err = g.AddEdge(START, "node"); err != nil { + return nil, err + } + if err = g.AddEdge("node", END); err != nil { + return nil, err + } + return g.Compile(context.Background(), WithCheckPointStore(store)) + } + + t.Run("deletes checkpoint after successful run", func(t *testing.T) { + store := newDeletableStore() + r, err := buildGraph(store) + assert.NoError(t, err) + + _, err = r.Invoke(context.Background(), "hello", + WithCheckPointID("sess-1"), + WithDeleteCheckpointAfterRun(), + ) + assert.NoError(t, err) + + assert.Equal(t, []string{"sess-1"}, store.deleted) + _, exists, _ := store.Get(context.Background(), "sess-1") + assert.False(t, exists, "checkpoint should have been deleted after successful run") + }) + + t.Run("does not delete checkpoint on interrupt", func(t *testing.T) { + store := newDeletableStore() + + g := NewGraph[string, string]() + err := g.AddLambdaNode("node", InvokableLambda(func(ctx context.Context, input string) (string, error) { + return input + "_done", nil + })) + assert.NoError(t, err) + assert.NoError(t, g.AddEdge(START, "node")) + assert.NoError(t, g.AddEdge("node", END)) + + r, err := g.Compile(context.Background(), + WithCheckPointStore(store), + WithInterruptBeforeNodes([]string{"node"}), + ) + assert.NoError(t, err) + + _, err = r.Invoke(context.Background(), "hello", + WithCheckPointID("sess-2"), + WithDeleteCheckpointAfterRun(), + ) + assert.NotNil(t, err) + _, isInterrupt := ExtractInterruptInfo(err) + assert.True(t, isInterrupt) + + // Checkpoint must still exist so it can be resumed. + assert.Empty(t, store.deleted) + _, exists, _ := store.Get(context.Background(), "sess-2") + assert.True(t, exists, "checkpoint should be preserved after an interrupt") + }) + + t.Run("silently ignored when store does not implement CheckPointDeleter", func(t *testing.T) { + store := newInMemoryStore() // does NOT implement CheckPointDeleter + r, err := buildGraph(store) + assert.NoError(t, err) + + // Should succeed without any panic or error, even though the store cannot delete. + result, err := r.Invoke(context.Background(), "hello", + WithCheckPointID("sess-3"), + WithDeleteCheckpointAfterRun(), + ) + assert.NoError(t, err) + assert.Equal(t, "hello_done", result) + }) + + t.Run("deletes write checkpoint when different from read checkpoint", func(t *testing.T) { + store := newDeletableStore() + r, err := buildGraph(store) + assert.NoError(t, err) + + _, err = r.Invoke(context.Background(), "hello", + WithCheckPointID("read-cp"), + WithWriteToCheckPointID("write-cp"), + WithDeleteCheckpointAfterRun(), + ) + assert.NoError(t, err) + + // Only the write checkpoint should be deleted. + assert.Equal(t, []string{"write-cp"}, store.deleted) + }) +} diff --git a/compose/graph_call_options.go b/compose/graph_call_options.go index eb2ab1643..2762b422f 100644 --- a/compose/graph_call_options.go +++ b/compose/graph_call_options.go @@ -99,11 +99,12 @@ type Option struct { paths []*NodePath - maxRunSteps int - checkPointID *string - writeToCheckPointID *string - forceNewRun bool - stateModifier StateModifier + maxRunSteps int + checkPointID *string + writeToCheckPointID *string + forceNewRun bool + stateModifier StateModifier + deleteCheckpointAfterRun bool } func (o Option) deepCopy() Option { diff --git a/compose/graph_run.go b/compose/graph_run.go index a3e81ecf1..ef4fe65a8 100644 --- a/compose/graph_run.go +++ b/compose/graph_run.go @@ -108,6 +108,10 @@ func runnableTransform(ctx context.Context, r *composableRunnable, input any, op func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Option) (result any, err error) { haveOnStart := false // delay triggering onGraphStart until state initialization is complete, so that the state can be accessed within onGraphStart. + // Pre-declare so the defer below can capture them before they are assigned. + var deleteAfterRun bool + var writeToCheckPointID *string + var isSubGraph bool defer func() { if !haveOnStart { ctx, input = onGraphStart(ctx, input, isStream) @@ -116,6 +120,15 @@ func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Opti ctx, err = onGraphError(ctx, err) } else { ctx, result = onGraphEnd(ctx, result, isStream) + // Delete the checkpoint when the run completes successfully and the caller + // opted in via WithDeleteCheckpointAfterRun. Only delete at the top-level + // graph (not in subgraphs). Silently skip if the store does not implement + // CheckPointDeleter. + if deleteAfterRun && !isSubGraph && writeToCheckPointID != nil { + if deleter, ok := r.checkPointer.store.(CheckPointDeleter); ok { + _ = deleter.Delete(ctx, *writeToCheckPointID) + } + } } }() @@ -142,13 +155,17 @@ func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Opti } // Extract CheckPointID - checkPointID, writeToCheckPointID, stateModifier, forceNewRun := getCheckPointInfo(opts...) + var checkPointID *string + var stateModifier StateModifier + var forceNewRun bool + checkPointID, writeToCheckPointID, stateModifier, forceNewRun, deleteAfterRun = getCheckPointInfo(opts...) if checkPointID != nil && r.checkPointer.store == nil { return nil, newGraphRunError(fmt.Errorf("receive checkpoint id but have not set checkpoint store")) } // Extract subgraph - path, isSubGraph := getNodePath(ctx) + var path *NodePath + path, isSubGraph = getNodePath(ctx) // load checkpoint from ctx/store or init graph initialized := false @@ -753,7 +770,7 @@ func (r *runner) createTasks(ctx context.Context, nodeMap map[string]any, optMap return nextTasks, nil } -func getCheckPointInfo(opts ...Option) (checkPointID *string, writeToCheckPointID *string, stateModifier StateModifier, forceNewRun bool) { +func getCheckPointInfo(opts ...Option) (checkPointID *string, writeToCheckPointID *string, stateModifier StateModifier, forceNewRun bool, deleteAfterRun bool) { for _, opt := range opts { if opt.checkPointID != nil { checkPointID = opt.checkPointID @@ -764,7 +781,12 @@ func getCheckPointInfo(opts ...Option) (checkPointID *string, writeToCheckPointI if opt.stateModifier != nil { stateModifier = opt.stateModifier } - forceNewRun = opt.forceNewRun + if opt.forceNewRun { + forceNewRun = opt.forceNewRun + } + if opt.deleteCheckpointAfterRun { + deleteAfterRun = opt.deleteCheckpointAfterRun + } } if writeToCheckPointID == nil { writeToCheckPointID = checkPointID diff --git a/internal/core/interrupt.go b/internal/core/interrupt.go index d7a934a3d..b094c8881 100644 --- a/internal/core/interrupt.go +++ b/internal/core/interrupt.go @@ -29,6 +29,14 @@ type CheckPointStore interface { Set(ctx context.Context, checkPointID string, checkPoint []byte) error } +// CheckPointDeleter is an optional extension of CheckPointStore that supports deleting checkpoints. +// When a CheckPointStore also implements CheckPointDeleter, eino will call Delete after a graph run +// completes successfully (and WithDeleteCheckpointAfterRun was used), so that stale checkpoints are +// not left behind. +type CheckPointDeleter interface { + Delete(ctx context.Context, checkPointID string) error +} + type InterruptSignal struct { ID string Address