Skip to content

Commit 11cc79e

Browse files
authored
Allow SpannerIO.readChangeStream() to query & union a list of change stream TVFs (#38167)
* WIP * trigger: regenerate codecov report
1 parent f87ee76 commit 11cc79e

26 files changed

Lines changed: 934 additions & 175 deletions

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,6 +1746,8 @@ public abstract static class ReadChangeStream
17461746

17471747
abstract String getChangeStreamName();
17481748

1749+
abstract @Nullable List<String> getTvfNameList();
1750+
17491751
abstract @Nullable String getMetadataInstance();
17501752

17511753
abstract @Nullable String getMetadataDatabase();
@@ -1783,6 +1785,8 @@ abstract static class Builder {
17831785

17841786
abstract Builder setChangeStreamName(String changeStreamName);
17851787

1788+
abstract Builder setTvfNameList(List<String> tvfNameList);
1789+
17861790
abstract Builder setMetadataInstance(String metadataInstance);
17871791

17881792
abstract Builder setMetadataDatabase(String metadataDatabase);
@@ -1861,6 +1865,11 @@ public ReadChangeStream withChangeStreamName(String changeStreamName) {
18611865
return toBuilder().setChangeStreamName(changeStreamName).build();
18621866
}
18631867

1868+
/** Specifies the list of TVF names to query and union. */
1869+
public ReadChangeStream withTvfNameList(List<String> tvfNameList) {
1870+
return toBuilder().setTvfNameList(tvfNameList).build();
1871+
}
1872+
18641873
/** Specifies the metadata database. */
18651874
public ReadChangeStream withMetadataInstance(String metadataInstance) {
18661875
return toBuilder().setMetadataInstance(metadataInstance).build();
@@ -2042,6 +2051,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20422051
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
20432052
? MAX_INCLUSIVE_END_AT
20442053
: getInclusiveEndAt();
2054+
final List<String> tvfNameList = getTvfNameList();
20452055
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
20462056
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
20472057
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
@@ -2051,10 +2061,24 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20512061
isMutableChangeStream(
20522062
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
20532063
LOG.info("The change stream {} is mutable: {}", changeStreamName, isMutableChangeStream);
2064+
List<String> quoteEscapedTvfNameList = null;
2065+
if (tvfNameList != null && !tvfNameList.isEmpty()) {
2066+
if (!isMutableChangeStream) {
2067+
throw new IllegalArgumentException(
2068+
"tvfNameList is only supported for change streams with MUTABLE_KEY_RANGE mode");
2069+
}
2070+
// TODO: if !per_placement_tvf=true, throw exception.
2071+
quoteEscapedTvfNameList = new ArrayList<>();
2072+
for (String tvfName : tvfNameList) {
2073+
quoteEscapedTvfNameList.add(escapeQuotes(tvfName));
2074+
}
2075+
checkTvfExistence(spannerAccessor.getDatabaseClient(), quoteEscapedTvfNameList);
2076+
}
20542077
final DaoFactory daoFactory =
20552078
new DaoFactory(
20562079
changeStreamSpannerConfig,
20572080
changeStreamName,
2081+
quoteEscapedTvfNameList,
20582082
partitionMetadataSpannerConfig,
20592083
partitionMetadataTableNames,
20602084
rpcPriority,
@@ -2754,6 +2778,56 @@ static String resolveSpannerProjectId(SpannerConfig config) {
27542778
: config.getProjectId().get();
27552779
}
27562780

2781+
@VisibleForTesting
2782+
static String escapeQuotes(String str) {
2783+
return str.replace("'", "").replace("\"", "").replace("`", "");
2784+
}
2785+
2786+
@VisibleForTesting
2787+
static void checkTvfExistence(
2788+
DatabaseClient databaseClient, List<String> quoteEscapedTvfNameList) {
2789+
if (quoteEscapedTvfNameList == null || quoteEscapedTvfNameList.isEmpty()) {
2790+
return;
2791+
}
2792+
Dialect dialect = databaseClient.getDialect();
2793+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
2794+
StringBuilder sql =
2795+
new StringBuilder(
2796+
"SELECT routine_name FROM information_schema.routines WHERE routine_type LIKE '%FUNCTION' AND routine_name IN (");
2797+
for (int i = 0; i < quoteEscapedTvfNameList.size(); i++) {
2798+
if (dialect == Dialect.POSTGRESQL) {
2799+
sql.append("$").append(i + 1);
2800+
} else {
2801+
sql.append("@p").append(i);
2802+
}
2803+
if (i < quoteEscapedTvfNameList.size() - 1) {
2804+
sql.append(", ");
2805+
}
2806+
}
2807+
sql.append(")");
2808+
Statement.Builder builder = Statement.newBuilder(sql.toString());
2809+
for (int i = 0; i < quoteEscapedTvfNameList.size(); i++) {
2810+
if (dialect == Dialect.POSTGRESQL) {
2811+
builder.bind("p" + (i + 1)).to(quoteEscapedTvfNameList.get(i));
2812+
} else {
2813+
builder.bind("p" + i).to(quoteEscapedTvfNameList.get(i));
2814+
}
2815+
}
2816+
Statement statement = builder.build();
2817+
ResultSet resultSet = tx.executeQuery(statement);
2818+
java.util.Set<String> foundNames = new java.util.HashSet<>();
2819+
while (resultSet.next()) {
2820+
foundNames.add(resultSet.getString(0));
2821+
}
2822+
for (String tvfName : quoteEscapedTvfNameList) {
2823+
if (!foundNames.contains(tvfName)) {
2824+
throw new IllegalArgumentException(
2825+
"TVF specified: " + tvfName + " is not found in the existing TVF's: " + foundNames);
2826+
}
2827+
}
2828+
}
2829+
}
2830+
27572831
@VisibleForTesting
27582832
static boolean isMutableChangeStream(
27592833
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.Timestamp;
2121
import com.google.cloud.spanner.Options.RpcPriority;
2222
import java.util.Collections;
23+
import java.util.List;
2324
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
2425
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
2526
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
@@ -64,6 +65,18 @@ public class ChangeStreamsConstants {
6465
/** The sliding window size in seconds for throughput reporting. */
6566
public static final int THROUGHPUT_WINDOW_SECONDS = 10;
6667

68+
/**
69+
* The delimiter used to separate the partition token and the tvf name. Note this string does not
70+
* exist in the partition token itself.
71+
*/
72+
public static final String PARTITION_TOKEN_TVF_NAME_DELIMITER = "#";
73+
74+
/** The default TVF name for a change stream query is the empty {@link String}. */
75+
public static final String DEFAULT_TVF_NAME = "";
76+
77+
/** The default TVF name list to query and union is an empty list, {@link Collections#emptyList()}. */
78+
public static final List<String> DEFAULT_TVF_NAME_LIST = Collections.emptyList();
79+
6780
/**
6881
* We use the following partition token to provide an estimate size of a partition token. A usual
6982
* partition token has around 140 characters.
@@ -85,6 +98,7 @@ public class ChangeStreamsConstants {
8598
.setState(State.CREATED)
8699
.setWatermark(Timestamp.now())
87100
.setCreatedAt(Timestamp.now())
101+
.setTvfName(DEFAULT_TVF_NAME)
88102
.build();
89103

90104
/**

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,17 @@ private void processChildPartition(
155155
record.getStartTimestamp(),
156156
partition.getEndTimestamp(),
157157
partition.getHeartbeatMillis(),
158+
partition.getTvfName(),
158159
childPartition);
159160
LOG.debug("[{}] Inserting child partition token {}", partitionToken, childPartitionToken);
160161
final Boolean insertedRow =
161162
partitionMetadataDao
162163
.runInTransaction(
163164
transaction -> {
164-
if (transaction.getPartition(childPartitionToken) == null) {
165+
if (transaction.getPartition(
166+
PartitionMetadataDao.composePartitionTokenWithTvfName(
167+
childPartitionToken, partition.getTvfName()))
168+
== null) {
165169
transaction.insert(row);
166170
return true;
167171
} else {
@@ -188,13 +192,15 @@ private PartitionMetadata toPartitionMetadata(
188192
Timestamp startTimestamp,
189193
Timestamp endTimestamp,
190194
long heartbeatMillis,
195+
String tvfName,
191196
ChildPartition childPartition) {
192197
return PartitionMetadata.newBuilder()
193198
.setPartitionToken(childPartition.getToken())
194199
.setParentTokens(childPartition.getParentTokens())
195200
.setStartTimestamp(startTimestamp)
196201
.setEndTimestamp(endTimestamp)
197202
.setHeartbeatMillis(heartbeatMillis)
203+
.setTvfName(tvfName)
198204
.setState(CREATED)
199205
.setWatermark(startTimestamp)
200206
.build();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,14 @@ private ProcessContinuation schedulePartitions(
172172
}
173173

174174
private Timestamp updateBatchToScheduled(List<PartitionMetadata> batchPartitions) {
175-
final List<String> batchPartitionTokens =
175+
final List<String> batchComposedPartitionTokens =
176176
batchPartitions.stream()
177-
.map(PartitionMetadata::getPartitionToken)
177+
.map(
178+
partition ->
179+
PartitionMetadataDao.composePartitionTokenWithTvfName(
180+
partition.getPartitionToken(), partition.getTvfName()))
178181
.collect(Collectors.toList());
179-
return dao.updateToScheduled(batchPartitionTokens);
182+
return dao.updateToScheduled(batchComposedPartitionTokens);
180183
}
181184

182185
private void outputBatch(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ private void processStartPartition(
138138
.setStartTimestamp(record.getStartTimestamp())
139139
.setEndTimestamp(partition.getEndTimestamp())
140140
.setHeartbeatMillis(partition.getHeartbeatMillis())
141+
.setTvfName(partition.getTvfName())
141142
.setState(CREATED)
142143
.setWatermark(record.getStartTimestamp())
143144
.build();
@@ -146,7 +147,10 @@ private void processStartPartition(
146147
partitionMetadataDao
147148
.runInTransaction(
148149
transaction -> {
149-
if (transaction.getPartition(startPartitionToken) == null) {
150+
if (transaction.getPartition(
151+
PartitionMetadataDao.composePartitionTokenWithTvfName(
152+
startPartitionToken, partition.getTvfName()))
153+
== null) {
150154
transaction.insert(row);
151155
return true;
152156
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,14 @@ public ProcessContinuation run(
185185
ManualWatermarkEstimator<Instant> watermarkEstimator,
186186
BundleFinalizer bundleFinalizer) {
187187
final String token = partition.getPartitionToken();
188+
final String tvfName = partition.getTvfName();
188189

189190
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
190191
// ReadChangeStreamPartitionDoFn#processElement is called
191192
final PartitionMetadata updatedPartition =
192-
Optional.ofNullable(partitionMetadataDao.getPartition(token))
193+
Optional.ofNullable(
194+
partitionMetadataDao.getPartition(
195+
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName)))
193196
.map(partitionMetadataMapper::from)
194197
.orElseThrow(
195198
() ->
@@ -223,7 +226,11 @@ public ProcessContinuation run(
223226

224227
try (ChangeStreamResultSet resultSet =
225228
changeStreamDao.changeStreamQuery(
226-
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
229+
token,
230+
tvfName,
231+
startTimestamp,
232+
changeStreamQueryEndTimestamp,
233+
partition.getHeartbeatMillis())) {
227234

228235
metrics.incQueryCounter();
229236
while (resultSet.next()) {
@@ -298,7 +305,9 @@ public ProcessContinuation run(
298305
LOG.debug("[{}] Continuation present, returning {}", token, maybeContinuation);
299306
bundleFinalizer.afterBundleCommit(
300307
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
301-
updateWatermarkCallback(token, watermarkEstimator));
308+
updateWatermarkCallback(
309+
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName),
310+
watermarkEstimator));
302311
return maybeContinuation.get();
303312
}
304313
}
@@ -341,7 +350,9 @@ public ProcessContinuation run(
341350
}
342351
bundleFinalizer.afterBundleCommit(
343352
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
344-
updateWatermarkCallback(token, watermarkEstimator));
353+
updateWatermarkCallback(
354+
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName),
355+
watermarkEstimator));
345356
LOG.debug("[{}] Rescheduling partition to resume reading", token);
346357
return ProcessContinuation.resume();
347358
}
@@ -361,25 +372,27 @@ public ProcessContinuation run(
361372
LOG.debug("[{}] Finishing partition", token);
362373
// TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
363374
// guaranteed to be called, this needs to be performed in a subsequent fused stage.
364-
partitionMetadataDao.updateToFinished(token);
375+
partitionMetadataDao.updateToFinished(
376+
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName));
365377
metrics.decActivePartitionReadCounter();
366378
LOG.info("[{}] After attempting to finish the partition", token);
367379
return ProcessContinuation.stop();
368380
}
369381

370382
private BundleFinalizer.Callback updateWatermarkCallback(
371-
String token, WatermarkEstimator<Instant> watermarkEstimator) {
383+
String composedToken, WatermarkEstimator<Instant> watermarkEstimator) {
372384
return () -> {
373385
final Instant watermark = watermarkEstimator.currentWatermark();
374-
LOG.debug("[{}] Updating current watermark to {}", token, watermark);
386+
LOG.debug("[{}] Updating current watermark to {}", composedToken, watermark);
375387
try {
376388
partitionMetadataDao.updateWatermark(
377-
token, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L));
389+
composedToken, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L));
378390
} catch (SpannerException e) {
379391
if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
380-
LOG.debug("[{}] Unable to update the current watermark, partition NOT FOUND", token);
392+
LOG.debug(
393+
"[{}] Unable to update the current watermark, partition NOT FOUND", composedToken);
381394
} else {
382-
LOG.error("[{}] Error updating the current watermark", token, e);
395+
LOG.error("[{}] Error updating the current watermark", composedToken, e);
383396
}
384397
}
385398
};

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
1919

20+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_TVF_NAME;
21+
2022
import com.google.cloud.Timestamp;
2123
import com.google.cloud.spanner.DatabaseClient;
2224
import com.google.cloud.spanner.Dialect;
@@ -74,6 +76,8 @@ public class ChangeStreamDao {
7476
* @param partitionToken the unique partition token to be queried. If {@link
7577
* InitialPartition#PARTITION_TOKEN} is given, null will be used in the change stream query
7678
* instead.
79+
* @param tvfName the name of the table-valued function to be used for the change stream query. If
80+
* null, the default global Change Stream TVF will be used.
7781
* @param startTimestamp the inclusive start time for the change stream query
7882
* @param endTimestamp the inclusive end time for the change stream query
7983
* @param heartbeatMillis the number of milliseconds after the stream is idle, which a heartbeat
@@ -83,6 +87,7 @@ public class ChangeStreamDao {
8387
*/
8488
public ChangeStreamResultSet changeStreamQuery(
8589
String partitionToken,
90+
String tvfName,
8691
Timestamp startTimestamp,
8792
Timestamp endTimestamp,
8893
long heartbeatMillis) {
@@ -95,10 +100,14 @@ public ChangeStreamResultSet changeStreamQuery(
95100
if (this.isPostgres()) {
96101
// Ensure we have determined whether change stream uses mutable key range
97102
if (this.isMutableChangeStream) {
98-
query =
99-
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
100-
+ changeStreamName
101-
+ "\"($1, $2, $3, $4, null)";
103+
if (tvfName == null || tvfName.equals(DEFAULT_TVF_NAME)) {
104+
query =
105+
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
106+
+ changeStreamName
107+
+ "\"($1, $2, $3, $4, null)";
108+
} else {
109+
query = "SELECT * FROM \"spanner\".\"" + tvfName + "\"($1, $2, $3, $4, null)";
110+
}
102111
} else {
103112
query =
104113
"SELECT * FROM \"spanner\".\"read_json_"
@@ -117,16 +126,29 @@ public ChangeStreamResultSet changeStreamQuery(
117126
.to(heartbeatMillis)
118127
.build();
119128
} else {
120-
query =
121-
"SELECT * FROM READ_"
122-
+ changeStreamName
123-
+ "("
124-
+ " start_timestamp => @startTimestamp,"
125-
+ " end_timestamp => @endTimestamp,"
126-
+ " partition_token => @partitionToken,"
127-
+ " read_options => null,"
128-
+ " heartbeat_milliseconds => @heartbeatMillis"
129-
+ ")";
129+
if (this.isMutableChangeStream && tvfName != null && !tvfName.equals(DEFAULT_TVF_NAME)) {
130+
query =
131+
"SELECT * FROM "
132+
+ tvfName
133+
+ "("
134+
+ " start_timestamp => @startTimestamp,"
135+
+ " end_timestamp => @endTimestamp,"
136+
+ " partition_token => @partitionToken,"
137+
+ " read_options => null,"
138+
+ " heartbeat_milliseconds => @heartbeatMillis"
139+
+ ")";
140+
} else {
141+
query =
142+
"SELECT * FROM READ_"
143+
+ changeStreamName
144+
+ "("
145+
+ " start_timestamp => @startTimestamp,"
146+
+ " end_timestamp => @endTimestamp,"
147+
+ " partition_token => @partitionToken,"
148+
+ " read_options => null,"
149+
+ " heartbeat_milliseconds => @heartbeatMillis"
150+
+ ")";
151+
}
130152
statement =
131153
Statement.newBuilder(query)
132154
.bind("startTimestamp")

0 commit comments

Comments
 (0)