Skip to content

Commit 42cdd61

Browse files
authored
Merge pull request #27774: Override encoding positions for nested schemas
2 parents 0c15645 + 4a8fc73 commit 42cdd61

2 files changed

Lines changed: 41 additions & 10 deletions

File tree

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
package org.apache.beam.runners.dataflow.util;
1919

2020
import java.io.IOException;
21-
import java.util.UUID;
22-
import javax.annotation.Nullable;
2321
import org.apache.beam.model.pipeline.v1.SchemaApi;
2422
import org.apache.beam.runners.core.construction.SdkComponents;
2523
import org.apache.beam.sdk.coders.RowCoder;
@@ -58,10 +56,7 @@ public RowCoder fromCloudObject(CloudObject cloudObject) {
5856
SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder();
5957
JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder);
6058
Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build());
61-
@Nullable UUID uuid = schema.getUUID();
62-
if (schema.isEncodingPositionsOverridden() && uuid != null) {
63-
RowCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions());
64-
}
59+
SchemaCoderCloudObjectTranslator.overrideEncodingPositions(schema);
6560
return RowCoder.of(schema);
6661
} catch (IOException e) {
6762
throw new RuntimeException(e);

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import javax.annotation.Nullable;
2323
import org.apache.beam.model.pipeline.v1.SchemaApi;
2424
import org.apache.beam.runners.core.construction.SdkComponents;
25+
import org.apache.beam.sdk.coders.RowCoder;
2526
import org.apache.beam.sdk.schemas.Schema;
2627
import org.apache.beam.sdk.schemas.SchemaCoder;
2728
import org.apache.beam.sdk.schemas.SchemaTranslation;
2829
import org.apache.beam.sdk.transforms.SerializableFunction;
30+
import org.apache.beam.sdk.util.Preconditions;
2931
import org.apache.beam.sdk.util.SerializableUtils;
3032
import org.apache.beam.sdk.util.StringUtils;
3133
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -100,16 +102,50 @@ public SchemaCoder fromCloudObject(CloudObject cloudObject) {
100102
SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder();
101103
JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder);
102104
Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build());
103-
@Nullable UUID uuid = schema.getUUID();
104-
if (schema.isEncodingPositionsOverridden() && uuid != null) {
105-
SchemaCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions());
106-
}
105+
overrideEncodingPositions(schema);
107106
return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction);
108107
} catch (IOException e) {
109108
throw new RuntimeException(e);
110109
}
111110
}
112111

112+
static void overrideEncodingPositions(Schema schema) {
113+
@Nullable UUID uuid = schema.getUUID();
114+
if (schema.isEncodingPositionsOverridden() && uuid != null) {
115+
RowCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions());
116+
}
117+
schema.getFields().stream()
118+
.map(Schema.Field::getType)
119+
.forEach(SchemaCoderCloudObjectTranslator::overrideEncodingPositions);
120+
}
121+
122+
private static void overrideEncodingPositions(Schema.FieldType fieldType) {
123+
switch (fieldType.getTypeName()) {
124+
case ROW:
125+
overrideEncodingPositions(Preconditions.checkArgumentNotNull(fieldType.getRowSchema()));
126+
break;
127+
case ARRAY:
128+
case ITERABLE:
129+
overrideEncodingPositions(
130+
Preconditions.checkArgumentNotNull(fieldType.getCollectionElementType()));
131+
break;
132+
case MAP:
133+
overrideEncodingPositions(Preconditions.checkArgumentNotNull(fieldType.getMapKeyType()));
134+
overrideEncodingPositions(Preconditions.checkArgumentNotNull(fieldType.getMapValueType()));
135+
break;
136+
case LOGICAL_TYPE:
137+
Schema.LogicalType logicalType =
138+
Preconditions.checkArgumentNotNull(fieldType.getLogicalType());
139+
@Nullable Schema.FieldType argumentType = logicalType.getArgumentType();
140+
if (argumentType != null) {
141+
overrideEncodingPositions(argumentType);
142+
}
143+
overrideEncodingPositions(logicalType.getBaseType());
144+
break;
145+
default:
146+
}
147+
}
148+
113149
@Override
114150
public Class<? extends SchemaCoder> getSupportedClass() {
115151
return SchemaCoder.class;

0 commit comments

Comments
 (0)