Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions tests/continue_as_new_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
Expand Down Expand Up @@ -455,6 +456,130 @@ func (s *ContinueAsNewTestSuite) TestWorkflowContinueAsNewTaskID() {
}
}

// TestContinueAsNewWithDelayStart verifies that the server honors a "delay start" on a
// continue-as-new command: the first workflow task of the new run is deferred by the requested
// interval instead of being scheduled immediately.
//
// The SDKs do not yet expose a delay-start option on the continue-as-new command, so we set the
// BackoffStartInterval field directly on the raw ContinueAsNewWorkflowExecutionCommandAttributes.
// The task poller sends this command over gRPC verbatim, which is how we simulate an SDK that does
// support the flag.
func (s *ContinueAsNewTestSuite) TestContinueAsNewWithDelayStart() {
// Disable the minimal continue-as-new interval so the only backoff applied to the new run is the
// one we explicitly request. Otherwise the server enforces WorkflowIdReuseMinimalInterval and the
// observed backoff could come from there rather than from our requested delay.
s.OverrideDynamicConfig(dynamicconfig.WorkflowIdReuseMinimalInterval, time.Duration(0))

id := "functional-continue-as-new-delay-start-test"
wt := "functional-continue-as-new-delay-start-test-type"
tl := "functional-continue-as-new-delay-start-test-taskqueue"
identity := "worker1"

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.NewString(),
Namespace: s.Namespace().String(),
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowRunTimeout: durationpb.New(100 * time.Second),
WorkflowTaskTimeout: durationpb.New(10 * time.Second),
Identity: identity,
}

we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request)
s.NoError(err)
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

const delayStart = 1 * time.Second

continueAsNewed := false
workflowComplete := false
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if !continueAsNewed {
continueAsNewed = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{
ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowRunTimeout: durationpb.New(100 * time.Second),
WorkflowTaskTimeout: durationpb.New(10 * time.Second),
// "delay start": SDKs don't expose this yet, so we set it on the raw command.
BackoffStartInterval: durationpb.New(delayStart),
},
},
}}, nil
}

workflowComplete = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
},
},
}}, nil
}

poller := &testcore.TaskPoller{
Client: s.FrontendClient(),
Namespace: s.Namespace().String(),
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.Logger,
T: s.T(),
}

// Process the first workflow task: it issues a continue-as-new with a delayed start.
_, err = poller.PollAndProcessWorkflowTask()
s.NoError(err)
s.False(workflowComplete)

// The original run's history must record the requested backoff on the continue-as-new event.
firstRunEvents := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: id, RunId: we.RunId})
continuedAsNewEvent := firstRunEvents[len(firstRunEvents)-1]
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW, continuedAsNewEvent.GetEventType())
canAttrs := continuedAsNewEvent.GetWorkflowExecutionContinuedAsNewEventAttributes()
s.Equal(delayStart, canAttrs.GetBackoffStartInterval().AsDuration())
newRunID := canAttrs.GetNewExecutionRunId()
s.NotEmpty(newRunID)
s.NotEqual(we.RunId, newRunID)

newRunExecution := &commonpb.WorkflowExecution{WorkflowId: id, RunId: newRunID}

// Process the new run's first workflow task. The poller long-polls and blocks until the task
// becomes available, which only happens once the delay-start backoff timer fires (~delayStart
// later). Long-polling here keeps the test deterministic: we don't have to inspect intermediate
// history to observe that the task was deferred.
_, err = poller.PollAndProcessWorkflowTask(testcore.WithDumpHistory)
s.NoError(err)
s.True(workflowComplete)

// The new run started with the requested first-workflow-task backoff and ran to completion.
finalEvents := s.GetHistory(s.Namespace().String(), newRunExecution)
startedAttrs := finalEvents[0].GetWorkflowExecutionStartedEventAttributes()
s.Equal(delayStart, startedAttrs.GetFirstWorkflowTaskBackoff().AsDuration())
s.Equal(we.RunId, startedAttrs.GetContinuedExecutionRunId())
s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 WorkflowExecutionCompleted`, finalEvents)

// The first workflow task of the new run was scheduled only after the delay-start backoff: its
// scheduled-event timestamp is delayStart after the run's start event.
startedTime := finalEvents[0].GetEventTime().AsTime()
scheduledTime := finalEvents[1].GetEventTime().AsTime()
s.GreaterOrEqual(scheduledTime.Sub(startedTime), delayStart)
}

type (
ParentWithChildContinueAsNew struct {
suite *ContinueAsNewTestSuite
Expand Down
Loading