diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java
index 58feed85180f..faa275b56e0d 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java
@@ -32,6 +32,7 @@
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.services.iceberg.IcebergFileIOProvider;
 import org.apache.nifi.services.iceberg.ProviderContext;
+import software.amazon.awssdk.services.s3.model.StorageClass;
 
 import java.util.HashMap;
 import java.util.List;
@@ -123,6 +124,16 @@ public class S3IcebergFileIOProvider extends AbstractControllerService implement
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
+            .name("Storage Class")
+            .description("""
+                    Specifies the S3 storage class to use when writing objects.
+                    Primarily intended for on-premises or S3-compatible object storage implementations.""")
+            .required(true)
+            .allowableValues(StorageClass.values())
+            .defaultValue(String.valueOf(StorageClass.STANDARD))
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
             AUTHENTICATION_STRATEGY,
             ACCESS_KEY_ID,
@@ -130,7 +141,8 @@ public class S3IcebergFileIOProvider extends AbstractControllerService implement
             SESSION_TOKEN,
             CLIENT_REGION,
             ENDPOINT_URL,
-            PATH_STYLE_ACCESS
+            PATH_STYLE_ACCESS,
+            STORAGE_CLASS
     );
 
     private final Map<String, String> standardProperties = new ConcurrentHashMap<>();
@@ -189,6 +201,8 @@ private Map<String, String> getConfiguredProperties(final ConfigurationContext c
         final String pathStyleAccess = context.getProperty(PATH_STYLE_ACCESS).getValue();
         contextProperties.put(S3FileIOProperties.PATH_STYLE_ACCESS, pathStyleAccess);
+        final String storageClass = context.getProperty(STORAGE_CLASS).getValue();
+        contextProperties.put(S3FileIOProperties.WRITE_STORAGE_CLASS, storageClass);
 
         // HttpURLConnection Client Type avoids additional dependencies
         contextProperties.put(HttpClientProperties.CLIENT_TYPE, HttpClientProperties.CLIENT_TYPE_URLCONNECTION);
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml
index 37e3f4d26c31..2393c344297a 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml
@@ -50,6 +50,11 @@
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-parquet</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.parquet</groupId>
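For context, the new property value flows unmodified into the map that initializes Iceberg's S3FileIO, which then requests that storage class on every object it writes. A minimal sketch of the hand-off, assuming direct initialization of S3FileIO; the ONEZONE_IA selection and the class name StorageClassSketch are illustrative only:

    import org.apache.iceberg.aws.s3.S3FileIO;
    import org.apache.iceberg.aws.s3.S3FileIOProperties;

    import java.util.Map;

    public class StorageClassSketch {
        public static void main(final String[] args) {
            // Hypothetical property map mirroring what getConfiguredProperties() assembles;
            // ONEZONE_IA stands in for whatever the user picks from StorageClass.values().
            final Map<String, String> properties = Map.of(
                    S3FileIOProperties.WRITE_STORAGE_CLASS, "ONEZONE_IA");

            final S3FileIO fileIO = new S3FileIO();
            fileIO.initialize(properties);
            // Data and metadata files written through this FileIO now request
            // the configured storage class on each S3 PUT.
            fileIO.close();
        }
    }
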
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/IcebergRecordConverter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/IcebergRecordConverter.java
new file mode 100644
index 000000000000..8d725792da1e
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/IcebergRecordConverter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.data.Record;
+
+
+public interface IcebergRecordConverter {
+    void convertRecord(Record record);
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java
index 9c6046ef68b4..2df2cecc3ad6 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java
@@ -20,6 +20,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.nifi.services.iceberg.IcebergRowWriter;
+import org.apache.nifi.services.iceberg.parquet.io.recordconverters.IcebergParquetRecordWriterConverter;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -29,6 +30,8 @@
  */
 public class ParquetIcebergRowWriter implements IcebergRowWriter {
     private final TaskWriter<Record> writer;
+    private final IcebergRecordConverter icebergParquetRecordWriterConverter =
+            new IcebergParquetRecordWriterConverter();
 
     public ParquetIcebergRowWriter(final TaskWriter<Record> writer) {
         this.writer = Objects.requireNonNull(writer, "Writer required");
@@ -36,6 +39,7 @@
 
     @Override
     public void write(final Record row) throws IOException {
+        this.icebergParquetRecordWriterConverter.convertRecord(row);
         writer.write(row);
     }
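To illustrate the ordering above: convertRecord(...) mutates the row in place before the TaskWriter sees it, so JDBC-style values never reach Iceberg's Parquet value writers. A minimal sketch under an assumed one-column schema; the field ID, field name, and class name WritePathSketch are illustrative:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.types.Types;
    import org.apache.nifi.services.iceberg.parquet.io.recordconverters.IcebergParquetRecordWriterConverter;

    import java.sql.Timestamp;

    public class WritePathSketch {
        public static void main(final String[] args) {
            // Illustrative schema: one TIMESTAMP column without zone.
            final Schema schema = new Schema(
                    Types.NestedField.required(1, "created_at", Types.TimestampType.withoutZone()));

            final Record row = GenericRecord.create(schema);
            row.set(0, Timestamp.valueOf("2026-06-06 06:06:06"));

            // Same call ParquetIcebergRowWriter.write() makes before delegating to
            // the TaskWriter: the java.sql.Timestamp is replaced in place.
            new IcebergParquetRecordWriterConverter().convertRecord(row);

            System.out.println(row.get(0).getClass().getName()); // java.time.LocalDateTime
        }
    }
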
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
index e5bc9a186578..1c5f25830d3d 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
@@ -20,6 +20,7 @@
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -32,6 +33,7 @@
 public class ParquetPartitionedWriter extends PartitionedFanoutWriter<Record> {
 
     private final PartitionKey partitionKey;
+    private final InternalRecordWrapper recordWrapper;
 
     public ParquetPartitionedWriter(
             final PartitionSpec spec,
@@ -43,11 +45,12 @@
     ) {
         super(spec, FileFormat.PARQUET, appenderFactory, fileFactory, io, targetFileSize);
         this.partitionKey = new PartitionKey(spec, schema);
+        this.recordWrapper = new InternalRecordWrapper(schema.asStruct());
     }
 
     @Override
     protected PartitionKey partition(final Record record) {
-        partitionKey.partition(record);
+        partitionKey.partition(recordWrapper.wrap(record));
         return partitionKey;
     }
 }
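The wrapper is needed because partition transforms evaluate Iceberg's internal representations (a timestamp is microseconds since the epoch), while GenericRecord carries java.time values; InternalRecordWrapper performs that translation lazily on field access. A self-contained sketch under an assumed day-partitioned schema, with illustrative names:

    import org.apache.iceberg.PartitionKey;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.InternalRecordWrapper;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.types.Types;

    import java.time.LocalDateTime;

    public class PartitionWrapperSketch {
        public static void main(final String[] args) {
            // Assumed schema and day-partitioned spec for illustration.
            final Schema schema = new Schema(
                    Types.NestedField.required(1, "event_time", Types.TimestampType.withoutZone()));
            final PartitionSpec spec = PartitionSpec.builderFor(schema).day("event_time").build();

            final Record record = GenericRecord.create(schema);
            record.set(0, LocalDateTime.of(2026, 6, 6, 6, 6, 6));

            // Without wrapping, the day transform would see a LocalDateTime instead
            // of the microsecond value it expects; the wrapper converts on access.
            final InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
            final PartitionKey partitionKey = new PartitionKey(spec, schema);
            partitionKey.partition(wrapper.wrap(record));

            System.out.println(partitionKey.get(0, Integer.class)); // 20610 days since epoch
        }
    }
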
+ */
+package org.apache.nifi.services.iceberg.parquet.io.recordconverters;
+
+import org.apache.iceberg.data.Record;
+import org.apache.nifi.services.iceberg.parquet.io.IcebergRecordConverter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class IcebergParquetRecordWriterConverter implements IcebergRecordConverter {
+
+    private final Map<Class<?>, Function<Object, Object>> typeHandlers = Map.of(
+            java.sql.Timestamp.class, (ts) -> ((java.sql.Timestamp) ts).toLocalDateTime(),
+            Record.class, (rec) -> {
+                convertRecord((Record) rec);
+                return rec;
+            },
+            List.class, (list) -> {
+                convertList((List<?>) list);
+                return list;
+            }
+    );
+
+    @Override
+    public void convertRecord(Record record) {
+        if (record == null) {
+            return;
+        }
+
+        for (int i = 0; i < record.struct().fields().size(); i++) {
+            Object value = record.get(i);
+            if (value == null) {
+                continue;
+            }
+
+            Function<Object, Object> handler = findHandler(value.getClass());
+            if (handler != null) {
+                record.set(i, handler.apply(value));
+            }
+        }
+    }
+
+    public void convertList(List<?> list) {
+        if (list == null) {
+            return;
+        }
+
+        for (Object element : list) {
+            if (element == null) {
+                continue;
+            }
+
+            Function<Object, Object> handler = findHandler(element.getClass());
+            if (handler != null) {
+                handler.apply(element);
+            }
+        }
+    }
+
+    private Function<Object, Object> findHandler(Class<?> clazz) {
+        if (typeHandlers.containsKey(clazz)) {
+            return typeHandlers.get(clazz);
+        }
+
+        return typeHandlers.entrySet().stream()
+                .filter(entry -> entry.getKey().isAssignableFrom(clazz))
+                .map(Map.Entry::getValue)
+                .findFirst()
+                .orElse(null);
+    }
+}
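findHandler() resolves handlers in two steps: an exact containsKey() lookup, then a linear isAssignableFrom() scan, which is what lets a concrete class such as GenericRecord still hit the Record.class entry. A small sketch of that fallback dispatch, with an assumed one-field schema and an illustrative class name:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.types.Types;

    public class HandlerLookupSketch {
        public static void main(final String[] args) {
            final Schema schema = new Schema(
                    Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone()));
            final Record record = GenericRecord.create(schema);

            // The runtime class is GenericRecord, so the exact-match lookup misses...
            System.out.println(record.getClass().getSimpleName()); // GenericRecord
            // ...and the assignability scan matches the Record.class handler,
            // which recurses into nested structs via convertRecord().
            System.out.println(Record.class.isAssignableFrom(record.getClass())); // true
        }
    }
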
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/recordconverters/IcebergParquetRecordWriterConverterTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/recordconverters/IcebergParquetRecordWriterConverterTest.java
new file mode 100644
index 000000000000..8a34baf0b7fb
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/recordconverters/IcebergParquetRecordWriterConverterTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io.recordconverters;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(MockitoExtension.class)
+class IcebergParquetRecordWriterConverterTest {
+
+    private IcebergParquetRecordWriterConverter converter;
+
+    @BeforeEach
+    void setUp() {
+        converter = new IcebergParquetRecordWriterConverter();
+    }
+
+    @Test
+    void testConvertTimestampToLocalDateTime() {
+        final Schema schema = new Schema(
+                Types.NestedField.required(1, "ts_field", Types.TimestampType.withoutZone())
+        );
+        final Timestamp timestamp = Timestamp.valueOf("2026-06-06 06:06:06");
+        final Record record = GenericRecord.create(schema);
+        record.set(0, timestamp);
+
+        converter.convertRecord(record);
+
+        Object result = record.get(0);
+        assertInstanceOf(LocalDateTime.class, result);
+        assertEquals(timestamp.toLocalDateTime(), result);
+    }
+
+    @Test
+    void testConvertNestedRecord() {
+        final Schema innerSchema = new Schema(
+                Types.NestedField.required(1, "inner_ts", Types.TimestampType.withoutZone())
+        );
+        final Schema outerSchema = new Schema(
+                Types.NestedField.required(2, "nested_record", innerSchema.asStruct())
+        );
+
+        final Record innerRecord = GenericRecord.create(innerSchema);
+        final Timestamp timestamp = Timestamp.valueOf("2026-06-06 06:06:06");
+        innerRecord.set(0, timestamp);
+
+        final Record outerRecord = GenericRecord.create(outerSchema);
+        outerRecord.set(0, innerRecord);
+
+        converter.convertRecord(outerRecord);
+
+        Record resultInner = (Record) outerRecord.get(0);
+        assertInstanceOf(LocalDateTime.class, resultInner.get(0));
+    }
+
+    @Test
+    void testConvertListWithRecords() {
+        final Schema elementSchema = new Schema(
+                Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone())
+        );
+        final Record recordInList = GenericRecord.create(elementSchema);
+        recordInList.set(0, Timestamp.valueOf("2026-06-06 06:06:06"));
+
+        final List<Record> list = Collections.singletonList(recordInList);
+
+        converter.convertList(list);
+
+        assertInstanceOf(LocalDateTime.class, recordInList.get(0));
+    }
+
+    @Test
+    void testNullValueHandling() {
+        final Schema schema = new Schema(
+                Types.NestedField.optional(1, "nullable_field", Types.TimestampType.withoutZone())
+        );
+        final Record record = GenericRecord.create(schema);
+        record.set(0, null);
+
+        converter.convertRecord(record);
+
+        assertNull(record.get(0));
+    }
+}