2121import org .apache .nifi .controller .repository .claim .ContentClaim ;
2222import org .apache .nifi .controller .repository .claim .ResourceClaim ;
2323import org .apache .nifi .controller .repository .claim .ResourceClaimManager ;
24+ import org .apache .nifi .controller .repository .claim .StandardContentClaim ;
2425import org .apache .nifi .flowfile .attributes .CoreAttributes ;
26+ import org .apache .nifi .processor .DataUnit ;
2527import org .apache .nifi .repository .schema .FieldCache ;
2628import org .apache .nifi .util .FormatUtils ;
2729import org .apache .nifi .util .NiFiProperties ;
@@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
98100 private final List <File > flowFileRepositoryPaths = new ArrayList <>();
99101 private final ScheduledExecutorService checkpointExecutor ;
100102 private final int maxCharactersToCache ;
103+ private final long truncationThreshold ;
101104
102105 private volatile Collection <SerializedRepositoryRecord > recoveredRecords = null ;
103106 private final Set <ResourceClaim > orphanedResourceClaims = Collections .synchronizedSet (new HashSet <>());
@@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
132135 // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed
133136 // on restart.
134137 private final ConcurrentMap <Integer , BlockingQueue <ResourceClaim >> claimsAwaitingDestruction = new ConcurrentHashMap <>();
138+ private final ConcurrentMap <Integer , BlockingQueue <ContentClaim >> claimsAwaitingTruncation = new ConcurrentHashMap <>();
135139
136140 /**
137141 * default no args constructor for service loading only.
@@ -143,6 +147,7 @@ public WriteAheadFlowFileRepository() {
143147 nifiProperties = null ;
144148 retainOrphanedFlowFiles = true ;
145149 maxCharactersToCache = 0 ;
150+ truncationThreshold = Long .MAX_VALUE ;
146151 }
147152
148153 public WriteAheadFlowFileRepository (final NiFiProperties nifiProperties ) {
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
153158 retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean .parseBoolean (orphanedFlowFileProperty );
154159
155160 this .maxCharactersToCache = nifiProperties .getIntegerProperty (FLOWFILE_REPO_CACHE_SIZE , DEFAULT_CACHE_SIZE );
161+ final long maxAppendableClaimLength = DataUnit .parseDataSize (nifiProperties .getMaxAppendableClaimSize (), DataUnit .B ).longValue ();
162+ truncationThreshold = Math .min (1_000_000 , maxAppendableClaimLength );
156163
157164 final String directoryName = nifiProperties .getProperty (FLOWFILE_REPOSITORY_DIRECTORY_PREFIX );
158165 flowFileRepositoryPaths .add (new File (directoryName ));
@@ -446,33 +453,48 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
446453 // The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful.
447454 // Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim,
448455 // it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content
449- // Claim to the 'claimsAwaitDestruction ' map. As a result, we can call #markDestructable with the same ContentClaim
456+ // Claim to the 'claimsAwaitingDestruction ' map. As a result, we can call #markDestructable with the same ContentClaim
450457 // multiple times, and the #markDestructable method is not necessarily idempotent.
451458 // However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times.
452459 // This does not, however, cause problems, as ContentRepository should handle this
453460 // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface.
454- final Set <ResourceClaim > claimsToAdd = new HashSet <>();
461+ final Set <ResourceClaim > destructableClaims = new HashSet <>();
462+ final Set <ContentClaim > truncatableClaims = new HashSet <>();
455463
456464 final Set <String > swapLocationsAdded = new HashSet <>();
457465 final Set <String > swapLocationsRemoved = new HashSet <>();
458466
459467 for (final RepositoryRecord record : repositoryRecords ) {
460468 updateClaimCounts (record );
461469
470+ final ContentClaim contentClaim = record .getCurrentClaim ();
471+ final boolean truncationCandidate = contentClaim != null && contentClaim .isTruncationCandidate ();
472+ final boolean claimChanged = !Objects .equals (record .getOriginalClaim (), contentClaim );
462473 if (record .getType () == RepositoryRecordType .DELETE ) {
463- // For any DELETE record that we have, if claim is destructible, mark it so
464- if (record .getCurrentClaim () != null && isDestructable (record .getCurrentClaim ())) {
465- claimsToAdd .add (record .getCurrentClaim ().getResourceClaim ());
474+ // For any DELETE record that we have, if claim is destructible or truncatable, mark it so
475+ if (isDestructable (contentClaim )) {
476+ destructableClaims .add (contentClaim .getResourceClaim ());
477+ } else if (truncationCandidate ) {
478+ truncatableClaims .add (contentClaim );
466479 }
467480
468- // If the original claim is different than the current claim and the original claim is destructible, mark it so
469- if (record .getOriginalClaim () != null && !record .getOriginalClaim ().equals (record .getCurrentClaim ()) && isDestructable (record .getOriginalClaim ())) {
470- claimsToAdd .add (record .getOriginalClaim ().getResourceClaim ());
481+ // If the original claim is different than the current claim and the original claim is destructible
482+ // or truncatable, mark it so
483+ if (claimChanged ) {
484+ if (isDestructable (record .getOriginalClaim ())) {
485+ destructableClaims .add (record .getOriginalClaim ().getResourceClaim ());
486+ } else if (record .getOriginalClaim () != null && record .getOriginalClaim ().isTruncationCandidate ()) {
487+ truncatableClaims .add (record .getOriginalClaim ());
488+ }
471489 }
472490 } else if (record .getType () == RepositoryRecordType .UPDATE ) {
473491 // if we have an update, and the original is no longer needed, mark original as destructible
474- if (record .getOriginalClaim () != null && record .getCurrentClaim () != record .getOriginalClaim () && isDestructable (record .getOriginalClaim ())) {
475- claimsToAdd .add (record .getOriginalClaim ().getResourceClaim ());
492+ if (claimChanged ) {
493+ if (isDestructable (record .getOriginalClaim ())) {
494+ destructableClaims .add (record .getOriginalClaim ().getResourceClaim ());
495+ } else if (record .getOriginalClaim () != null && record .getOriginalClaim ().isTruncationCandidate ()) {
496+ truncatableClaims .add (record .getOriginalClaim ());
497+ }
476498 }
477499 } else if (record .getType () == RepositoryRecordType .SWAP_OUT ) {
478500 final String swapLocation = record .getSwapLocation ();
@@ -485,13 +507,16 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
485507 }
486508 }
487509
488- // Once the content claim counts have been updated for all records, collect any transient claims that are eligible for destruction
510+ // Once the content claim counts have been updated for all records, collect any transient
511+ // claims that are eligible for destruction or truncation
489512 for (final RepositoryRecord record : repositoryRecords ) {
490513 final List <ContentClaim > transientClaims = record .getTransientClaims ();
491514 if (transientClaims != null ) {
492515 for (final ContentClaim transientClaim : transientClaims ) {
493516 if (isDestructable (transientClaim )) {
494- claimsToAdd .add (transientClaim .getResourceClaim ());
517+ destructableClaims .add (transientClaim .getResourceClaim ());
518+ } else if (transientClaim .isTruncationCandidate ()) {
519+ truncatableClaims .add (transientClaim );
495520 }
496521 }
497522 }
@@ -505,19 +530,15 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
505530 }
506531 }
507532
508- if (!claimsToAdd .isEmpty ()) {
509- // Get / Register a Set<ContentClaim> for the given Partition Index
510- final Integer partitionKey = Integer .valueOf (partitionIndex );
511- BlockingQueue <ResourceClaim > claimQueue = claimsAwaitingDestruction .get (partitionKey );
512- if (claimQueue == null ) {
513- claimQueue = new LinkedBlockingQueue <>();
514- final BlockingQueue <ResourceClaim > existingClaimQueue = claimsAwaitingDestruction .putIfAbsent (partitionKey , claimQueue );
515- if (existingClaimQueue != null ) {
516- claimQueue = existingClaimQueue ;
517- }
518- }
533+ if (!destructableClaims .isEmpty ()) {
534+ // Get / Register a Set<ResourceClaim> for the given Partition Index
535+ final BlockingQueue <ResourceClaim > claimQueue = claimsAwaitingDestruction .computeIfAbsent (partitionIndex , key -> new LinkedBlockingQueue <>());
536+ claimQueue .addAll (destructableClaims );
537+ }
519538
520- claimQueue .addAll (claimsToAdd );
539+ if (!truncatableClaims .isEmpty ()) {
540+ final BlockingQueue <ContentClaim > claimQueue = claimsAwaitingTruncation .computeIfAbsent (partitionIndex , key -> new LinkedBlockingQueue <>());
541+ claimQueue .addAll (truncatableClaims );
521542 }
522543 }
523544
@@ -568,16 +589,24 @@ private static String getLocationSuffix(final String swapLocation) {
568589
569590 @ Override
570591 public void onSync (final int partitionIndex ) {
571- final BlockingQueue <ResourceClaim > claimQueue = claimsAwaitingDestruction .get (partitionIndex );
572- if (claimQueue == null ) {
573- return ;
592+ final BlockingQueue <ResourceClaim > destructionClaimQueue = claimsAwaitingDestruction .get (partitionIndex );
593+ if (destructionClaimQueue != null ) {
594+ final Set <ResourceClaim > claimsToDestroy = new HashSet <>();
595+ destructionClaimQueue .drainTo (claimsToDestroy );
596+
597+ for (final ResourceClaim claim : claimsToDestroy ) {
598+ markDestructable (claim );
599+ }
574600 }
575601
576- final Set <ResourceClaim > claimsToDestroy = new HashSet <>();
577- claimQueue .drainTo (claimsToDestroy );
602+ final BlockingQueue <ContentClaim > truncationClaimQueue = claimsAwaitingTruncation .get (partitionIndex );
603+ if (truncationClaimQueue != null ) {
604+ final Set <ContentClaim > claimsToTruncate = new HashSet <>();
605+ truncationClaimQueue .drainTo (claimsToTruncate );
578606
579- for (final ResourceClaim claim : claimsToDestroy ) {
580- markDestructable (claim );
607+ for (final ContentClaim claim : claimsToTruncate ) {
608+ claimManager .markTruncatable (claim );
609+ }
581610 }
582611 }
583612
@@ -591,6 +620,15 @@ public void onGlobalSync() {
591620 markDestructable (claim );
592621 }
593622 }
623+
624+ for (final BlockingQueue <ContentClaim > claimQueue : claimsAwaitingTruncation .values ()) {
625+ final Set <ContentClaim > claimsToTruncate = new HashSet <>();
626+ claimQueue .drainTo (claimsToTruncate );
627+
628+ for (final ContentClaim claim : claimsToTruncate ) {
629+ claimManager .markTruncatable (claim );
630+ }
631+ }
594632 }
595633
596634 /**
@@ -725,6 +763,10 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
725763 queueMap .put (queue .getIdentifier (), queue );
726764 }
727765
766+ final Set <StandardContentClaim > truncationEligibleClaims = new HashSet <>();
767+ final Set <ContentClaim > forbiddenTruncationClaims = new HashSet <>();
768+ final Map <ResourceClaim , ContentClaim > latestContentClaimByResourceClaim = new HashMap <>();
769+
728770 final List <SerializedRepositoryRecord > dropRecords = new ArrayList <>();
729771 int numFlowFilesMissingQueue = 0 ;
730772 long maxId = 0 ;
@@ -750,6 +792,15 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
750792 }
751793
752794 final ContentClaim claim = record .getContentClaim ();
795+
796+ // Track the latest Content Claim for each Resource Claim so that we can determine which claims are eligible for truncation.
797+ if (claim != null ) {
798+ final ContentClaim latestContentClaim = latestContentClaimByResourceClaim .get (claim .getResourceClaim ());
799+ if (latestContentClaim == null || claim .getOffset () > latestContentClaim .getOffset ()) {
800+ latestContentClaimByResourceClaim .put (claim .getResourceClaim (), claim );
801+ }
802+ }
803+
753804 final FlowFileQueue flowFileQueue = queueMap .get (queueId );
754805 final boolean orphaned = flowFileQueue == null ;
755806 if (orphaned ) {
@@ -779,6 +830,18 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
779830
780831 continue ;
781832 } else if (claim != null ) {
833+ // If the claim exceeds the max appendable claim length on its own and doesn't start the Resource Claim,
834+ // we will consider it to be eligible for truncation. However, if there are multiple FlowFiles sharing the
835+ // same claim, we cannot truncate it because doing so would affect the other FlowFiles.
836+ if (claim .getOffset () > 0 && claim .getLength () > truncationThreshold && claim instanceof final StandardContentClaim scc ) {
837+ if (forbiddenTruncationClaims .contains (claim ) || truncationEligibleClaims .contains (scc )) {
838+ truncationEligibleClaims .remove (scc );
839+ forbiddenTruncationClaims .add (scc );
840+ } else {
841+ truncationEligibleClaims .add (scc );
842+ }
843+ }
844+
782845 claimManager .incrementClaimantCount (claim .getResourceClaim ());
783846 }
784847
@@ -788,6 +851,16 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
788851 // If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected.
789852 recoveredRecords = null ;
790853
854+ // If any Content Claim was determined to be truncatable, mark it as such now.
855+ for (final StandardContentClaim eligible : truncationEligibleClaims ) {
856+ final ContentClaim latestForResource = latestContentClaimByResourceClaim .get (eligible .getResourceClaim ());
857+ if (!Objects .equals (eligible , latestForResource )) {
858+ continue ;
859+ }
860+
861+ eligible .setTruncationCandidate (true );
862+ }
863+
791864 // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
792865 // return the appropriate number.
793866 flowFileSequenceGenerator .set (maxId + 1 );
@@ -854,7 +927,7 @@ public long getNextFlowFileSequence() {
854927 }
855928
856929 @ Override
857- public long getMaxFlowFileIdentifier () throws IOException {
930+ public long getMaxFlowFileIdentifier () {
858931 // flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong
859932 return flowFileSequenceGenerator .get () - 1 ;
860933 }
0 commit comments