Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ dependencies {
runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
}

runtimeOnly project(":sdks:java:io:mongodb")
runtimeOnly library.java.kafka_clients
runtimeOnly library.java.slf4j_jdk14

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Configuration class for the MongoDB Write transform.
 *
 * <p>Required fields: {@code uri}, {@code database}, {@code collection}. Optional fields:
 * {@code batchSize} (documents per batch write) and {@code errorHandling} (dead-letter output
 * for conversion failures).
 */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MongoDbWriteSchemaTransformConfiguration implements Serializable {

  @SchemaFieldDescription("The connection URI for the MongoDB server.")
  public abstract String getUri();

  @SchemaFieldDescription("The MongoDB database to write to.")
  public abstract String getDatabase();

  @SchemaFieldDescription("The MongoDB collection to write to.")
  public abstract String getCollection();

  @SchemaFieldDescription("The number of documents to include in each batch write.")
  @Nullable
  public abstract Long getBatchSize();

  @SchemaFieldDescription(
      "This option specifies whether and where to output unwritable rows. Note: Error handling is currently limited to data conversion failures before sending to the MongoDB driver, as the underlying MongoDbIO does not yet support dead-letter queues for write failures.")
  @Nullable
  public abstract ErrorHandling getErrorHandling();

  /**
   * Checks that every required field is present and non-empty, and that {@code batchSize}, when
   * supplied, is strictly positive.
   *
   * @throws IllegalArgumentException if any check fails
   */
  public void validate() {
    requireNonEmpty(getUri(), "MongoDB URI must be specified.");
    requireNonEmpty(getDatabase(), "MongoDB database must be specified.");
    requireNonEmpty(getCollection(), "MongoDB collection must be specified.");

    Long batchSize = getBatchSize();
    if (batchSize != null) {
      checkArgument(batchSize > 0, "Batch size must be positive.");
    }
  }

  // Shared check for the three required string fields: reject null and "".
  private static void requireNonEmpty(@Nullable String value, String message) {
    checkArgument(value != null && !value.isEmpty(), message);
  }

  /** Returns a builder for {@link MongoDbWriteSchemaTransformConfiguration}. */
  public static Builder builder() {
    return new AutoValue_MongoDbWriteSchemaTransformConfiguration.Builder();
  }

  /** Builder for {@link MongoDbWriteSchemaTransformConfiguration}. */
  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setUri(String uri);

    public abstract Builder setDatabase(String database);

    public abstract Builder setCollection(String collection);

    public abstract Builder setBatchSize(Long batchSize);

    public abstract Builder setErrorHandling(ErrorHandling errorHandling);

    public abstract MongoDbWriteSchemaTransformConfiguration build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import com.google.auto.service.AutoService;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.bson.Document;

/**
 * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB.
 *
 * <p>Converts each input {@link Row} to a BSON {@link Document} and writes it with
 * {@link MongoDbIO}. Rows that fail conversion are either rethrown (default) or routed to the
 * configured error output when {@code errorHandling} is set.
 */
@AutoService(SchemaTransformProvider.class)
public class MongoDbWriteSchemaTransformProvider
    extends TypedSchemaTransformProvider<MongoDbWriteSchemaTransformConfiguration> {

  private static final String INPUT_TAG = "input";

  /** Tag for successfully converted BSON documents. */
  public static final TupleTag<Document> OUTPUT_TAG = new TupleTag<Document>() {};

  /** Tag for rows that failed Row-to-Document conversion. */
  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};

  // Counts rows routed to the error output during conversion.
  private static final org.apache.beam.sdk.metrics.Counter errorCounter =
      org.apache.beam.sdk.metrics.Metrics.counter(
          MongoDbWriteSchemaTransformProvider.class, "MongoDB-write-error-counter");

  @Override
  protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) {
    return new MongoDbWriteSchemaTransform(configuration);
  }

  @Override
  public String identifier() {
    return "beam:schematransform:org.apache.beam:mongodb_write:v1";
  }

  @Override
  public List<String> inputCollectionNames() {
    return Collections.singletonList(INPUT_TAG);
  }

  /** The {@link SchemaTransform} that performs the write operation. */
  private static class MongoDbWriteSchemaTransform extends SchemaTransform {
    private final MongoDbWriteSchemaTransformConfiguration configuration;

    MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) {
      // Fail fast at construction so invalid configs never reach expansion.
      configuration.validate();
      this.configuration = configuration;
    }

    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      PCollection<Row> rows = input.get(INPUT_TAG);
      org.apache.beam.sdk.schemas.Schema inputSchema = rows.getSchema();

      boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
      // Schema of the error records emitted by RowToBsonDocumentFn.
      org.apache.beam.sdk.schemas.Schema errorSchema = ErrorHandling.errorSchema(inputSchema);

      PCollectionTuple outputTuple =
          rows.apply(
              "ConvertToDocument",
              ParDo.of(new RowToBsonDocumentFn(handleErrors, errorSchema))
                  .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

      PCollection<Document> documents = outputTuple.get(OUTPUT_TAG);

      MongoDbIO.Write write =
          MongoDbIO.write()
              .withUri(configuration.getUri())
              .withDatabase(configuration.getDatabase())
              .withCollection(configuration.getCollection());

      Long batchSize = configuration.getBatchSize();
      if (batchSize != null) {
        write = write.withBatchSize(batchSize);
      }

      documents.apply("WriteToMongo", write);

      // Fix: the error rows are built with `errorSchema` (see RowToBsonDocumentFn), so declare
      // exactly that schema. Previously ErrorHandling.errorSchema(...) was applied a second time,
      // declaring a wrapped schema the emitted rows did not actually conform to.
      PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);

      ErrorHandling errorHandling = configuration.getErrorHandling();
      return PCollectionRowTuple.of(
          (handleErrors && errorHandling != null) ? errorHandling.getOutput() : "errors",
          errorOutput);
    }
  }

  /**
   * Converts a Beam {@link Row} to a BSON {@link Document}.
   *
   * <p>On conversion failure: rethrows as {@link RuntimeException} when {@code handleErrors} is
   * false, otherwise increments the error counter and emits an error record to {@link #ERROR_TAG}.
   */
  static class RowToBsonDocumentFn extends DoFn<Row, Document> {
    private final boolean handleErrors;
    private final org.apache.beam.sdk.schemas.Schema errorSchema;

    RowToBsonDocumentFn(boolean handleErrors, org.apache.beam.sdk.schemas.Schema errorSchema) {
      this.handleErrors = handleErrors;
      this.errorSchema = errorSchema;
    }

    @ProcessElement
    public void processElement(@Element Row row, MultiOutputReceiver receiver) {
      try {
        Document doc = new Document();
        // Copy every field verbatim; null values are retained in the document.
        for (Field field : row.getSchema().getFields()) {
          doc.append(field.getName(), row.getValue(field.getName()));
        }
        receiver.get(OUTPUT_TAG).output(doc);
      } catch (Exception e) {
        if (!handleErrors) {
          throw new RuntimeException(e);
        }
        errorCounter.inc();
        receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, row, e));
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;

import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTagList;
import org.bson.Document;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link MongoDbWriteSchemaTransformProvider}. */
@RunWith(JUnit4.class)
public class MongoDbWriteSchemaTransformProviderTest {

@Rule public transient TestPipeline p = TestPipeline.create();

@Test
public void testInvalidConfigMissingUri() {
assertThrows(
IllegalStateException.class,
() -> {
MongoDbWriteSchemaTransformConfiguration.builder()
.setDatabase("db")
.setCollection("col")
.build()
.validate();
});
}

@Test
public void testInvalidConfigMissingDatabase() {
assertThrows(
IllegalStateException.class,
() -> {
MongoDbWriteSchemaTransformConfiguration.builder()
.setUri("mongodb://localhost:27017")
.setCollection("col")
.build()
.validate();
});
}

@Test
public void testInvalidConfigMissingCollection() {
assertThrows(
IllegalStateException.class,
() -> {
MongoDbWriteSchemaTransformConfiguration.builder()
.setUri("mongodb://localhost:27017")
.setDatabase("db")
.build()
.validate();
});
}

@Test
public void testInvalidConfigNegativeBatchSize() {
assertThrows(
IllegalArgumentException.class,
() -> {
MongoDbWriteSchemaTransformConfiguration.builder()
.setUri("mongodb://localhost:27017")
.setDatabase("db")
.setCollection("col")
.setBatchSize(-1L)
.build()
.validate();
});
}

@Test
public void testConfigurationSchema() throws Exception {
Schema schema =
SchemaRegistry.createDefault().getSchema(MongoDbWriteSchemaTransformConfiguration.class);

// We expect 5 fields now (uri, database, collection, batchSize, errorHandling)
assertEquals(5, schema.getFieldCount());
assertNotNull(schema.getField("uri"));
assertNotNull(schema.getField("database"));
assertNotNull(schema.getField("collection"));
assertNotNull(schema.getField("batchSize"));
assertNotNull(schema.getField("errorHandling"));
}

@Test
public void testRowToBsonDocumentFn() {
Schema beamSchema =
Schema.builder()
.addStringField("name")
.addInt32Field("age")
.addNullableStringField("country")
.build();

Row row =
Row.withSchema(beamSchema)
.withFieldValue("name", "John")
.withFieldValue("age", 30)
.withFieldValue("country", null)
.build();

PCollection<Row> inputRows =
p.apply(Create.of(Collections.singletonList(row))).setRowSchema(beamSchema);

Schema errorSchema = ErrorHandling.errorSchema(beamSchema);
PCollectionTuple outputTuple =
inputRows.apply(
"ConvertToDocument",
ParDo.of(
new MongoDbWriteSchemaTransformProvider.RowToBsonDocumentFn(false, errorSchema))
.withOutputTags(
MongoDbWriteSchemaTransformProvider.OUTPUT_TAG,
TupleTagList.of(MongoDbWriteSchemaTransformProvider.ERROR_TAG)));

PCollection<Document> bsonDocuments =
outputTuple.get(MongoDbWriteSchemaTransformProvider.OUTPUT_TAG);

outputTuple.get(MongoDbWriteSchemaTransformProvider.ERROR_TAG).setRowSchema(errorSchema);

PAssert.that(bsonDocuments)
.satisfies(
documents -> {
Document doc = documents.iterator().next();
assertEquals("John", doc.get("name"));
assertEquals(30, doc.get("age"));
// The RowToBsonDocumentFn retains nulls explicitly in the BSON document
assertEquals(null, doc.get("country"));
Comment thread
derrickaw marked this conversation as resolved.
return null;
});

p.run().waitUntilFinish();
}
}
Loading
Loading