Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions compose/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func RegisterSerializableType[T any](name string) (err error) {

type CheckPointStore = core.CheckPointStore

// CheckPointDeleter is an optional interface that a CheckPointStore can implement to support
// deleting checkpoints. When the store passed to WithCheckPointStore also implements
// CheckPointDeleter, eino calls Delete after a graph run that used WithDeleteCheckpointAfterRun
// completes successfully.
type CheckPointDeleter = core.CheckPointDeleter

type Serializer interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
Expand Down Expand Up @@ -93,6 +99,18 @@ func WithForceNewRun() Option {
}
}

// WithDeleteCheckpointAfterRun deletes the checkpoint when the graph run completes successfully.
// The checkpoint is only deleted on success; it is preserved when the graph is interrupted so that
// execution can be resumed later.
//
// The checkpoint store must implement CheckPointDeleter for this option to have any effect.
// If the store does not implement CheckPointDeleter, this option is silently ignored.
func WithDeleteCheckpointAfterRun() Option {
return Option{
deleteCheckpointAfterRun: true,
}
}

// StateModifier modifies state during checkpoint operations for a given node path.
type StateModifier func(ctx context.Context, path NodePath, state any) error

Expand Down
113 changes: 113 additions & 0 deletions compose/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2002,3 +2002,116 @@ func (f *checkpointTestTool[I, O]) InvokableRun(ctx context.Context, argumentsIn
}
return sonic.MarshalString(o)
}

// deletableStore wraps inMemoryStore and also implements CheckPointDeleter.
type deletableStore struct {
*inMemoryStore
deleted []string
}

func newDeletableStore() *deletableStore {
return &deletableStore{inMemoryStore: newInMemoryStore()}
}

func (d *deletableStore) Delete(_ context.Context, checkPointID string) error {
delete(d.m, checkPointID)
d.deleted = append(d.deleted, checkPointID)
return nil
}

// TestDeleteCheckpointAfterRun verifies the behaviour of WithDeleteCheckpointAfterRun.
func TestDeleteCheckpointAfterRun(t *testing.T) {
buildGraph := func(store CheckPointStore) (Runnable[string, string], error) {
g := NewGraph[string, string]()
err := g.AddLambdaNode("node", InvokableLambda(func(ctx context.Context, input string) (string, error) {
return input + "_done", nil
}))
if err != nil {
return nil, err
}
if err = g.AddEdge(START, "node"); err != nil {
return nil, err
}
if err = g.AddEdge("node", END); err != nil {
return nil, err
}
return g.Compile(context.Background(), WithCheckPointStore(store))
}

t.Run("deletes checkpoint after successful run", func(t *testing.T) {
store := newDeletableStore()
r, err := buildGraph(store)
assert.NoError(t, err)

_, err = r.Invoke(context.Background(), "hello",
WithCheckPointID("sess-1"),
WithDeleteCheckpointAfterRun(),
)
assert.NoError(t, err)

assert.Equal(t, []string{"sess-1"}, store.deleted)
_, exists, _ := store.Get(context.Background(), "sess-1")
assert.False(t, exists, "checkpoint should have been deleted after successful run")
})

t.Run("does not delete checkpoint on interrupt", func(t *testing.T) {
store := newDeletableStore()

g := NewGraph[string, string]()
err := g.AddLambdaNode("node", InvokableLambda(func(ctx context.Context, input string) (string, error) {
return input + "_done", nil
}))
assert.NoError(t, err)
assert.NoError(t, g.AddEdge(START, "node"))
assert.NoError(t, g.AddEdge("node", END))

r, err := g.Compile(context.Background(),
WithCheckPointStore(store),
WithInterruptBeforeNodes([]string{"node"}),
)
assert.NoError(t, err)

_, err = r.Invoke(context.Background(), "hello",
WithCheckPointID("sess-2"),
WithDeleteCheckpointAfterRun(),
)
assert.NotNil(t, err)
_, isInterrupt := ExtractInterruptInfo(err)
assert.True(t, isInterrupt)

// Checkpoint must still exist so it can be resumed.
assert.Empty(t, store.deleted)
_, exists, _ := store.Get(context.Background(), "sess-2")
assert.True(t, exists, "checkpoint should be preserved after an interrupt")
})

t.Run("silently ignored when store does not implement CheckPointDeleter", func(t *testing.T) {
store := newInMemoryStore() // does NOT implement CheckPointDeleter
r, err := buildGraph(store)
assert.NoError(t, err)

// Should succeed without any panic or error, even though the store cannot delete.
result, err := r.Invoke(context.Background(), "hello",
WithCheckPointID("sess-3"),
WithDeleteCheckpointAfterRun(),
)
assert.NoError(t, err)
assert.Equal(t, "hello_done", result)
})

t.Run("deletes write checkpoint when different from read checkpoint", func(t *testing.T) {
store := newDeletableStore()
r, err := buildGraph(store)
assert.NoError(t, err)

_, err = r.Invoke(context.Background(), "hello",
WithCheckPointID("read-cp"),
WithWriteToCheckPointID("write-cp"),
WithDeleteCheckpointAfterRun(),
)
assert.NoError(t, err)

// Only the write checkpoint should be deleted.
assert.Equal(t, []string{"write-cp"}, store.deleted)
})
}
11 changes: 6 additions & 5 deletions compose/graph_call_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ type Option struct {

paths []*NodePath

maxRunSteps int
checkPointID *string
writeToCheckPointID *string
forceNewRun bool
stateModifier StateModifier
maxRunSteps int
checkPointID *string
writeToCheckPointID *string
forceNewRun bool
stateModifier StateModifier
deleteCheckpointAfterRun bool
}

func (o Option) deepCopy() Option {
Expand Down
30 changes: 26 additions & 4 deletions compose/graph_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func runnableTransform(ctx context.Context, r *composableRunnable, input any, op

func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Option) (result any, err error) {
haveOnStart := false // delay triggering onGraphStart until state initialization is complete, so that the state can be accessed within onGraphStart.
// Pre-declare so the defer below can capture them before they are assigned.
var deleteAfterRun bool
var writeToCheckPointID *string
var isSubGraph bool
defer func() {
if !haveOnStart {
ctx, input = onGraphStart(ctx, input, isStream)
Expand All @@ -116,6 +120,15 @@ func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Opti
ctx, err = onGraphError(ctx, err)
} else {
ctx, result = onGraphEnd(ctx, result, isStream)
// Delete the checkpoint when the run completes successfully and the caller
// opted in via WithDeleteCheckpointAfterRun. Only delete at the top-level
// graph (not in subgraphs). Silently skip if the store does not implement
// CheckPointDeleter.
if deleteAfterRun && !isSubGraph && writeToCheckPointID != nil {
if deleter, ok := r.checkPointer.store.(CheckPointDeleter); ok {
_ = deleter.Delete(ctx, *writeToCheckPointID)
}
}
}
}()

Expand All @@ -142,13 +155,17 @@ func (r *runner) run(ctx context.Context, isStream bool, input any, opts ...Opti
}

// Extract CheckPointID
checkPointID, writeToCheckPointID, stateModifier, forceNewRun := getCheckPointInfo(opts...)
var checkPointID *string
var stateModifier StateModifier
var forceNewRun bool
checkPointID, writeToCheckPointID, stateModifier, forceNewRun, deleteAfterRun = getCheckPointInfo(opts...)
if checkPointID != nil && r.checkPointer.store == nil {
return nil, newGraphRunError(fmt.Errorf("receive checkpoint id but have not set checkpoint store"))
}

// Extract subgraph
path, isSubGraph := getNodePath(ctx)
var path *NodePath
path, isSubGraph = getNodePath(ctx)

// load checkpoint from ctx/store or init graph
initialized := false
Expand Down Expand Up @@ -753,7 +770,7 @@ func (r *runner) createTasks(ctx context.Context, nodeMap map[string]any, optMap
return nextTasks, nil
}

func getCheckPointInfo(opts ...Option) (checkPointID *string, writeToCheckPointID *string, stateModifier StateModifier, forceNewRun bool) {
func getCheckPointInfo(opts ...Option) (checkPointID *string, writeToCheckPointID *string, stateModifier StateModifier, forceNewRun bool, deleteAfterRun bool) {
for _, opt := range opts {
if opt.checkPointID != nil {
checkPointID = opt.checkPointID
Expand All @@ -764,7 +781,12 @@ func getCheckPointInfo(opts ...Option) (checkPointID *string, writeToCheckPointI
if opt.stateModifier != nil {
stateModifier = opt.stateModifier
}
forceNewRun = opt.forceNewRun
if opt.forceNewRun {
forceNewRun = opt.forceNewRun
}
if opt.deleteCheckpointAfterRun {
deleteAfterRun = opt.deleteCheckpointAfterRun
}
}
if writeToCheckPointID == nil {
writeToCheckPointID = checkPointID
Expand Down
8 changes: 8 additions & 0 deletions internal/core/interrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type CheckPointStore interface {
Set(ctx context.Context, checkPointID string, checkPoint []byte) error
}

// CheckPointDeleter is an optional extension of CheckPointStore that supports deleting checkpoints.
// When a CheckPointStore also implements CheckPointDeleter, eino will call Delete after a graph run
// completes successfully (and WithDeleteCheckpointAfterRun was used), so that stale checkpoints are
// not left behind.
type CheckPointDeleter interface {
Delete(ctx context.Context, checkPointID string) error
}

type InterruptSignal struct {
ID string
Address
Expand Down