@@ -180,6 +180,7 @@ void testProcessingStrategyRecord() throws InterruptedException, ExecutionExcept
flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, MAX_OFFSET);
flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, FIRST_OFFSET);
flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_COUNT, Integer.toString(TEST_RECORD_COUNT));
flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_TIMESTAMP);

flowFile.assertAttributeEquals("record.count", Integer.toString(TEST_RECORD_COUNT));
flowFile.assertAttributeEquals(FIRST_HEADER, HEADER_VALUE);
@@ -159,7 +159,8 @@ private void processSingleRecord(final ProcessSession session,
final long offset = consumerRecord.getOffset();
final AtomicLong maxOffset = new AtomicLong(offset);
final AtomicLong minOffset = new AtomicLong(offset);
group = new RecordGroup(ff, writer, maxOffset, minOffset);
final AtomicLong minTimestamp = new AtomicLong(consumerRecord.getTimestamp());
group = new RecordGroup(ff, writer, maxOffset, minOffset, minTimestamp);
recordGroups.put(criteria, group);
} else {
final long recordOffset = consumerRecord.getOffset();
@@ -172,6 +173,12 @@ private void processSingleRecord(final ProcessSession session,
if (recordOffset < minOffset.get()) {
minOffset.set(recordOffset);
}

final long recordTimestamp = consumerRecord.getTimestamp();
final AtomicLong minTimestamp = group.minTimestamp();
if (recordTimestamp < minTimestamp.get()) {
minTimestamp.set(recordTimestamp);
}
}

// let subclass convert into the thing to write
@@ -201,6 +208,9 @@ private void finishAllGroups(final ProcessSession session, final Map<RecordGroup
final long minOffset = group.minOffset().get();
resultAttrs.put(KafkaFlowFileAttribute.KAFKA_OFFSET, Long.toString(minOffset));

final long minTimestamp = group.minTimestamp().get();
resultAttrs.put(KafkaFlowFileAttribute.KAFKA_TIMESTAMP, Long.toString(minTimestamp));

// add any extra header-derived attributes
resultAttrs.putAll(criteria.extraAttributes());
resultAttrs.put(KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED, String.valueOf(commitOffsets));
@@ -240,6 +250,6 @@ protected Map<String, String> extractHeaders(final ByteRecord consumerRecord) {
private record RecordGroupCriteria(RecordSchema schema, Map<String, String> extraAttributes, String topic, int partition) {
}

private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, AtomicLong maxOffset, AtomicLong minOffset) {
private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, AtomicLong maxOffset, AtomicLong minOffset, AtomicLong minTimestamp) {
}
}
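For context, a minimal standalone sketch of the min-timestamp bookkeeping introduced above — assuming simplified stand-in types (a hypothetical `SimpleRecord` and a plain topic-partition string key) rather than the actual `ByteRecord` and `RecordGroupCriteria` used by the processor:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the per-group minimum-timestamp tracking: counters are seeded
// from the first record in a group and lowered/raised as later records
// arrive; the group's minimum timestamp then becomes the attribute value.
public class MinTimestampSketch {

    record SimpleRecord(String topic, int partition, long offset, long timestamp) { }

    record Group(AtomicLong maxOffset, AtomicLong minOffset, AtomicLong minTimestamp) { }

    public static void main(String[] args) {
        final Map<String, Group> groups = new HashMap<>();
        final List<SimpleRecord> records = List.of(
                new SimpleRecord("topic1", 0, 0, 1000L),
                new SimpleRecord("topic1", 0, 3, 500L));

        for (final SimpleRecord rec : records) {
            final String key = rec.topic() + "-" + rec.partition();
            final Group group = groups.computeIfAbsent(key, k -> new Group(
                    new AtomicLong(rec.offset()),
                    new AtomicLong(rec.offset()),
                    new AtomicLong(rec.timestamp())));

            if (rec.offset() > group.maxOffset().get()) {
                group.maxOffset().set(rec.offset());
            }
            if (rec.offset() < group.minOffset().get()) {
                group.minOffset().set(rec.offset());
            }
            // mirror the minOffset handling for the record timestamp
            if (rec.timestamp() < group.minTimestamp().get()) {
                group.minTimestamp().set(rec.timestamp());
            }
        }

        // the finished group reports the earliest timestamp it saw
        System.out.println("kafka.timestamp = " + groups.get("topic1-0").minTimestamp().get()); // 500
    }
}
```

The AtomicLong mirrors how maxOffset and minOffset are already tracked per RecordGroup, so the earliest record timestamp in a group ends up as the kafka.timestamp attribute when the group is finished.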
@@ -37,9 +37,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
@@ -80,12 +83,12 @@ void testGroupingOfMessagesByTopicAndPartition() {
);

// Create ByteRecords
final ByteRecord group1Record1 = new ByteRecord("topic1", 0, 0, 0L, List.of(), null, "value1".getBytes(), 0L);
final ByteRecord group1Record2 = new ByteRecord("topic1", 0, 3, 0L, List.of(), null, "value4".getBytes(), 0L);
final ByteRecord group1Record1 = new ByteRecord("topic1", 0, 0, 1000L, List.of(), null, "value1".getBytes(), 0L);
final ByteRecord group1Record2 = new ByteRecord("topic1", 0, 3, 500L, List.of(), null, "value4".getBytes(), 0L);

final ByteRecord group2 = new ByteRecord("topic1", 1, 1, 0L, List.of(), null, "value2".getBytes(), 0L);
final ByteRecord group2 = new ByteRecord("topic1", 1, 1, 2000L, List.of(), null, "value2".getBytes(), 0L);

final ByteRecord group3 = new ByteRecord("topic2", 0, 2, 0L, List.of(), null, "value3".getBytes(), 0L);
final ByteRecord group3 = new ByteRecord("topic2", 0, 2, 3000L, List.of(), null, "value3".getBytes(), 0L);

final Iterator<ByteRecord> consumerRecords = List.of(group1Record1, group2, group3, group1Record2).iterator();
// Mock the session.create() and session.write() methods
@@ -120,6 +123,13 @@ void testGroupingOfMessagesByTopicAndPartition() {
assertEquals("topic2", capturedAttributes.get(2).get(KafkaFlowFileAttribute.KAFKA_TOPIC));
assertEquals("0", capturedAttributes.get(2).get(KafkaFlowFileAttribute.KAFKA_PARTITION));


final List<String> timestamps = capturedAttributes.stream()
.map(attrs -> attrs.get(KafkaFlowFileAttribute.KAFKA_TIMESTAMP))
.filter(Objects::nonNull)
.toList();
assertFalse(timestamps.isEmpty(), "Expected at least one group to have kafka.timestamp attribute");
assertTrue(timestamps.contains("500"), "Expected timestamp from group1Record2");
assertTrue(timestamps.contains("2000"), "Expected timestamp from group2");
assertTrue(timestamps.contains("3000"), "Expected timestamp from group3");
}
}
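For reference, the expected values follow from the ByteRecords built earlier in the test: group1 (topic1, partition 0) holds records stamped 1000 and 500, so its kafka.timestamp attribute is the minimum, 500; group2 and group3 each hold a single record, giving 2000 and 3000 respectively.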