@@ -253,6 +253,7 @@ void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws Ini
}

@Test
@Override
void testMigrateProperties() {
final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class);
final Map<String, String> expectedRenamed = Map.ofEntries(
@@ -44,4 +44,12 @@ public interface ContentClaim extends Comparable<ContentClaim> {
* @return the length of this ContentClaim
*/
long getLength();

/**
* Indicates whether or not this ContentClaim is a candidate for truncation.
* @return true if this ContentClaim is a candidate for truncation, false otherwise
*/
default boolean isTruncationCandidate() {
return false;
}
}
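
The default of false keeps existing ContentClaim implementations source-compatible: only implementations that opt in need to carry any state. Below is a minimal sketch of how an implementation might back the flag with a mutable field, mirroring the setTruncationCandidate(...) call made on StandardContentClaim later in this diff. The class is illustrative only, not NiFi's actual implementation, and it assumes only the accessors (getResourceClaim(), getOffset(), getLength()) that this diff already uses.

import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;

// Illustrative only: a ContentClaim whose truncation flag can be set after recovery.
public class ExampleContentClaim implements ContentClaim {
    private final ResourceClaim resourceClaim;
    private final long offset;
    private final long length;
    private volatile boolean truncationCandidate; // flipped once recovery deems the claim truncatable

    public ExampleContentClaim(final ResourceClaim resourceClaim, final long offset, final long length) {
        this.resourceClaim = resourceClaim;
        this.offset = offset;
        this.length = length;
    }

    public void setTruncationCandidate(final boolean truncationCandidate) {
        this.truncationCandidate = truncationCandidate;
    }

    @Override
    public boolean isTruncationCandidate() {
        return truncationCandidate;
    }

    @Override
    public ResourceClaim getResourceClaim() {
        return resourceClaim;
    }

    @Override
    public long getOffset() {
        return offset;
    }

    @Override
    public long getLength() {
        return length;
    }

    @Override
    public int compareTo(final ContentClaim other) {
        return Long.compare(offset, other.getOffset()); // simplified ordering, for illustration only
    }
}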
@@ -113,6 +113,17 @@ public interface ResourceClaimManager {
*/
void markDestructable(ResourceClaim claim);

/**
* Indicates that the Resource Claim associated with the given Content Claim can now be
* truncated to the start of the ContentClaim. This should only ever be called after it is
* guaranteed that the FlowFile Repository has been synchronized with its underlying
* storage component for the same reason as described in the {@link #markDestructable(ResourceClaim)}
* method.
*
* @param claim the ContentClaim that should be used for truncation
*/
void markTruncatable(ContentClaim claim);

/**
* Drains up to {@code maxElements} Content Claims from the internal queue
* of destructable content claims to the given {@code destination} so that
@@ -139,6 +150,16 @@ public interface ResourceClaimManager {
*/
void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit);

/**
* Drains up to {@code maxElements} Content Claims from the internal queue
* of truncatable content claims to the given {@code destination} so that
* they can be truncated.
*
* @param destination to drain to
* @param maxElements max items to drain
*/
void drainTruncatableClaims(Collection<ContentClaim> destination, int maxElements);

/**
* Clears the manager's memory of any and all ResourceClaims that it knows
* about
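
Taken together, markTruncatable(...) and drainTruncatableClaims(...) form the same produce/drain contract that the existing destructable-claim methods follow. A minimal in-memory sketch of how an implementation might satisfy the pair (illustrative only; NiFi's real ResourceClaimManager implementation is not shown in this diff):

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.nifi.controller.repository.claim.ContentClaim;

// Sketch of only the new truncation methods; the rest of the interface is omitted.
public final class TruncationQueueSketch {
    private final BlockingQueue<ContentClaim> truncatable = new LinkedBlockingQueue<>();

    // Per the contract above, callers must invoke this only after the FlowFile
    // Repository has been synchronized with its underlying storage.
    public void markTruncatable(final ContentClaim claim) {
        truncatable.offer(claim);
    }

    public void drainTruncatableClaims(final Collection<ContentClaim> destination, final int maxElements) {
        truncatable.drainTo(destination, maxElements);
    }
}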

Large diffs are not rendered by default.

@@ -21,7 +21,9 @@
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.repository.schema.FieldCache;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private final List<File> flowFileRepositoryPaths = new ArrayList<>();
private final ScheduledExecutorService checkpointExecutor;
private final int maxCharactersToCache;
private final long truncationThreshold;

private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());
@@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed
// on restart.
private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingTruncation = new ConcurrentHashMap<>();

/**
* default no args constructor for service loading only.
@@ -143,6 +147,7 @@ public WriteAheadFlowFileRepository() {
nifiProperties = null;
retainOrphanedFlowFiles = true;
maxCharactersToCache = 0;
truncationThreshold = Long.MAX_VALUE;
}

public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty);

this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
final long maxAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue();
truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);

final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
flowFileRepositoryPaths.add(new File(directoryName));
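
truncationThreshold later gates which claims may be truncated (a claim must be longer than the threshold), and the Math.min call caps it at 1,000,000 bytes no matter how large the configured max appendable claim size is. A quick worked example of the cap; the size strings here are hypothetical settings, not NiFi defaults:

import org.apache.nifi.processor.DataUnit;

public class TruncationThresholdExample {
    public static void main(String[] args) {
        // Hypothetical max appendable claim sizes, for illustration only:
        final long small = DataUnit.parseDataSize("50 KB", DataUnit.B).longValue();  // 51,200 bytes
        final long large = DataUnit.parseDataSize("10 MB", DataUnit.B).longValue();  // 10,485,760 bytes

        System.out.println(Math.min(1_000_000, small)); // 51200   -> threshold follows the configured size
        System.out.println(Math.min(1_000_000, large)); // 1000000 -> the 1,000,000-byte cap kicks in
    }
}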
@@ -446,33 +453,48 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
// The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful.
// Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim,
// it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content
// Claim to the 'claimsAwaitDestruction' map. As a result, we can call #markDestructable with the same ContentClaim
// Claim to the 'claimsAwaitingDestruction' map. As a result, we can call #markDestructable with the same ContentClaim
// multiple times, and the #markDestructable method is not necessarily idempotent.
// However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times.
// This does not, however, cause problems, as ContentRepository should handle this
// This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface.
final Set<ResourceClaim> claimsToAdd = new HashSet<>();
final Set<ResourceClaim> destructableClaims = new HashSet<>();
final Set<ContentClaim> truncatableClaims = new HashSet<>();

final Set<String> swapLocationsAdded = new HashSet<>();
final Set<String> swapLocationsRemoved = new HashSet<>();

for (final RepositoryRecord record : repositoryRecords) {
updateClaimCounts(record);

final ContentClaim contentClaim = record.getCurrentClaim();
final boolean truncationCandidate = contentClaim != null && contentClaim.isTruncationCandidate();
final boolean claimChanged = !Objects.equals(record.getOriginalClaim(), contentClaim);
if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if claim is destructible, mark it so
if (record.getCurrentClaim() != null && isDestructable(record.getCurrentClaim())) {
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
// For any DELETE record that we have, if claim is destructible or truncatable, mark it so
if (isDestructable(contentClaim)) {
destructableClaims.add(contentClaim.getResourceClaim());
} else if (truncationCandidate) {
truncatableClaims.add(contentClaim);
}

// If the original claim is different than the current claim and the original claim is destructible, mark it so
if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && isDestructable(record.getOriginalClaim())) {
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
// If the original claim is different than the current claim and the original claim is destructible
// or truncatable, mark it so
if (claimChanged) {
if (isDestructable(record.getOriginalClaim())) {
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
} else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) {
truncatableClaims.add(record.getOriginalClaim());
}
}
} else if (record.getType() == RepositoryRecordType.UPDATE) {
// if we have an update, and the original is no longer needed, mark original as destructible
if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) {
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
if (claimChanged) {
if (isDestructable(record.getOriginalClaim())) {
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
} else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) {
truncatableClaims.add(record.getOriginalClaim());
}
}
} else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
final String swapLocation = record.getSwapLocation();
@@ -485,13 +507,16 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
}
}

// Once the content claim counts have been updated for all records, collect any transient claims that are eligible for destruction
// Once the content claim counts have been updated for all records, collect any transient
// claims that are eligible for destruction or truncation
for (final RepositoryRecord record : repositoryRecords) {
final List<ContentClaim> transientClaims = record.getTransientClaims();
if (transientClaims != null) {
for (final ContentClaim transientClaim : transientClaims) {
if (isDestructable(transientClaim)) {
claimsToAdd.add(transientClaim.getResourceClaim());
destructableClaims.add(transientClaim.getResourceClaim());
} else if (transientClaim.isTruncationCandidate()) {
truncatableClaims.add(transientClaim);
}
}
}
@@ -505,19 +530,15 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord
}
}

if (!claimsToAdd.isEmpty()) {
// Get / Register a Set<ContentClaim> for the given Partition Index
final Integer partitionKey = Integer.valueOf(partitionIndex);
BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey);
if (claimQueue == null) {
claimQueue = new LinkedBlockingQueue<>();
final BlockingQueue<ResourceClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
if (existingClaimQueue != null) {
claimQueue = existingClaimQueue;
}
}
if (!destructableClaims.isEmpty()) {
// Get / Register a Set<ResourceClaim> for the given Partition Index
final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>());
claimQueue.addAll(destructableClaims);
}

claimQueue.addAll(claimsToAdd);
if (!truncatableClaims.isEmpty()) {
final BlockingQueue<ContentClaim> claimQueue = claimsAwaitingTruncation.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>());
claimQueue.addAll(truncatableClaims);
}
}
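
Design note on the change above: ConcurrentHashMap.computeIfAbsent performs the lookup-and-create step atomically and invokes the mapping function at most once per key, so the old get / putIfAbsent race handling, along with the explicit Integer.valueOf boxing, is no longer needed.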

@@ -568,16 +589,24 @@ private static String getLocationSuffix(final String swapLocation) {

@Override
public void onSync(final int partitionIndex) {
final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionIndex);
if (claimQueue == null) {
return;
final BlockingQueue<ResourceClaim> destructionClaimQueue = claimsAwaitingDestruction.get(partitionIndex);
if (destructionClaimQueue != null) {
final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
destructionClaimQueue.drainTo(claimsToDestroy);

for (final ResourceClaim claim : claimsToDestroy) {
markDestructable(claim);
}
}

final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
claimQueue.drainTo(claimsToDestroy);
final BlockingQueue<ContentClaim> truncationClaimQueue = claimsAwaitingTruncation.get(partitionIndex);
if (truncationClaimQueue != null) {
final Set<ContentClaim> claimsToTruncate = new HashSet<>();
truncationClaimQueue.drainTo(claimsToTruncate);

for (final ResourceClaim claim : claimsToDestroy) {
markDestructable(claim);
for (final ContentClaim claim : claimsToTruncate) {
claimManager.markTruncatable(claim);
}
}
}
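
onSync only publishes the claims to the ResourceClaimManager; something must eventually drain and act on them. A rough sketch of what a consuming background task in a content repository might look like, assuming each ResourceClaim is backed by a single local file located by a hypothetical resolveFile(...) helper (the actual content-repository wiring is not part of this diff):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;

public class TruncationTaskSketch implements Runnable {
    private final ResourceClaimManager claimManager;

    public TruncationTaskSketch(final ResourceClaimManager claimManager) {
        this.claimManager = claimManager;
    }

    @Override
    public void run() {
        final List<ContentClaim> claims = new ArrayList<>();
        claimManager.drainTruncatableClaims(claims, 1000);

        for (final ContentClaim claim : claims) {
            try (FileChannel channel = FileChannel.open(resolveFile(claim), StandardOpenOption.WRITE)) {
                // Cut the resource claim file back to the start of this content claim.
                channel.truncate(claim.getOffset());
            } catch (final IOException e) {
                // A real repository would log this and possibly retry.
            }
        }
    }

    // Hypothetical helper: mapping a claim to its backing file is repository-specific.
    private Path resolveFile(final ContentClaim claim) {
        throw new UnsupportedOperationException("repository-specific");
    }
}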

@@ -591,6 +620,15 @@ public void onGlobalSync() {
markDestructable(claim);
}
}

for (final BlockingQueue<ContentClaim> claimQueue : claimsAwaitingTruncation.values()) {
final Set<ContentClaim> claimsToTruncate = new HashSet<>();
claimQueue.drainTo(claimsToTruncate);

for (final ContentClaim claim : claimsToTruncate) {
claimManager.markTruncatable(claim);
}
}
}

/**
@@ -725,6 +763,10 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
queueMap.put(queue.getIdentifier(), queue);
}

final Set<StandardContentClaim> truncationEligibleClaims = new HashSet<>();
final Set<ContentClaim> forbiddenTruncationClaims = new HashSet<>();
final Map<ResourceClaim, ContentClaim> latestContentClaimByResourceClaim = new HashMap<>();

final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
int numFlowFilesMissingQueue = 0;
long maxId = 0;
@@ -750,6 +792,15 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
}

final ContentClaim claim = record.getContentClaim();

// Track the latest Content Claim for each Resource Claim so that we can determine which claims are eligible for truncation.
if (claim != null) {
final ContentClaim latestContentClaim = latestContentClaimByResourceClaim.get(claim.getResourceClaim());
if (latestContentClaim == null || claim.getOffset() > latestContentClaim.getOffset()) {
latestContentClaimByResourceClaim.put(claim.getResourceClaim(), claim);
}
}

final FlowFileQueue flowFileQueue = queueMap.get(queueId);
final boolean orphaned = flowFileQueue == null;
if (orphaned) {
@@ -779,6 +830,18 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException

continue;
} else if (claim != null) {
// If the claim exceeds the max appendable claim length on its own and doesn't start the Resource Claim,
// we will consider it to be eligible for truncation. However, if there are multiple FlowFiles sharing the
// same claim, we cannot truncate it because doing so would affect the other FlowFiles.
if (claim.getOffset() > 0 && claim.getLength() > truncationThreshold && claim instanceof final StandardContentClaim scc) {
if (forbiddenTruncationClaims.contains(claim) || truncationEligibleClaims.contains(scc)) {
truncationEligibleClaims.remove(scc);
forbiddenTruncationClaims.add(scc);
} else {
truncationEligibleClaims.add(scc);
}
}

claimManager.incrementClaimantCount(claim.getResourceClaim());
}
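
The bookkeeping above encodes a "referenced by exactly one FlowFile" rule: the first FlowFile seen with a sufficiently large, non-zero-offset claim makes that claim eligible, and any repeat sighting moves it permanently into the forbidden set (the next hunk then additionally requires the claim to be the latest one in its Resource Claim, since truncation cuts the file's tail). A condensed, standalone illustration of the sighting rule, using plain strings in place of claims:

import java.util.HashSet;
import java.util.Set;

public class SightingRuleExample {
    public static void main(String[] args) {
        final Set<String> eligible = new HashSet<>();
        final Set<String> forbidden = new HashSet<>();

        // Same rule as above: first sighting is eligible, any repeat is forbidden forever.
        for (final String claim : new String[] {"c1", "c2", "c1", "c1"}) {
            if (forbidden.contains(claim) || eligible.contains(claim)) {
                eligible.remove(claim);
                forbidden.add(claim);
            } else {
                eligible.add(claim);
            }
        }

        System.out.println(eligible);  // [c2] -> referenced by exactly one FlowFile
        System.out.println(forbidden); // [c1] -> shared, must not be truncated
    }
}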

@@ -788,6 +851,16 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException
// If recoveredRecords has been populated it needs to be nulled out now because it is no longer useful and can be garbage collected.
recoveredRecords = null;

// If any Content Claim was determined to be truncatable, mark it as such now.
for (final StandardContentClaim eligible : truncationEligibleClaims) {
final ContentClaim latestForResource = latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
if (!Objects.equals(eligible, latestForResource)) {
continue;
}

eligible.setTruncationCandidate(true);
}

// Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
// return the appropriate number.
flowFileSequenceGenerator.set(maxId + 1);
Expand Down Expand Up @@ -854,7 +927,7 @@ public long getNextFlowFileSequence() {
}

@Override
public long getMaxFlowFileIdentifier() throws IOException {
public long getMaxFlowFileIdentifier() {
// flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong
return flowFileSequenceGenerator.get() - 1;
}
@@ -21,6 +21,7 @@
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
@@ -223,6 +224,10 @@ public int incrementClaimantCount(ResourceClaim claim, boolean newClaim) {
public void markDestructable(ResourceClaim claim) {
}

@Override
public void markTruncatable(final ContentClaim claim) {
}

@Override
public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements) {
}
@@ -231,6 +236,10 @@ public void drainDestructableClaims(Collection<ResourceClaim> destination, int m
public void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit) {
}

@Override
public void drainTruncatableClaims(final Collection<ContentClaim> destination, final int maxElements) {
}

@Override
public void purge() {
}