Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/content/append-table/blob.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
some BLOB fields in <code>.blob</code> files and some as descriptor references.
</td>
</tr>
<tr>
<td><h5>blob-write-null-on-missing-file</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
When enabled for Flink writes, if a descriptor BLOB value references a file that does not exist, Paimon writes <code>NULL</code> for that value and logs a warning, instead of failing later when the descriptor is read.
</td>
</tr>
<tr>
<td><h5>blob-view-field</h5></td>
<td>No</td>
Expand Down
13 changes: 13 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,15 @@ public InlineElement getDescription() {
.withDescription(
"Write blob field using blob descriptor rather than blob bytes.");

/**
 * Flink-write option: when a descriptor BLOB references a missing file, write NULL (with a
 * warning) instead of failing when the descriptor is read. Disabled by default.
 */
public static final ConfigOption<Boolean> BLOB_WRITE_NULL_ON_MISSING_FILE =
        key("blob-write-null-on-missing-file")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to write NULL for a descriptor BLOB value when the "
                                + "referenced file does not exist during Flink writes. When "
                                + "false, the write fails when the descriptor is read.");

@Immutable
public static final ConfigOption<String> BLOB_EXTERNAL_STORAGE_PATH =
key("blob-external-storage-path")
Expand Down Expand Up @@ -3789,6 +3798,10 @@ public boolean blobAsDescriptor() {
return options.get(BLOB_AS_DESCRIPTOR);
}

/** Returns whether missing descriptor BLOB files are written as NULL instead of failing. */
public boolean blobWriteNullOnMissingFile() {
    return options.get(BLOB_WRITE_NULL_ON_MISSING_FILE);
}

public boolean postponeBatchWriteFixedBucket() {
return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public FileUriReader(FileIO fileIO) {
public SeekableInputStream newInputStream(String uri) throws IOException {
return fileIO.newInputStream(new Path(uri));
}

/**
 * Returns true when the file addressed by the given uri exists in the underlying file system.
 *
 * @throws IOException if the existence check fails
 */
public boolean exists(String uri) throws IOException {
    return fileIO.exists(new Path(uri));
}
}

/** A {@link UriReader} reads http uri. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public UriReader create(String input) {
return readers.computeIfAbsent(key, k -> newReader(k, uri));
}

/**
 * Checks whether the resource behind the given uri exists.
 *
 * <p>Only file-based uris are actually probed; for non-file readers (e.g. http) no existence
 * check is possible, so they are optimistically reported as existing.
 *
 * @throws IOException if the file system existence check fails
 */
public boolean exists(String input) throws IOException {
    UriReader reader = create(input);
    if (reader instanceof UriReader.FileUriReader) {
        return ((UriReader.FileUriReader) reader).exists(input);
    }
    // Non-file readers cannot be probed; assume the resource exists.
    return true;
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
this.readers = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.paimon.utils.UriReader.HttpUriReader;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Files;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -33,6 +36,8 @@ public class UriReaderFactoryTest {
private final UriReaderFactory factory =
new UriReaderFactory(CatalogContext.create(new Options()));

@TempDir java.nio.file.Path tempPath;

@Test
public void testCreateHttpUriReader() {
UriReader reader = factory.create("http://example.com/file.txt");
Expand Down Expand Up @@ -78,6 +83,20 @@ public void testCreateUriReaderWithLocalPath() {
assertThat(reader).isInstanceOf(FileUriReader.class);
}

@Test
public void testExistsUsesCachedFileUriReader() throws Exception {
    // A present file reports true, a missing sibling in the same directory reports false.
    java.nio.file.Path present = tempPath.resolve("file.txt");
    Files.write(present, new byte[] {1});
    java.nio.file.Path absent = tempPath.resolve("missing.txt");

    assertThat(factory.exists(present.toUri().toString())).isTrue();
    assertThat(factory.exists(absent.toUri().toString())).isFalse();
}

@Test
public void testExistsSkipsHttpUriReader() throws Exception {
    // Http readers cannot be probed for existence, so exists() reports true without a request.
    String httpUri = "https://example.com/missing.txt";
    assertThat(factory.exists(httpUri)).isTrue();
}

@Test
public void testReadersReinitializedAfterDeserialization() throws Exception {
UriReaderFactory deserializedFactory = InstantiationUtil.clone(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
Expand All @@ -36,23 +37,38 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;

/** Convert from Flink row data. */
public class FlinkRowWrapper implements InternalRow {

private static final Logger LOG = LoggerFactory.getLogger(FlinkRowWrapper.class);

private final org.apache.flink.table.data.RowData row;
private final UriReaderFactory uriReaderFactory;
private final boolean checkBlobDescriptorExists;

/** Wraps a Flink row without a catalog context; blob descriptor uris cannot be resolved. */
public FlinkRowWrapper(org.apache.flink.table.data.RowData row) {
    this(row, null);
}

/** Wraps a Flink row with a catalog context; descriptor-existence checking is disabled. */
public FlinkRowWrapper(org.apache.flink.table.data.RowData row, CatalogContext catalogContext) {
    this(row, catalogContext, false);
}

/**
 * Wraps a Flink row.
 *
 * @param row the underlying Flink row
 * @param catalogContext context used to build the {@link UriReaderFactory} for blob uris
 * @param checkBlobDescriptorExists when true, BLOB fields whose descriptor references a
 *     missing file are surfaced as NULL instead of failing on read
 */
public FlinkRowWrapper(
        org.apache.flink.table.data.RowData row,
        CatalogContext catalogContext,
        boolean checkBlobDescriptorExists) {
    this.row = row;
    this.uriReaderFactory = new UriReaderFactory(catalogContext);
    this.checkBlobDescriptorExists = checkBlobDescriptorExists;
}

@Override
Expand All @@ -72,7 +88,10 @@ public void setRowKind(RowKind kind) {

@Override
public boolean isNullAt(int pos) {
return row.isNullAt(pos);
if (row.isNullAt(pos)) {
return true;
}
return checkBlobDescriptorExists && isMissingBlobDescriptor(pos, row.getBinary(pos));
}

@Override
Expand Down Expand Up @@ -139,7 +158,49 @@ public Variant getVariant(int pos) {

@Override
public Blob getBlob(int pos) {
return Blob.fromBytes(row.getBinary(pos), uriReaderFactory, null);
byte[] bytes = row.getBinary(pos);
if (isMissingBlobDescriptor(pos, bytes)) {
return null;
}
return Blob.fromBytes(bytes, uriReaderFactory, null);
}

/**
 * Returns true only when existence checking is enabled, the bytes encode a {@link
 * BlobDescriptor}, and the file referenced by that descriptor is absent.
 */
private boolean isMissingBlobDescriptor(int pos, byte[] bytes) {
    if (!checkBlobDescriptorExists) {
        return false;
    }
    if (bytes == null || !BlobDescriptor.isBlobDescriptor(bytes)) {
        // Raw blob bytes (or no bytes at all) never count as a missing descriptor.
        return false;
    }
    BlobDescriptor descriptor = BlobDescriptor.deserialize(bytes);
    return !descriptorFileExists(pos, descriptor);
}

/**
 * Checks whether the file referenced by the descriptor exists.
 *
 * <p>Returns false (after logging a warning) when the file is missing. If the existence check
 * itself fails, the failure is logged and rethrown: an {@link IOException} is wrapped into a
 * {@link RuntimeException}, while a {@link RuntimeException} propagates unchanged.
 */
private boolean descriptorFileExists(int pos, BlobDescriptor descriptor) {
    try {
        boolean exists = uriReaderFactory.exists(descriptor.uri());
        if (!exists) {
            LOG.warn(
                    "Blob descriptor file {} does not exist, returning NULL for BLOB field at position {}.",
                    descriptor.uri(),
                    pos);
        }
        return exists;
    } catch (IOException | RuntimeException e) {
        // Single handler instead of two duplicated catch blocks with identical logging.
        LOG.warn(
                "Failed to check blob descriptor file {} for BLOB field at position {}.",
                descriptor.uri(),
                pos,
                e);
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        throw new RuntimeException(e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ public DataStreamSink<?> build() {
table.coreOptions().toConfiguration());

DataStream<InternalRow> input =
mapToInternalRow(this.input, table.rowType(), contextForDescriptor);
mapToInternalRow(
this.input,
table.rowType(),
contextForDescriptor,
table.coreOptions().blobWriteNullOnMissingFile());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
SingleOutputStreamOperator<InternalRow> newInput =
input.forward()
Expand Down Expand Up @@ -246,10 +250,22 @@ public static DataStream<InternalRow> mapToInternalRow(
DataStream<RowData> input,
org.apache.paimon.types.RowType rowType,
CatalogContext catalogContext) {
return mapToInternalRow(input, rowType, catalogContext, false);
}

public static DataStream<InternalRow> mapToInternalRow(
DataStream<RowData> input,
org.apache.paimon.types.RowType rowType,
CatalogContext catalogContext,
boolean checkBlobDescriptorExists) {
SingleOutputStreamOperator<InternalRow> result =
input.map(
(MapFunction<RowData, InternalRow>)
r -> new FlinkRowWrapper(r, catalogContext))
r ->
new FlinkRowWrapper(
r,
catalogContext,
checkBlobDescriptorExists))
.returns(
org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(
rowType));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.options.Options;

import org.apache.flink.table.data.GenericRowData;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Files;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link FlinkRowWrapper}. */
public class FlinkRowWrapperTest {

    @TempDir java.nio.file.Path tempPath;

    @Test
    public void testMissingBlobDescriptorIsNullWhenCheckingEnabled() {
        java.nio.file.Path absentFile = tempPath.resolve("missing.blob");
        FlinkRowWrapper wrapper = newWrapper(rowWithDescriptor(absentFile, 1), true);

        // With checking enabled, a dangling descriptor surfaces as a NULL field.
        assertThat(wrapper.isNullAt(0)).isTrue();
        assertThat(wrapper.getBlob(0)).isNull();
    }

    @Test
    public void testExistingBlobDescriptorIsReadableWhenCheckingEnabled() throws Exception {
        byte[] payload = {1, 2, 3};
        java.nio.file.Path blobFile = tempPath.resolve("existing.blob");
        Files.write(blobFile, payload);

        FlinkRowWrapper wrapper = newWrapper(rowWithDescriptor(blobFile, payload.length), true);

        // The descriptor resolves to a real file, so the field stays readable.
        assertThat(wrapper.isNullAt(0)).isFalse();
        assertThat(wrapper.getBlob(0).toData()).isEqualTo(payload);
    }

    @Test
    public void testMissingBlobDescriptorUsesDefaultBehaviorWithoutChecking() {
        java.nio.file.Path absentFile = tempPath.resolve("missing.blob");
        FlinkRowWrapper wrapper = newWrapper(rowWithDescriptor(absentFile, 1), false);

        // Without checking, the descriptor passes through untouched; any failure only
        // happens later when the blob content is actually read.
        Blob blob = wrapper.getBlob(0);

        assertThat(wrapper.isNullAt(0)).isFalse();
        assertThat(blob).isNotNull();
    }

    /** Builds a single-field row containing a serialized descriptor for the given file. */
    private GenericRowData rowWithDescriptor(java.nio.file.Path file, long length) {
        BlobDescriptor descriptor = new BlobDescriptor(file.toUri().toString(), 0, length);
        return GenericRowData.of(descriptor.serialize());
    }

    /** Wraps the row with an empty catalog context and the given existence-check flag. */
    private FlinkRowWrapper newWrapper(GenericRowData row, boolean checkBlobDescriptorExists) {
        CatalogContext context = CatalogContext.create(new Options());
        return new FlinkRowWrapper(row, context, checkBlobDescriptorExists);
    }
}
Loading