
Commit 7bfa0af

Added support for on-prem S3 and fixed timestamp bug

- Removed unnecessary code that already existed in main
- Added licenses and formatting
- Fixed all contrib-check issues
1 parent ede26f4 commit 7bfa0af

File tree

7 files changed: +251, -2 lines changed

nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-aws/src/main/java/org/apache/nifi/services/iceberg/aws/S3IcebergFileIOProvider.java

Lines changed: 15 additions & 1 deletion
@@ -32,6 +32,7 @@
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.services.iceberg.IcebergFileIOProvider;
 import org.apache.nifi.services.iceberg.ProviderContext;
+import software.amazon.awssdk.services.s3.model.StorageClass;

 import java.util.HashMap;
 import java.util.List;
@@ -123,14 +124,25 @@ public class S3IcebergFileIOProvider extends AbstractControllerService implement
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();

+    static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
+            .name("Storage Class")
+            .description("""
+                    Specifies the S3 storage class to use when writing objects.
+                    Primarily intended for on-premises or S3-compatible object storage implementations.""")
+            .required(true)
+            .allowableValues(StorageClass.values())
+            .defaultValue(String.valueOf(StorageClass.STANDARD))
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
             AUTHENTICATION_STRATEGY,
             ACCESS_KEY_ID,
             SECRET_ACCESS_KEY,
             SESSION_TOKEN,
             CLIENT_REGION,
             ENDPOINT_URL,
-            PATH_STYLE_ACCESS
+            PATH_STYLE_ACCESS,
+            STORAGE_CLASS
     );

     private final Map<String, String> standardProperties = new ConcurrentHashMap<>();
@@ -189,6 +201,8 @@ private Map<String, String> getConfiguredProperties(final ConfigurationContext c
         final String pathStyleAccess = context.getProperty(PATH_STYLE_ACCESS).getValue();
         contextProperties.put(S3FileIOProperties.PATH_STYLE_ACCESS, pathStyleAccess);

+        final String storageClass = context.getProperty(STORAGE_CLASS).getValue();
+        contextProperties.put(S3FileIOProperties.WRITE_STORAGE_CLASS, storageClass);
         // HttpURLConnection Client Type avoids additional dependencies
         contextProperties.put(HttpClientProperties.CLIENT_TYPE, HttpClientProperties.CLIENT_TYPE_URLCONNECTION);

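For context, a minimal sketch (not part of this commit) of how properties like these reach Iceberg's S3FileIO when pointing at an on-prem, S3-compatible store. The endpoint value is a hypothetical placeholder; the property keys are the same S3FileIOProperties constants used above.

import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;

import java.util.HashMap;
import java.util.Map;

public class OnPremS3FileIOSketch {
    public static void main(final String[] args) {
        // Keys mirror what getConfiguredProperties() assembles in the service above
        final Map<String, String> properties = new HashMap<>();
        properties.put(S3FileIOProperties.ENDPOINT, "https://objectstore.example.internal:9000"); // hypothetical on-prem endpoint
        properties.put(S3FileIOProperties.PATH_STYLE_ACCESS, "true"); // many S3-compatible stores require path-style requests
        properties.put(S3FileIOProperties.WRITE_STORAGE_CLASS, "STANDARD"); // the property this commit wires through

        final S3FileIO fileIO = new S3FileIO();
        fileIO.initialize(properties); // S3FileIO reads the map here and applies the storage class on writes
        // ... hand the FileIO to Iceberg table operations ...
        fileIO.close();
    }
}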

nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -50,6 +50,11 @@
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-parquet</artifactId>
         </dependency>
+        <!-- Iceberg Data dependencies -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+        </dependency>
         <!-- Override Parquet version from iceberg-parquet -->
         <dependency>
             <groupId>org.apache.parquet</groupId>
nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/IcebergRecordConverter.java

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.data.Record;
+
+
+public interface IcebergRecordConverter {
+    void convertRecord(Record record);
+}

nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetIcebergRowWriter.java

Lines changed: 4 additions & 0 deletions
@@ -20,6 +20,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.nifi.services.iceberg.IcebergRowWriter;
+import org.apache.nifi.services.iceberg.parquet.io.recordconverters.IcebergParquetRecordWriterConverter;

 import java.io.IOException;
 import java.util.Objects;
@@ -29,13 +30,16 @@
  */
 public class ParquetIcebergRowWriter implements IcebergRowWriter {
     private final TaskWriter<Record> writer;
+    private final IcebergRecordConverter icebergParquetRecordWriterConverter =
+            new IcebergParquetRecordWriterConverter();

     public ParquetIcebergRowWriter(final TaskWriter<Record> writer) {
         this.writer = Objects.requireNonNull(writer, "Writer required");
     }

     @Override
     public void write(final Record row) throws IOException {
+        this.icebergParquetRecordWriterConverter.convertRecord(row);
         writer.write(row);
     }

nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java

Lines changed: 4 additions & 1 deletion
@@ -20,6 +20,7 @@
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -32,6 +33,7 @@
 public class ParquetPartitionedWriter extends PartitionedFanoutWriter<Record> {

     private final PartitionKey partitionKey;
+    private final InternalRecordWrapper recordWrapper;

     public ParquetPartitionedWriter(
             final PartitionSpec spec,
@@ -43,11 +45,12 @@ public ParquetPartitionedWriter(
     ) {
         super(spec, FileFormat.PARQUET, appenderFactory, fileFactory, io, targetFileSize);
         this.partitionKey = new PartitionKey(spec, schema);
+        this.recordWrapper = new InternalRecordWrapper(schema.asStruct());
     }

     @Override
     protected PartitionKey partition(final Record record) {
-        partitionKey.partition(record);
+        partitionKey.partition(recordWrapper.wrap(record));
         return partitionKey;
     }
 }
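
Background on the wrapper (not commit text, but the standard Iceberg pattern): partition transforms evaluate against Iceberg's internal value representation (timestamps as epoch microseconds, dates as days from epoch), while GenericRecord holds java.time objects, so feeding a GenericRecord directly to PartitionKey.partition() fails on such partition columns. InternalRecordWrapper converts at read time. A minimal sketch, assuming a day-partitioned timestamp column:

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

import java.time.LocalDateTime;

public class PartitionWrapSketch {
    public static void main(final String[] args) {
        final Schema schema = new Schema(
                Types.NestedField.required(1, "event_time", Types.TimestampType.withoutZone()));
        final PartitionSpec spec = PartitionSpec.builderFor(schema).day("event_time").build();

        final Record record = GenericRecord.create(schema);
        record.set(0, LocalDateTime.of(2024, 1, 1, 0, 0));

        final PartitionKey partitionKey = new PartitionKey(spec, schema);
        final InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());

        // The wrapper surfaces the LocalDateTime to the day() transform as epoch micros
        partitionKey.partition(wrapper.wrap(record));
        System.out.println(partitionKey.toPath()); // event_time_day=2024-01-01
    }
}
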
nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/recordconverters/IcebergParquetRecordWriterConverter.java

Lines changed: 87 additions & 0 deletions

@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io.recordconverters;
+
+import org.apache.iceberg.data.Record;
+import org.apache.nifi.services.iceberg.parquet.io.IcebergRecordConverter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class IcebergParquetRecordWriterConverter implements IcebergRecordConverter {
+
+    private final Map<Class<?>, Function<Object, Object>> typeHandlers = Map.of(
+            java.sql.Timestamp.class, (ts) -> ((java.sql.Timestamp) ts).toLocalDateTime(),
+            Record.class, (rec) -> {
+                convertRecord((Record) rec);
+                return rec;
+            },
+            List.class, (list) -> {
+                convertList((List<?>) list);
+                return list;
+            }
+    );
+
+    @Override
+    public void convertRecord(Record record) {
+        if (record == null) {
+            return;
+        }
+
+        for (int i = 0; i < record.struct().fields().size(); i++) {
+            Object value = record.get(i);
+            if (value == null) {
+                continue;
+            }
+
+            Function<Object, Object> handler = findHandler(value.getClass());
+            if (handler != null) {
+                record.set(i, handler.apply(value));
+            }
+        }
+    }
+
+    public void convertList(List<?> list) {
+        if (list == null) {
+            return;
+        }
+
+        for (Object element : list) {
+            if (element == null) {
+                continue;
+            }
+
+            Function<Object, Object> handler = findHandler(element.getClass());
+            if (handler != null) {
+                handler.apply(element);
+            }
+        }
+    }
+
+    private Function<Object, Object> findHandler(Class<?> clazz) {
+        if (typeHandlers.containsKey(clazz)) {
+            return typeHandlers.get(clazz);
+        }
+
+        return typeHandlers.entrySet().stream()
+                .filter(entry -> entry.getKey().isAssignableFrom(clazz))
+                .map(Map.Entry::getValue)
+                .findFirst()
+                .orElse(null);
+    }
+}
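
Two observations on the converter above (notes, not commit text). First, findHandler() tries an exact class match before falling back to isAssignableFrom, since runtime types are usually subtypes of the registered keys: GenericRecord for Record.class, and whatever List implementation arrives for List.class. Second, convertList() invokes the handler only for its side effects and discards the return value, so list elements are converted in place; this fits the test below, which nests Timestamps inside Records rather than storing them bare in the list. A small dispatch sketch, assuming the commit's classes are on the classpath:

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.nifi.services.iceberg.parquet.io.recordconverters.IcebergParquetRecordWriterConverter;

import java.sql.Timestamp;
import java.util.List;

public class HandlerDispatchSketch {
    public static void main(final String[] args) {
        final Schema elementSchema = new Schema(
                Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone()));
        final Schema outerSchema = new Schema(
                Types.NestedField.required(2, "events",
                        Types.ListType.ofRequired(3, elementSchema.asStruct())));

        final Record element = GenericRecord.create(elementSchema); // runtime type GenericRecord, matched via Record.class
        element.set(0, Timestamp.valueOf("2024-01-01 00:00:00")); // exact match on java.sql.Timestamp.class

        final Record outer = GenericRecord.create(outerSchema);
        outer.set(0, List.of(element)); // runtime type is an unmodifiable List, matched via List.class

        new IcebergParquetRecordWriterConverter().convertRecord(outer);
        System.out.println(element.get(0).getClass()); // class java.time.LocalDateTime
    }
}
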
nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/recordconverters/IcebergParquetRecordWriterConverterTest.java

Lines changed: 112 additions & 0 deletions

@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io.recordconverters;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(MockitoExtension.class)
+class IcebergParquetRecordWriterConverterTest {
+
+    private IcebergParquetRecordWriterConverter converter;
+
+    @BeforeEach
+    void setUp() {
+        converter = new IcebergParquetRecordWriterConverter();
+    }
+
+    @Test
+    void testConvertTimestampToLocalDateTime() {
+        final Schema schema = new Schema(
+                Types.NestedField.required(1, "ts_field", Types.TimestampType.withoutZone())
+        );
+        final Timestamp timestamp = Timestamp.valueOf("2026-06-06 06:06:06");
+        final Record record = GenericRecord.create(schema);
+        record.set(0, timestamp);
+
+        converter.convertRecord(record);
+
+        Object result = record.get(0);
+        assertInstanceOf(LocalDateTime.class, result);
+        assertEquals(timestamp.toLocalDateTime(), result);
+    }
+
+    @Test
+    void testConvertNestedRecord() {
+        final Schema innerSchema = new Schema(
+                Types.NestedField.required(1, "inner_ts", Types.TimestampType.withoutZone())
+        );
+        final Schema outerSchema = new Schema(
+                Types.NestedField.required(2, "nested_record", innerSchema.asStruct())
+        );
+
+        final Record innerRecord = GenericRecord.create(innerSchema);
+        final Timestamp timestamp = Timestamp.valueOf("2026-06-06 06:06:06");
+        innerRecord.set(0, timestamp);
+
+        final Record outerRecord = GenericRecord.create(outerSchema);
+        outerRecord.set(0, innerRecord);
+
+        converter.convertRecord(outerRecord);
+
+        Record resultInner = (Record) outerRecord.get(0);
+        assertInstanceOf(LocalDateTime.class, resultInner.get(0));
+    }
+
+    @Test
+    void testConvertListWithRecords() {
+        final Schema elementSchema = new Schema(
+                Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone())
+        );
+        final Record recordInList = GenericRecord.create(elementSchema);
+        recordInList.set(0, Timestamp.valueOf("2026-06-06 06:06:06"));
+
+        final List<Record> list = Collections.singletonList(recordInList);
+
+        converter.convertList(list);
+
+        assertInstanceOf(LocalDateTime.class, recordInList.get(0));
+    }
+
+    @Test
+    void testNullValueHandling() {
+        final Schema schema = new Schema(
+                Types.NestedField.optional(1, "nullable_field", Types.TimestampType.withoutZone())
+        );
+        final Record record = GenericRecord.create(schema);
+        record.set(0, null);
+
+        converter.convertRecord(record);
+
+        assertNull(record.get(0));
+    }
+}
