@@ -22,6 +22,7 @@
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
import org.apache.nifi.kafka.service.Kafka3ConnectionService;
import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.reporting.InitializationException;
@@ -40,6 +41,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -254,6 +256,39 @@ public void testConsumesAllRecordsWithoutDuplicates() throws ExecutionException,
runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
}

@Test
@Timeout(30)
public void testLagReporting() throws ExecutionException, InterruptedException {
final String topic = "testLagReporting";
final int partition = 0;
final int maxPollRecords = 2;

runner.setProperty(ConsumeKafka.GROUP_ID, "testLagReportingGroup");
runner.setProperty(ConsumeKafka.TOPICS, topic);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_TIME, "1000 millis");

final ControllerService connectionService = runner.getControllerService(CONNECTION_SERVICE_ID);
runner.disableControllerService(connectionService);
runner.setProperty(connectionService, Kafka3ConnectionService.MAX_POLL_RECORDS, Integer.toString(maxPollRecords));
runner.enableControllerService(connectionService);


final List<ProducerRecord<String, String>> records = IntStream.range(0, 1000)
.mapToObj(i -> new ProducerRecord<String, String>(topic, partition, "key" + i, "val" + i))
.toList();
produce(topic, records);

final TopicPartitionSummary tps = new TopicPartitionSummary(topic, partition);
final String lagMetricName = ((ConsumeKafka) runner.getProcessor()).makeLagMetricName(tps);
runner.run(1, false, true);

// consume until at least one gauge with lag gets recorded
while (runner.getGaugeValues(lagMetricName).isEmpty()) {
runner.run(1, false, false);
}
}

@Timeout(5)
@Test
void testMaxUncommittedSize() throws InterruptedException, ExecutionException {
@@ -41,6 +41,7 @@
import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
@@ -57,6 +58,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.metrics.CommitTiming;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -68,15 +70,19 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;

@@ -432,7 +438,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
break;
}

-        final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
+        final TopicPartitionScanningIterator consumerRecords =
+                new TopicPartitionScanningIterator(consumerService.poll(maxWaitDuration).iterator());

if (!consumerRecords.hasNext()) {
getLogger().trace("No Kafka Records consumed: {}", pollingContext);
// Check if a rebalance occurred during poll - if so, break to commit what we have
@@ -445,6 +453,7 @@

recordsReceived = true;
processConsumerRecords(context, session, offsetTracker, consumerRecords);
reportCurrentLag(consumerService, session, consumerRecords.getTopicPartitionSummaries());

// Check if a rebalance occurred during poll - if so, break to commit what we have
if (consumerService.hasRevokedPartitions()) {
@@ -498,6 +507,27 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
});
}

private void reportCurrentLag(final KafkaConsumerService consumerService, final ProcessSession session, final Set<TopicPartitionSummary> topicPartitionSummaries) {
Map<TopicPartitionSummary, OptionalLong> topicPartitionLag =
topicPartitionSummaries.stream()
.map(ps -> new TopicPartitionSummary(ps.getTopic(), ps.getPartition()))
.collect(Collectors.toMap(
Function.identity(),
tps -> consumerService.currentLag(tps)
));

topicPartitionLag.forEach((tps, lag) -> {
if (lag.isPresent()) {
final String gaugeName = makeLagMetricName(tps);
session.recordGauge(gaugeName, lag.getAsLong(), CommitTiming.NOW);
}
});
Contributor: This functional approach results in creating an unnecessary intermediate Map. I recommend rewriting this using traditional for-each loops and avoiding the intermediate Map creation.

}
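
A rewrite along the lines of the comment above might look like this (a sketch only, using the currentLag, recordGauge, and makeLagMetricName calls already present in this diff):

    for (final TopicPartitionSummary tps : topicPartitionSummaries) {
        // Query the lag and record the gauge in one pass; no intermediate Map is built.
        final OptionalLong lag = consumerService.currentLag(tps);
        if (lag.isPresent()) {
            session.recordGauge(makeLagMetricName(tps), lag.getAsLong(), CommitTiming.NOW);
        }
    }

This also drops the identity-style map step, since the incoming Set already holds TopicPartitionSummary instances.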

String makeLagMetricName(final TopicPartitionSummary tps) {
return "consume.kafka." + tps.getTopic() + "." + tps.getPartition() + ".currentLag";
Contributor: Instead of String concatenation, I recommend using a static format string and adjusting the format without the consume.kafka prefix, since the context of the gauge already includes the Processor Type.

}
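
Applying the suggestion might look like the following sketch (the constant name and exact pattern are illustrative assumptions, and it presumes getPartition() returns an int suitable for %d):

    private static final String LAG_METRIC_NAME_FORMAT = "%s.%d.currentLag";

    String makeLagMetricName(final TopicPartitionSummary tps) {
        // Topic and partition only; the gauge context already identifies the Processor Type.
        return LAG_METRIC_NAME_FORMAT.formatted(tps.getTopic(), tps.getPartition());
    }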

private void commitOffsets(final KafkaConsumerService consumerService, final OffsetTracker offsetTracker, final PollingContext pollingContext, final ProcessSession session) {
try {
if (commitOffsets) {
@@ -686,4 +716,31 @@ private PollingContext createPollingContext(final ProcessContext context) {

return pollingContext;
}

/**
 * Iterator wrapper that tracks each distinct topic and partition observed while records are consumed.
 */
static class TopicPartitionScanningIterator implements Iterator<ByteRecord> {

private final Iterator<ByteRecord> delegate;
private final Set<TopicPartitionSummary> topicPartitionSummaries = new HashSet<>();

TopicPartitionScanningIterator(final Iterator<ByteRecord> delegate) {
this.delegate = delegate;
}

@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public ByteRecord next() {
final ByteRecord record = delegate.next();
topicPartitionSummaries.add(new TopicPartitionSummary(record.getTopic(), record.getPartition()));
Contributor: This approach results in creating a new TopicPartitionSummary for every Record, which can create a lot of objects in a short period of time. Instead, keeping track of the last TopicPartitionSummary and comparing current topic and partition values should be a way to avoid unnecessary object creation.

return record;
}
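
One way to apply the comment above is to keep the last summary and compare before allocating, as in this sketch (it assumes getPartition() returns a primitive int on both ByteRecord and TopicPartitionSummary):

    private TopicPartitionSummary lastSummary;

    @Override
    public ByteRecord next() {
        final ByteRecord record = delegate.next();
        // Allocate a new summary only when the topic or partition differs from the previous record.
        if (lastSummary == null
                || lastSummary.getPartition() != record.getPartition()
                || !lastSummary.getTopic().equals(record.getTopic())) {
            lastSummary = new TopicPartitionSummary(record.getTopic(), record.getPartition());
            topicPartitionSummaries.add(lastSummary);
        }
        return record;
    }

Since a poll typically returns records grouped by partition, the comparison short-circuits for most records while the Set still deduplicates across groups.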

public Set<TopicPartitionSummary> getTopicPartitionSummaries() {
return topicPartitionSummaries;
}
}
}