Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.durabletask;

import io.dapr.durabletask.implementation.protobuf.Orchestration;

/**
* Controls how execution history is propagated to a child workflow or activity.
*/
public enum HistoryPropagationScope {
/**
* No propagation. The child receives no history from the caller.
*/
NONE,

/**
* Propagate the caller's own history events only. The child does not see
* any ancestral history (trust boundary).
*/
OWN_HISTORY,

/**
* Propagate the caller's own history events AND the full ancestral chain.
* Any propagated history this workflow received from its parent is forwarded to the child.
*/
LINEAGE;

Orchestration.HistoryPropagationScope toProto() {
switch (this) {
case OWN_HISTORY:
return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY;
case LINEAGE:
return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE;
default:
return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_NONE;
}
}

static HistoryPropagationScope fromProto(Orchestration.HistoryPropagationScope proto) {
switch (proto) {
case HISTORY_PROPAGATION_SCOPE_OWN_HISTORY:
return OWN_HISTORY;
case HISTORY_PROPAGATION_SCOPE_LINEAGE:
return LINEAGE;
default:
return NONE;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.durabletask;

import io.dapr.durabletask.implementation.protobuf.HistoryEvents;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Represents propagated execution history from a parent workflow to a child workflow or activity.
* Provides query methods for inspecting ancestor execution history.
*/
public final class PropagatedHistory {
private final HistoryPropagationScope scope;
private final List<PropagatedHistoryChunk> chunks;

PropagatedHistory(HistoryPropagationScope scope, List<PropagatedHistoryChunk> chunks) {
this.scope = scope;
this.chunks = Collections.unmodifiableList(new ArrayList<>(chunks));
}

static PropagatedHistory fromProto(HistoryEvents.PropagatedHistory proto) {
List<PropagatedHistoryChunk> chunks = proto.getChunksList().stream()
.map(PropagatedHistoryChunk::fromProto)
.collect(Collectors.toList());
HistoryPropagationScope scope = HistoryPropagationScope.fromProto(proto.getScope());
return new PropagatedHistory(scope, chunks);
}

/**
* Gets the raw history events that were propagated, flattened across all chunks
* in chunk order.
*
* @return an unmodifiable list of history events
*/
public List<HistoryEvents.HistoryEvent> getEvents() {
List<HistoryEvents.HistoryEvent> all = new ArrayList<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
all.addAll(chunk.getEvents());
}
return Collections.unmodifiableList(all);
}

/**
* Gets the propagation scope that was used to produce this history.
*
* @return the history propagation scope
*/
public HistoryPropagationScope getScope() {
return this.scope;
}

/**
* Gets the workflow chunks identifying which app/workflow produced which events.
*
* @return an unmodifiable list of history chunks
*/
public List<PropagatedHistoryChunk> getWorkflows() {
return this.chunks;
}

/**
* Gets a deduplicated, ordered list of app IDs in the propagation chain.
*
* @return list of app IDs in the order they appear in chunks
*/
public List<String> getAppIDs() {
Set<String> seen = new LinkedHashSet<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (chunk.getAppId() != null && !chunk.getAppId().isEmpty()) {
seen.add(chunk.getAppId());
}
}
return new ArrayList<>(seen);
}

/**
* Gets the first workflow chunk matching the given workflow name.
* Returns the last match (most recent occurrence) if multiple exist.
*
* @param name the workflow name to search for
* @return an Optional containing the matching chunk, or empty if not found
*/
public Optional<PropagatedHistoryChunk> getWorkflowByName(String name) {
List<PropagatedHistoryChunk> matches = getWorkflowsByName(name);
if (matches.isEmpty()) {
return Optional.empty();
}
return Optional.of(matches.get(matches.size() - 1));
}

/**
* Gets all workflow chunks matching the given workflow name.
*
* @param name the workflow name to search for
* @return a list of matching chunks in execution order
*/
public List<PropagatedHistoryChunk> getWorkflowsByName(String name) {
return this.chunks.stream()
.filter(c -> name.equals(c.getWorkflowName()))
.collect(Collectors.toList());
}

/**
* Gets the history events produced by the given app ID.
*
* @param appId the app ID to filter by
* @return a list of history events from the specified app
*/
public List<HistoryEvents.HistoryEvent> getEventsByAppID(String appId) {
List<HistoryEvents.HistoryEvent> result = new ArrayList<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (appId.equals(chunk.getAppId())) {
result.addAll(chunk.getEvents());
}
}
return result;
}

/**
* Gets the history events produced by the given workflow instance ID.
*
* @param instanceId the instance ID to filter by
* @return a list of history events from the specified instance
*/
public List<HistoryEvents.HistoryEvent> getEventsByInstanceID(String instanceId) {
List<HistoryEvents.HistoryEvent> result = new ArrayList<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (instanceId.equals(chunk.getInstanceId())) {
result.addAll(chunk.getEvents());
}
}
return result;
}

/**
* Gets the history events produced by the given workflow name.
*
* @param workflowName the workflow name to filter by
* @return a list of history events from the specified workflow
*/
public List<HistoryEvents.HistoryEvent> getEventsByWorkflowName(String workflowName) {
List<HistoryEvents.HistoryEvent> result = new ArrayList<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (workflowName.equals(chunk.getWorkflowName())) {
result.addAll(chunk.getEvents());
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.durabletask;

import com.google.protobuf.InvalidProtocolBufferException;
import io.dapr.durabletask.implementation.protobuf.HistoryEvents;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Represents a chunk of propagated history events from a specific workflow instance.
* Each chunk owns the raw event bytes its producer signed; receivers parse them
* into typed history events on demand.
*/
public final class PropagatedHistoryChunk {
private final String appId;
private final String instanceId;
private final String workflowName;
private final List<HistoryEvents.HistoryEvent> events;

PropagatedHistoryChunk(String appId,
String instanceId,
String workflowName,
List<HistoryEvents.HistoryEvent> events) {
this.appId = appId;
this.instanceId = instanceId;
this.workflowName = workflowName;
this.events = Collections.unmodifiableList(new ArrayList<>(events));
}

static PropagatedHistoryChunk fromProto(HistoryEvents.PropagatedHistoryChunk proto) {
List<HistoryEvents.HistoryEvent> parsed = new ArrayList<>(proto.getRawEventsCount());
for (int i = 0; i < proto.getRawEventsCount(); i++) {
try {
parsed.add(HistoryEvents.HistoryEvent.parseFrom(proto.getRawEvents(i)));
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Failed to parse raw event " + i + " in chunk for app " + proto.getAppId(), e);
}
}
return new PropagatedHistoryChunk(
proto.getAppId(),
proto.getInstanceId(),
proto.getWorkflowName(),
parsed);
}

/**
* Gets the app ID that produced the events in this chunk.
*
* @return the app ID
*/
public String getAppId() {
return this.appId;
}

/**
* Gets the workflow instance ID that produced the events in this chunk.
*
* @return the instance ID
*/
public String getInstanceId() {
return this.instanceId;
}

/**
* Gets the workflow name that produced the events in this chunk.
*
* @return the workflow name
*/
public String getWorkflowName() {
return this.workflowName;
}

/**
* Gets the history events that were produced in this chunk.
*
* @return an unmodifiable list of history events
*/
public List<HistoryEvents.HistoryEvent> getEvents() {
return this.events;
}

/**
* Gets the number of events in this chunk.
*
* @return the event count
*/
public int getEventCount() {
return this.events.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.durabletask;

import java.util.Optional;

/**
* Interface that provides {@link TaskActivity} implementations with activity context, such as an activity's name and
* its input.
Expand Down Expand Up @@ -53,4 +55,11 @@ public interface TaskActivityContext {
* @return trace parent id
*/
String getTraceParent();

/**
* Gets the propagated history from a parent workflow, if any was propagated.
*
* @return an Optional containing the propagated history, or empty if none was propagated
*/
Optional<PropagatedHistory> getPropagatedHistory();
}
Loading
Loading