4141import org .apache .nifi .kafka .processors .consumer .convert .WrapperRecordStreamKafkaMessageConverter ;
4242import org .apache .nifi .kafka .service .api .KafkaConnectionService ;
4343import org .apache .nifi .kafka .service .api .common .PartitionState ;
44+ import org .apache .nifi .kafka .service .api .common .TopicPartitionSummary ;
4445import org .apache .nifi .kafka .service .api .consumer .AutoOffsetReset ;
4546import org .apache .nifi .kafka .service .api .consumer .KafkaConsumerService ;
4647import org .apache .nifi .kafka .service .api .consumer .PollingContext ;
5758import org .apache .nifi .processor .Relationship ;
5859import org .apache .nifi .processor .VerifiableProcessor ;
5960import org .apache .nifi .processor .exception .ProcessException ;
61+ import org .apache .nifi .processor .metrics .CommitTiming ;
6062import org .apache .nifi .processor .util .StandardValidators ;
6163import org .apache .nifi .serialization .RecordReaderFactory ;
6264import org .apache .nifi .serialization .RecordSetWriterFactory ;
6870import java .time .Duration ;
6971import java .util .ArrayList ;
7072import java .util .Collection ;
73+ import java .util .HashSet ;
7174import java .util .Iterator ;
7275import java .util .List ;
7376import java .util .Map ;
77+ import java .util .OptionalLong ;
7478import java .util .Queue ;
7579import java .util .Set ;
7680import java .util .concurrent .LinkedBlockingQueue ;
7781import java .util .concurrent .TimeUnit ;
7882import java .util .concurrent .atomic .AtomicInteger ;
83+ import java .util .function .Function ;
7984import java .util .regex .Pattern ;
85+ import java .util .stream .Collectors ;
8086
8187import static org .apache .nifi .expression .ExpressionLanguageScope .NONE ;
8288
@@ -432,7 +438,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
432438 break ;
433439 }
434440
435- final Iterator <ByteRecord > consumerRecords = consumerService .poll (maxWaitDuration ).iterator ();
441+ final TopicPartitionScanningIterator consumerRecords =
442+ new TopicPartitionScanningIterator (consumerService .poll (maxWaitDuration ).iterator ());
443+
436444 if (!consumerRecords .hasNext ()) {
437445 getLogger ().trace ("No Kafka Records consumed: {}" , pollingContext );
438446 // Check if a rebalance occurred during poll - if so, break to commit what we have
@@ -445,6 +453,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
445453
446454 recordsReceived = true ;
447455 processConsumerRecords (context , session , offsetTracker , consumerRecords );
456+ reportCurrentLag (consumerService , session , consumerRecords .getTopicPartitionSummaries ());
448457
449458 // Check if a rebalance occurred during poll - if so, break to commit what we have
450459 if (consumerService .hasRevokedPartitions ()) {
@@ -498,6 +507,27 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
498507 });
499508 }
500509
510+ private void reportCurrentLag (final KafkaConsumerService consumerService , final ProcessSession session , final Set <TopicPartitionSummary > topicPartitionSummaries ) {
511+ Map <TopicPartitionSummary , OptionalLong > topicPartitionLag =
512+ topicPartitionSummaries .stream ()
513+ .map (ps -> new TopicPartitionSummary (ps .getTopic (), ps .getPartition ()))
514+ .collect (Collectors .toMap (
515+ Function .identity (),
516+ tps -> consumerService .currentLag (tps )
517+ ));
518+
519+ topicPartitionLag .forEach ((tps , lag ) -> {
520+ if (lag .isPresent ()) {
521+ final String gaugeName = makeLagMetricName (tps );
522+ session .recordGauge (gaugeName , lag .getAsLong (), CommitTiming .NOW );
523+ }
524+ });
525+ }
526+
527+ String makeLagMetricName (final TopicPartitionSummary tps ) {
528+ return tps .getTopic () + "_" + tps .getPartition () + "_currentLag" ;
529+ }
530+
501531 private void commitOffsets (final KafkaConsumerService consumerService , final OffsetTracker offsetTracker , final PollingContext pollingContext , final ProcessSession session ) {
502532 try {
503533 if (commitOffsets ) {
@@ -686,4 +716,31 @@ private PollingContext createPollingContext(final ProcessContext context) {
686716
687717 return pollingContext ;
688718 }
719+
720+ static class TopicPartitionScanningIterator implements Iterator <ByteRecord > {
721+
722+ private final Iterator <ByteRecord > delegate ;
723+ private final Set <TopicPartitionSummary > topicPartitionSummaries = new HashSet <>();
724+
725+ TopicPartitionScanningIterator (Iterator <ByteRecord > delegate ) {
726+ this .delegate = delegate ;
727+ }
728+
729+ @ Override
730+ public boolean hasNext () {
731+ return delegate .hasNext ();
732+ }
733+
734+ @ Override
735+ public ByteRecord next () {
736+ ByteRecord record = delegate .next ();
737+ topicPartitionSummaries .add (new TopicPartitionSummary (record .getTopic (), record .getPartition ()));
738+ return record ;
739+
740+ }
741+
742+ public Set <TopicPartitionSummary > getTopicPartitionSummaries () {
743+ return topicPartitionSummaries ;
744+ }
745+ }
689746}
0 commit comments