Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
@GuardedBy("this")
private long activeMetadataVersion;

@GuardedBy("metadataLock")
Comment thread
parveensania marked this conversation as resolved.
private WindmillEndpoints.Type activeMetadataType;

@GuardedBy("metadataLock")
private long pendingMetadataVersion;

Expand Down Expand Up @@ -141,6 +144,7 @@ private FanOutStreamingEngineWorkerHarness(
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
this.totalGetWorkBudget = totalGetWorkBudget;
this.activeMetadataVersion = Long.MIN_VALUE;
this.activeMetadataType = WindmillEndpoints.Type.UNKNOWN;
this.workCommitterFactory = workCommitterFactory;
}

Expand Down Expand Up @@ -271,7 +275,11 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
synchronized (metadataLock) {
// Only process versions greater than what we currently have to prevent double processing of
// metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
if (windmillEndpoints.version() > pendingMetadataVersion) {
// The endpoints are also consumed if the version is the same but the type of endpoints
// sent by the server has changed.
if (windmillEndpoints.version() > pendingMetadataVersion
|| (windmillEndpoints.version() == pendingMetadataVersion
Comment thread
parveensania marked this conversation as resolved.
Outdated
&& windmillEndpoints.type() != activeMetadataType)) {
Comment thread
parveensania marked this conversation as resolved.
Outdated
pendingMetadataVersion = windmillEndpoints.version();
workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
}
Expand All @@ -288,11 +296,17 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
}
}

LOG.debug(
"Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
WindmillEndpoints.Type previousType;
synchronized (metadataLock) {
Comment thread
parveensania marked this conversation as resolved.
Outdated
previousType = activeMetadataType;
}
LOG.info(
"Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}, previous endpoint type: {}, current endpoint type: {}",
newWindmillEndpoints,
activeMetadataVersion,
newWindmillEndpoints.version());
newWindmillEndpoints.version(),
previousType,
newWindmillEndpoints.type());
closeStreamsNotIn(newWindmillEndpoints).join();
ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
Expand All @@ -305,6 +319,9 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
backends.set(newBackends);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
activeMetadataVersion = newWindmillEndpoints.version();
synchronized (metadataLock) {
activeMetadataType = newWindmillEndpoints.type();
}
}

/** Close the streams that are no longer valid asynchronously. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,34 @@
*/
@AutoValue
public abstract class WindmillEndpoints {
public enum Type {
UNKNOWN,
CLOUDPATH,
BORG,
Comment thread
parveensania marked this conversation as resolved.
Outdated
DIRECTPATH;

static Type fromProto(Windmill.WorkerMetadataResponse.EndpointType protoType) {
switch (protoType) {
case CLOUDPATH:
return CLOUDPATH;
case BORG:
return BORG;
case DIRECTPATH:
return DIRECTPATH;
default:
return UNKNOWN;
}
}
}

public static final int DEFAULT_WINDMILL_SERVICE_PORT = 443;
private static final Logger LOG = LoggerFactory.getLogger(WindmillEndpoints.class);
private static final WindmillEndpoints NO_ENDPOINTS =
WindmillEndpoints.builder()
.setVersion(Long.MAX_VALUE)
.setWindmillEndpoints(ImmutableSet.of())
.setGlobalDataEndpoints(ImmutableMap.of())
.setType(Type.UNKNOWN)
.build();

public static WindmillEndpoints none() {
Expand Down Expand Up @@ -75,6 +96,7 @@ public static WindmillEndpoints from(
.setVersion(workerMetadataResponseProto.getMetadataVersion())
.setGlobalDataEndpoints(globalDataServers)
.setWindmillEndpoints(windmillServers)
.setType(Type.fromProto(workerMetadataResponseProto.getEndpointType()))
.build();
}

Expand Down Expand Up @@ -138,6 +160,8 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
/** Version of the endpoints which increases with every modification. */
public abstract long version();

public abstract Type type();

/**
* Used by GetData GlobalDataRequest(s) to support Beam side inputs. Returns a map where the key
* is a global data tag and the value is the endpoint where the data associated with the global
Expand Down Expand Up @@ -221,6 +245,8 @@ public abstract static class Builder {
public abstract static class Builder {
public abstract Builder setVersion(long version);

public abstract Builder setType(Type type);

public abstract Builder setGlobalDataEndpoints(
ImmutableMap<String, WindmillEndpoints.Endpoint> globalDataServers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,10 @@ public static GrpcGetWorkerMetadataStream create(
private Optional<WindmillEndpoints> extractWindmillEndpointsFrom(
WorkerMetadataResponse response) {
synchronized (metadataLock) {
if (response.getMetadataVersion() > latestResponse.getMetadataVersion()) {
this.latestResponse = response;
this.latestResponseReceived = Instant.now();
return Optional.of(WindmillEndpoints.from(response));
} else {
// If the currentMetadataVersion is greater than or equal to one in the response, the
// response data is stale, and we do not want to do anything.
LOG.debug(
"Received metadata version={}; Current metadata version={}. "
+ "Skipping update because received stale metadata",
response.getMetadataVersion(),
latestResponse.getMetadataVersion());
}
this.latestResponse = response;
this.latestResponseReceived = Instant.now();
return Optional.of(WindmillEndpoints.from(response));
}

return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,48 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
}

@Test
public void testOnNewWorkerMetadata_consumesEndpointsOnConnectivityTypeChange()
Comment thread
parveensania marked this conversation as resolved.
Outdated
throws InterruptedException {
String workerToken = "workerToken1";

WorkerMetadataResponse firstWorkerMetadata =
WorkerMetadataResponse.newBuilder()
.setMetadataVersion(1)
.setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.CLOUDPATH)
.addWorkEndpoints(
WorkerMetadataResponse.Endpoint.newBuilder()
.setBackendWorkerToken(workerToken)
.build())
.putAllGlobalDataEndpoints(DEFAULT)
.build();
WorkerMetadataResponse secondWorkerMetadata =
WorkerMetadataResponse.newBuilder()
.setMetadataVersion(1)
.setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.DIRECTPATH)
.addWorkEndpoints(
WorkerMetadataResponse.Endpoint.newBuilder()
.setBackendWorkerToken(workerToken)
.build())
.putAllGlobalDataEndpoints(DEFAULT)
.build();

TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor());
fanOutStreamingEngineWorkProvider =
newFanOutStreamingEngineWorkerHarness(
GetWorkBudget.builder().setItems(1).setBytes(1).build(),
getWorkBudgetDistributor,
noOpProcessWorkItemFn());

fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
Comment thread
parveensania marked this conversation as resolved.
Outdated
verify(getWorkBudgetDistributor, times(1)).distributeBudget(any(), any());
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
// Same metadata version, different endpoint type should still trigger consumption
fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata);
verify(getWorkBudgetDistributor, times(2)).distributeBudget(any(), any());
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
}

private static class WindmillServiceFakeStub
extends CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -196,45 +195,6 @@ public void testGetWorkerMetadata_consumesSubsequentResponseMetadata() {
.collect(Collectors.toList()));
}

@Test
public void testGetWorkerMetadata_doesNotConsumeResponseIfMetadataStale() {
WorkerMetadataResponse freshEndpoints =
WorkerMetadataResponse.newBuilder()
.setMetadataVersion(2)
.addAllWorkEndpoints(DIRECT_PATH_ENDPOINTS)
.putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS)
.setExternalEndpoint(AUTHENTICATING_SERVICE)
.build();

TestWindmillEndpointsConsumer testWindmillEndpointsConsumer =
Mockito.spy(new TestWindmillEndpointsConsumer());
GetWorkerMetadataTestStub testStub =
new GetWorkerMetadataTestStub(new TestGetWorkMetadataRequestObserver());
stream = getWorkerMetadataTestStream(testStub, testWindmillEndpointsConsumer);
testStub.injectWorkerMetadata(freshEndpoints);

List<WorkerMetadataResponse.Endpoint> staleDirectPathEndpoints =
Lists.newArrayList(
WorkerMetadataResponse.Endpoint.newBuilder()
.setDirectEndpoint("staleWindmillEndpoint")
.build());
Map<String, WorkerMetadataResponse.Endpoint> staleGlobalDataEndpoints = new HashMap<>();
staleGlobalDataEndpoints.put(
"stale_global_data",
WorkerMetadataResponse.Endpoint.newBuilder().setDirectEndpoint("staleGlobalData").build());

testStub.injectWorkerMetadata(
WorkerMetadataResponse.newBuilder()
.setMetadataVersion(1)
.addAllWorkEndpoints(staleDirectPathEndpoints)
.putAllGlobalDataEndpoints(staleGlobalDataEndpoints)
.build());

// Should have ignored the stale update and only used initial.
verify(testWindmillEndpointsConsumer).accept(WindmillEndpoints.from(freshEndpoints));
verifyNoMoreInteractions(testWindmillEndpointsConsumer);
}

@Test
public void testGetWorkerMetadata_correctlyAddsAndRemovesStreamFromRegistry()
throws InterruptedException {
Expand Down
Loading