Skip to content

Commit 7532de6

Browse files
committed
NIFI-15570: Keep track of Content Claims where the last Claim in a Resource Claim can be truncated if it is large. Whenever the FlowFile Repository is checkpointed, truncate any large Resource Claims when possible and necessary, to avoid a situation where a small FlowFile in a given Resource Claim prevents a large Content Claim from being cleaned up.
1 parent ede26f4 commit 7532de6

File tree

15 files changed

+1444
-47
lines changed

15 files changed

+1444
-47
lines changed

nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,12 @@ public interface ContentClaim extends Comparable<ContentClaim> {
4444
* @return the length of this ContentClaim
4545
*/
4646
long getLength();
47+
48+
/**
49+
* Indicates whether or not this ContentClaim is a candidate for truncation.
50+
* @return true if this ContentClaim is a candidate for truncation, false otherwise
51+
*/
52+
default boolean isTruncationCandidate() {
53+
return false;
54+
}
4755
}

nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,17 @@ public interface ResourceClaimManager {
113113
*/
114114
void markDestructable(ResourceClaim claim);
115115

116+
/**
117+
* Indicates that the Resource Claim associated with the given Content Claim can now be
118+
* truncated to the start of the ContentClaim. This should only ever be called after it is
119+
* guaranteed that the FlowFile Repository has been synchronized with its underlying
120+
* storage component for the same reason as described in the {@link #markDestructable(ResourceClaim)}
121+
* method.
122+
*
123+
* @param claim the ContentClaim that should be used for truncation
124+
*/
125+
void markTruncatable(ContentClaim claim);
126+
116127
/**
117128
* Drains up to {@code maxElements} Content Claims from the internal queue
118129
* of destructable content claims to the given {@code destination} so that
@@ -139,6 +150,16 @@ public interface ResourceClaimManager {
139150
*/
140151
void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit);
141152

153+
/**
154+
* Drains up to {@code maxElements} Content Claims from the internal queue
155+
* of truncatable content claims to the given {@code destination} so that
156+
* they can be truncated.
157+
*
158+
* @param destination to drain to
159+
* @param maxElements max items to drain
160+
*/
161+
void drainTruncatableClaims(Collection<ContentClaim> destination, int maxElements);
162+
142163
/**
143164
* Clears the manager's memory of any and all ResourceClaims that it knows
144165
* about

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java

Lines changed: 211 additions & 12 deletions
Large diffs are not rendered by default.

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java

Lines changed: 105 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import org.apache.nifi.controller.repository.claim.ContentClaim;
2222
import org.apache.nifi.controller.repository.claim.ResourceClaim;
2323
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
24+
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
2425
import org.apache.nifi.flowfile.attributes.CoreAttributes;
26+
import org.apache.nifi.processor.DataUnit;
2527
import org.apache.nifi.repository.schema.FieldCache;
2628
import org.apache.nifi.util.FormatUtils;
2729
import org.apache.nifi.util.NiFiProperties;
@@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
98100
private final List<File> flowFileRepositoryPaths = new ArrayList<>();
99101
private final ScheduledExecutorService checkpointExecutor;
100102
private final int maxCharactersToCache;
103+
private final long truncationThreshold;
101104

102105
private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
103106
private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());
@@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
132135
// before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed
133136
// on restart.
134137
private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>();
138+
private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingTruncation = new ConcurrentHashMap<>();
135139

136140
/**
137141
* default no args constructor for service loading only.
@@ -143,6 +147,7 @@ public WriteAheadFlowFileRepository() {
143147
nifiProperties = null;
144148
retainOrphanedFlowFiles = true;
145149
maxCharactersToCache = 0;
150+
truncationThreshold = Long.MAX_VALUE;
146151
}
147152

148153
public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
153158
retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty);
154159

155160
this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
161+
final long maxAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue();
162+
truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);
156163

157164
final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
158165
flowFileRepositoryPaths.add(new File(directoryName));
@@ -446,33 +453,48 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
446453
// The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful.
447454
// Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim,
448455
// it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content
449-
// Claim to the 'claimsAwaitDestruction' map. As a result, we can call #markDestructable with the same ContentClaim
456+
// Claim to the 'claimsAwaitingDestruction' map. As a result, we can call #markDestructable with the same ContentClaim
450457
// multiple times, and the #markDestructable method is not necessarily idempotent.
451458
// However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times.
452459
// This does not, however, cause problems, as ContentRepository should handle this
453460
// This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface.
454-
final Set<ResourceClaim> claimsToAdd = new HashSet<>();
461+
final Set<ResourceClaim> destructableClaims = new HashSet<>();
462+
final Set<ContentClaim> truncatableClaims = new HashSet<>();
455463

456464
final Set<String> swapLocationsAdded = new HashSet<>();
457465
final Set<String> swapLocationsRemoved = new HashSet<>();
458466

459467
for (final RepositoryRecord record : repositoryRecords) {
460468
updateClaimCounts(record);
461469

470+
final ContentClaim contentClaim = record.getCurrentClaim();
471+
final boolean truncationCandidate = contentClaim != null && contentClaim.isTruncationCandidate();
472+
final boolean claimChanged = !Objects.equals(record.getOriginalClaim(), contentClaim);
462473
if (record.getType() == RepositoryRecordType.DELETE) {
463-
// For any DELETE record that we have, if claim is destructible, mark it so
464-
if (record.getCurrentClaim() != null && isDestructable(record.getCurrentClaim())) {
465-
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
474+
// For any DELETE record that we have, if claim is destructible or truncatable, mark it so
475+
if (isDestructable(contentClaim)) {
476+
destructableClaims.add(contentClaim.getResourceClaim());
477+
} else if (truncationCandidate) {
478+
truncatableClaims.add(contentClaim);
466479
}
467480

468-
// If the original claim is different than the current claim and the original claim is destructible, mark it so
469-
if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && isDestructable(record.getOriginalClaim())) {
470-
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
481+
// If the original claim is different than the current claim and the original claim is destructible
482+
// or truncatable, mark it so
483+
if (claimChanged) {
484+
if (isDestructable(record.getOriginalClaim())) {
485+
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
486+
} else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) {
487+
truncatableClaims.add(record.getOriginalClaim());
488+
}
471489
}
472490
} else if (record.getType() == RepositoryRecordType.UPDATE) {
473491
// if we have an update, and the original is no longer needed, mark original as destructible
474-
if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) {
475-
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
492+
if (claimChanged) {
493+
if (isDestructable(record.getOriginalClaim())) {
494+
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
495+
} else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) {
496+
truncatableClaims.add(record.getOriginalClaim());
497+
}
476498
}
477499
} else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
478500
final String swapLocation = record.getSwapLocation();
@@ -485,13 +507,16 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
485507
}
486508
}
487509

488-
// Once the content claim counts have been updated for all records, collect any transient claims that are eligible for destruction
510+
// Once the content claim counts have been updated for all records, collect any transient
511+
// claims that are eligible for destruction or truncation
489512
for (final RepositoryRecord record : repositoryRecords) {
490513
final List<ContentClaim> transientClaims = record.getTransientClaims();
491514
if (transientClaims != null) {
492515
for (final ContentClaim transientClaim : transientClaims) {
493516
if (isDestructable(transientClaim)) {
494-
claimsToAdd.add(transientClaim.getResourceClaim());
517+
destructableClaims.add(transientClaim.getResourceClaim());
518+
} else if (transientClaim.isTruncationCandidate()) {
519+
truncatableClaims.add(transientClaim);
495520
}
496521
}
497522
}
@@ -505,19 +530,15 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
505530
}
506531
}
507532

508-
if (!claimsToAdd.isEmpty()) {
509-
// Get / Register a Set<ContentClaim> for the given Partition Index
510-
final Integer partitionKey = Integer.valueOf(partitionIndex);
511-
BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey);
512-
if (claimQueue == null) {
513-
claimQueue = new LinkedBlockingQueue<>();
514-
final BlockingQueue<ResourceClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
515-
if (existingClaimQueue != null) {
516-
claimQueue = existingClaimQueue;
517-
}
518-
}
533+
if (!destructableClaims.isEmpty()) {
534+
// Get / Register a Set<ResourceClaim> for the given Partition Index
535+
final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>());
536+
claimQueue.addAll(destructableClaims);
537+
}
519538

520-
claimQueue.addAll(claimsToAdd);
539+
if (!truncatableClaims.isEmpty()) {
540+
final BlockingQueue<ContentClaim> claimQueue = claimsAwaitingTruncation.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>());
541+
claimQueue.addAll(truncatableClaims);
521542
}
522543
}
523544

@@ -568,16 +589,24 @@ private static String getLocationSuffix(final String swapLocation) {
568589

569590
@Override
570591
public void onSync(final int partitionIndex) {
571-
final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionIndex);
572-
if (claimQueue == null) {
573-
return;
592+
final BlockingQueue<ResourceClaim> destructionClaimQueue = claimsAwaitingDestruction.get(partitionIndex);
593+
if (destructionClaimQueue != null) {
594+
final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
595+
destructionClaimQueue.drainTo(claimsToDestroy);
596+
597+
for (final ResourceClaim claim : claimsToDestroy) {
598+
markDestructable(claim);
599+
}
574600
}
575601

576-
final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
577-
claimQueue.drainTo(claimsToDestroy);
602+
final BlockingQueue<ContentClaim> truncationClaimQueue = claimsAwaitingTruncation.get(partitionIndex);
603+
if (truncationClaimQueue != null) {
604+
final Set<ContentClaim> claimsToTruncate = new HashSet<>();
605+
truncationClaimQueue.drainTo(claimsToTruncate);
578606

579-
for (final ResourceClaim claim : claimsToDestroy) {
580-
markDestructable(claim);
607+
for (final ContentClaim claim : claimsToTruncate) {
608+
claimManager.markTruncatable(claim);
609+
}
581610
}
582611
}
583612

@@ -591,6 +620,15 @@ public void onGlobalSync() {
591620
markDestructable(claim);
592621
}
593622
}
623+
624+
for (final BlockingQueue<ContentClaim> claimQueue : claimsAwaitingTruncation.values()) {
625+
final Set<ContentClaim> claimsToTruncate = new HashSet<>();
626+
claimQueue.drainTo(claimsToTruncate);
627+
628+
for (final ContentClaim claim : claimsToTruncate) {
629+
claimManager.markTruncatable(claim);
630+
}
631+
}
594632
}
595633

596634
/**
@@ -725,6 +763,10 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
725763
queueMap.put(queue.getIdentifier(), queue);
726764
}
727765

766+
final Set<StandardContentClaim> truncationEligibleClaims = new HashSet<>();
767+
final Set<ContentClaim> forbiddenTruncationClaims = new HashSet<>();
768+
final Map<ResourceClaim, ContentClaim> latestContentClaimByResourceClaim = new HashMap<>();
769+
728770
final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
729771
int numFlowFilesMissingQueue = 0;
730772
long maxId = 0;
@@ -750,6 +792,15 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
750792
}
751793

752794
final ContentClaim claim = record.getContentClaim();
795+
796+
// Track the latest Content Claim for each Resource Claim so that we can determine which claims are eligible for truncation.
797+
if (claim != null) {
798+
final ContentClaim latestContentClaim = latestContentClaimByResourceClaim.get(claim.getResourceClaim());
799+
if (latestContentClaim == null || claim.getOffset() > latestContentClaim.getOffset()) {
800+
latestContentClaimByResourceClaim.put(claim.getResourceClaim(), claim);
801+
}
802+
}
803+
753804
final FlowFileQueue flowFileQueue = queueMap.get(queueId);
754805
final boolean orphaned = flowFileQueue == null;
755806
if (orphaned) {
@@ -779,6 +830,18 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
779830

780831
continue;
781832
} else if (claim != null) {
833+
// If the claim exceeds the max appendable claim length on its own and doesn't start the Resource Claim,
834+
// we will consider it to be eligible for truncation. However, if there are multiple FlowFiles sharing the
835+
// same claim, we cannot truncate it because doing so would affect the other FlowFiles.
836+
if (claim.getOffset() > 0 && claim.getLength() > truncationThreshold && claim instanceof final StandardContentClaim scc) {
837+
if (forbiddenTruncationClaims.contains(claim) || truncationEligibleClaims.contains(scc)) {
838+
truncationEligibleClaims.remove(scc);
839+
forbiddenTruncationClaims.add(scc);
840+
} else {
841+
truncationEligibleClaims.add(scc);
842+
}
843+
}
844+
782845
claimManager.incrementClaimantCount(claim.getResourceClaim());
783846
}
784847

@@ -788,6 +851,16 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
788851
// If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected.
789852
recoveredRecords = null;
790853

854+
// If any Content Claim was determined to be truncatable, mark it as such now.
855+
for (final StandardContentClaim eligible : truncationEligibleClaims) {
856+
final ContentClaim latestForResource = latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
857+
if (!Objects.equals(eligible, latestForResource)) {
858+
continue;
859+
}
860+
861+
eligible.setTruncationCandidate(true);
862+
}
863+
791864
// Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
792865
// return the appropriate number.
793866
flowFileSequenceGenerator.set(maxId + 1);
@@ -854,7 +927,7 @@ public long getNextFlowFileSequence() {
854927
}
855928

856929
@Override
857-
public long getMaxFlowFileIdentifier() throws IOException {
930+
public long getMaxFlowFileIdentifier() {
858931
// flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong
859932
return flowFileSequenceGenerator.get() - 1;
860933
}

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.nifi.controller.repository.FlowFileRepository;
2222
import org.apache.nifi.controller.repository.SwapContents;
2323
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
24+
import org.apache.nifi.controller.repository.claim.ContentClaim;
2425
import org.apache.nifi.controller.repository.claim.ResourceClaim;
2526
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
2627
import org.apache.nifi.events.EventReporter;
@@ -223,6 +224,10 @@ public int incrementClaimantCount(ResourceClaim claim, boolean newClaim) {
223224
public void markDestructable(ResourceClaim claim) {
224225
}
225226

227+
@Override
228+
public void markTruncatable(final ContentClaim claim) {
229+
}
230+
226231
@Override
227232
public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements) {
228233
}
@@ -231,6 +236,10 @@ public void drainDestructableClaims(Collection<ResourceClaim> destination, int m
231236
public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit) {
232237
}
233238

239+
@Override
240+
public void drainTruncatableClaims(final Collection<ContentClaim> destination, final int maxElements) {
241+
}
242+
234243
@Override
235244
public void purge() {
236245
}

0 commit comments

Comments
 (0)