-
Notifications
You must be signed in to change notification settings - Fork 2.9k
NIFI-15563 Report LAG in records from ConsumeKafka processor #10880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
NIFI-15563 Report LAG in records from ConsumeKafka processor #10880
Conversation
c8786f0 to
51e75e1
Compare
exceptionfactory
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for putting together this implementation with the current lag gauge recording @lkuchars. The general strategy looks good, and I recommended some implementation adjustments.
| @Override | ||
| public ByteRecord next() { | ||
| ByteRecord record = delegate.next(); | ||
| topicPartitionSummaries.add(new TopicPartitionSummary(record.getTopic(), record.getPartition())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This approach results in creating a new TopicPartitionSummary for every Record, which can create a lot of objects in a short period of time. Instead, keeping track of the last TopicPartitionSummary and comparing current topic and partition values should be a way to avoid unnecessary object creation.
| Map<TopicPartitionSummary, OptionalLong> topicPartitionLag = | ||
| topicPartitionSummaries.stream() | ||
| .map(ps -> new TopicPartitionSummary(ps.getTopic(), ps.getPartition())) | ||
| .collect(Collectors.toMap( | ||
| Function.identity(), | ||
| tps -> consumerService.currentLag(tps) | ||
| )); | ||
|
|
||
| topicPartitionLag.forEach((tps, lag) -> { | ||
| if (lag.isPresent()) { | ||
| final String gaugeName = makeLagMetricName(tps); | ||
| session.recordGauge(gaugeName, lag.getAsLong(), CommitTiming.NOW); | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This functional approach results in creating an unnecessary intermediate Map. I recommend rewriting this using traditional for-each loops and avoiding the intermediate Map creation.
| } | ||
|
|
||
| String makeLagMetricName(final TopicPartitionSummary tps) { | ||
| return "consume.kafka." + tps.getTopic() + "." + tps.getPartition() + ".currentLag"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of String concatenation, I recommend using a static format string and adjusting the format without the consume.kafka prefix, since the context of the gauge already includes the Processor Type.
|
Thanks for the review. Requested changes are applied. |
exceptionfactory
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @lkuchars. Please review the Checkstyle warnings, but the overall implementation looks close to completion.
I'm sorry about that @exceptionfactory . Turns out, I ran contrib-check in the AWS bundle instead. Thank you for the review. |
Summary
NIFI-15563
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation