diff --git a/CHANGES.txt b/CHANGES.txt index 46b3f56c0bee71a7901add2f123209ad664bd6fd..ff00579c4f95cdfa2691ff94d7eb99ca69c932a5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.21 + * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273) * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805) * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667) * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790) diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index d2f9c7607d8af6c04c62092e7a164255ce4cf953..f6776c49f9516635b0dd25aca9366318e2ae7c82 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -195,6 +195,16 @@ public class DataRange return clusteringIndexFilter.selectsAllPartition(); } + /** + * Whether the underlying {@code ClusteringIndexFilter} is reversed or not. + * + * @return whether the underlying {@code ClusteringIndexFilter} is reversed or not. + */ + public boolean isReversed() + { + return clusteringIndexFilter.isReversed(); + } + /** * The clustering index filter to use for the provided key. *

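Before the per-file changes, a toy model of the bug this patch fixes may help. The following is illustrative Java only — none of these names exist in Cassandra — showing how replica-side filtering at CL > ONE can resurrect a stale row: the stale replica's row matches the filter, the up-to-date replica filters its newer version out, and reconciliation never sees a tombstone that would shadow the stale copy.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StaleFilterDemo
{
    // A "row" is a value plus a write timestamp, mimicking timestamp-based reconciliation.
    static final class Versioned
    {
        final String value;
        final long ts;
        Versioned(String value, long ts) { this.value = value; this.ts = ts; }
    }

    public static void main(String[] args)
    {
        // Row k=1 was 'a' at ts=1 everywhere, then updated to 'b' at ts=2; replica A missed the update.
        Map<Integer, Versioned> replicaA = Collections.singletonMap(1, new Versioned("a", 1));
        Map<Integer, Versioned> replicaB = Collections.singletonMap(1, new Versioned("b", 2));

        // Replica-side filter, e.g. WHERE v = 'a' ALLOW FILTERING: A returns its stale row,
        // B filters its up-to-date row out and returns nothing -- crucially, no tombstone either.
        List<Map<Integer, Versioned>> responses = new ArrayList<>();
        responses.add(filter(replicaA, "a"));
        responses.add(filter(replicaB, "a"));

        // Timestamp reconciliation never sees B's newer version, so the stale row wins.
        Map<Integer, Versioned> merged = new HashMap<>();
        for (Map<Integer, Versioned> response : responses)
            for (Map.Entry<Integer, Versioned> e : response.entrySet())
                merged.merge(e.getKey(), e.getValue(), (x, y) -> x.ts >= y.ts ? x : y);

        System.out.println(merged.containsKey(1) ? "stale row returned: " + merged.get(1).value : "no match");
    }

    static Map<Integer, Versioned> filter(Map<Integer, Versioned> replica, String match)
    {
        Map<Integer, Versioned> response = new HashMap<>();
        for (Map.Entry<Integer, Versioned> e : replica.entrySet())
            if (e.getValue().value.equals(match))
                response.put(e.getKey(), e.getValue());
        return response;
    }
}

The patch closes this hole with the two-phase read implemented in DataResolver and ReplicaFilteringProtection below; a matching sketch of the fix follows the Accumulator changes at the end of this diff.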
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 4f936cc8524567e1087cf707f4293a58a36a7c5c..1da66c124b804ec20b6fcbced7e2b5ebda578d56 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -206,6 +206,11 @@ public class PartitionRangeReadCommand extends ReadCommand return DatabaseDescriptor.getRangeRpcTimeout(); } + public boolean isReversed() + { + return dataRange.isReversed(); + } + public boolean selectsKey(DecoratedKey key) { if (!dataRange().contains(key)) diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index b499dafb9ba5a20854103b4d48d6d38680bebebb..39a54026d26af6096de5b00bf999da0012151d89 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -330,6 +330,13 @@ public abstract class ReadCommand implements ReadQuery protected abstract int oldestUnrepairedTombstone(); + /** + * Whether the underlying {@code ClusteringIndexFilter} is reversed or not. + * + * @return whether the underlying {@code ClusteringIndexFilter} is reversed or not. + */ + public abstract boolean isReversed(); + public ReadResponse createResponse(UnfilteredPartitionIterator iterator) { // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 2e014ba56614019bb20e9a40e739f0b5b9a87f3a..841c3b9f992a2d21230b2d7ffe28608781c1c8a6 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -415,6 +415,11 @@ public class SinglePartitionReadCommand extends ReadCommand return DatabaseDescriptor.getReadRpcTimeout(); } + public boolean isReversed() + { + return clusteringIndexFilter.isReversed(); + } + public boolean selectsKey(DecoratedKey key) { if (!this.partitionKey().equals(key)) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index b132d90af4ee68140e18b9e9457a473016511594..8c4732b2163fd96cb6a66ae015acf55a3e2cba61 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -199,11 +199,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator { } - public void onMergedRows(Row merged, Row[] versions) + public Row onMergedRows(Row merged, Row[] versions) { indexTransaction.start(); indexTransaction.onRowMerge(merged, versions); indexTransaction.commit(); + return merged; } public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions) diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 4300651905ae08c046f5a9ff9ccddfe0aeb0498e..774e4d3a639b70b4a0a0314251554d912d365d52 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -126,6 +126,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> return false; } + protected abstract Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec); + /** + *
Filters the provided iterator so that only the rows satisfying the expressions of this filter * are included in the resulting iterator. @@ -134,7 +136,23 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> * @param nowInSec the time of query in seconds. * @return the filtered iterator. */ - public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec); + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) + { + return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(iter.metadata(), nowInSec)); + } + + /** + * Filters the provided iterator so that only the rows satisfying the expressions of this filter + * are included in the resulting iterator. + * + * @param iter the iterator to filter + * @param nowInSec the time of query in seconds. + * @return the filtered iterator. + */ + public PartitionIterator filter(PartitionIterator iter, CFMetaData metadata, int nowInSec) + { + return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(metadata, nowInSec)); + } /** * Whether the provided row in the provided partition satisfies this filter. @@ -263,20 +281,16 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> super(expressions); } - public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) + protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec) { - if (expressions.isEmpty()) - return iter; - - final CFMetaData metadata = iter.metadata(); long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count(); final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0; final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0; - class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator> + return new Transformation<BaseRowIterator<?>>() { DecoratedKey pk; - public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition) { // The filter might be on static columns, so need to check static row first. if (filterStaticColumns && applyToRow(partition.staticRow()) == null) @@ -286,7 +300,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> } pk = partition.partitionKey(); - UnfilteredRowIterator iterator = Transformation.apply(partition, this); + BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator + ? Transformation.apply((UnfilteredRowIterator) partition, this) + : Transformation.apply((RowIterator) partition, this); if (filterNonStaticColumns && !iterator.hasNext()) { @@ -308,9 +324,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> return null; return row; } - } - - return Transformation.apply(iter, new IsSatisfiedFilter()); + }; } protected RowFilter withNewExpressions(List<Expression> expressions) @@ -326,36 +340,47 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> super(expressions); } - public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec) + protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec) { - if (expressions.isEmpty()) - return iter; - - class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator> + // Thrift does not filter rows; it filters out the entire partition if any of the expressions is not + // satisfied, which forces us to materialize the result (in theory we could materialize only + // what we need, which might or might not be everything, but we keep it simple since in practice + // the extra complexity has never been worth it).
+ return new Transformation<BaseRowIterator<?>>() { - @Override - public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) + protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition) + { + return partition instanceof UnfilteredRowIterator ? applyTo((UnfilteredRowIterator) partition) + : applyTo((RowIterator) partition); + } + + private UnfilteredRowIterator applyTo(UnfilteredRowIterator partition) + { + ImmutableBTreePartition result = ImmutableBTreePartition.create(partition); + partition.close(); + return accepts(result) ? result.unfilteredIterator() : null; + } + + private RowIterator applyTo(RowIterator partition) { - // Thrift does not filter rows, it filters entire partition if any of the expression is not - // satisfied, which forces us to materialize the result (in theory we could materialize only - // what we need which might or might not be everything, but we keep it simple since in practice - // it's not worth that it has ever been). - ImmutableBTreePartition result = ImmutableBTreePartition.create(iter); - iter.close(); + FilteredPartition result = FilteredPartition.create(partition); + return accepts(result) ? result.rowIterator() : null; + } + private boolean accepts(ImmutableBTreePartition result) + { // The partition needs to have a row for every expression, and the expression needs to be valid. for (Expression expr : expressions) { assert expr instanceof ThriftExpression; - Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes)); - if (row == null || !expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row)) - return null; + Row row = result.getRow(makeCompactClustering(metadata, expr.column().name.bytes)); + if (row == null || !expr.isSatisfiedBy(metadata, result.partitionKey(), row)) + return false; } // If we get there, it means all expressions were satisfied, so return the original result - return result.unfilteredIterator(); + return true; } - } - return Transformation.apply(iter, new IsSatisfiedThriftFilter()); + }; } protected RowFilter withNewExpressions(List<Expression> expressions) diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 3f7d072992a639595bd2c423f22252bda4ba4c7b..bff910e35643469f54584cfe951e762e92b77739 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.FilteredPartitions; +import org.apache.cassandra.db.transform.MorePartitions; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -47,7 +48,7 @@ public abstract class UnfilteredPartitionIterators public interface MergeListener { public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions); - public void close(); + public default void close() {} } @SuppressWarnings("resource") // The created resources are returned right away @@ -77,6 +78,24 @@ public abstract class UnfilteredPartitionIterators return Transformation.apply(toReturn, new Close()); } + public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators) + { + if (iterators.size() == 1) + return iterators.get(0); + + class Extend implements MorePartitions<UnfilteredPartitionIterator>
+ { + int i = 1; + public UnfilteredPartitionIterator moreContents() + { + if (i >= iterators.size()) + return null; + return iterators.get(i++); + } + } + return MorePartitions.extend(iterators.get(0), new Extend()); + } + public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec) { return FilteredPartitions.filter(iterator, nowInSec); @@ -84,7 +103,6 @@ public abstract class UnfilteredPartitionIterators public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener) { - assert listener != null; assert !iterators.isEmpty(); final boolean isForThrift = iterators.get(0).isForThrift(); @@ -109,7 +127,9 @@ protected UnfilteredRowIterator getReduced() { - UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge); + UnfilteredRowIterators.MergeListener rowListener = listener == null + ? null + : listener.getRowMergeListener(partitionKey, toMerge); // Replace nulls by empty iterators for (int i = 0; i < toMerge.size(); i++) @@ -153,7 +173,9 @@ public void close() { merged.close(); - listener.close(); + + if (listener != null) + listener.close(); } }; } diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index f42f675d62f80a81bb8ca1632cb11142a66d6118..b6dbf822f7f3174920ca729c26d749ee6514e77f 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -72,12 +72,17 @@ public abstract class UnfilteredRowIterators * particular, this may be called in cases where there is no row in the merged output (if a source has a row * that is shadowed by another source range tombstone or partition level deletion). * - * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a - * placeholder for when at least one source has a row, but that row is shadowed in the merged output. + * @param merged the result of the merge. This cannot be {@code null} (so that the listener can always safely + * access its clustering) but can be empty, in which case this is a placeholder for when at least one + * source has a row, but that row is shadowed in the merged output. + * @param versions for each source, the row in that source corresponding to {@code merged}. This can be * {@code null} for some sources if the source has no such row. + * @return the row to use as the result of the merge (can be {@code null}). Most implementations should simply + * return {@code merged}, but this allows some implementations to impact the merge result if necessary. If this + * returns either {@code null} or an empty row, then the row is skipped from the merge result. If this returns a + * non {@code null} result, then the returned row must have the same clustering as {@code merged}. */ - public void onMergedRows(Row merged, Row[] versions); + public Row onMergedRows(Row merged, Row[] versions); /** * Called once for every range tombstone marker participating in the merge.
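The signature change above turns merge listeners from passive observers into participants that may replace or suppress the merged row. As a hedged sketch — not part of the patch, using only the listener methods visible in this diff — an implementation honoring the new contract could look like this, dropping any merged row for which some source returned no version, which is essentially what ReplicaFilteringProtection's mergeController() does further down:

UnfilteredRowIterators.MergeListener listener = new UnfilteredRowIterators.MergeListener()
{
    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
    {
        // no-op: this sketch only cares about rows
    }

    public Row onMergedRows(Row merged, Row[] versions)
    {
        // Returning null (or an empty row) removes the row from the merge result;
        // a non-null replacement must keep the clustering of 'merged'.
        for (Row version : versions)
            if (version == null)
                return null; // some source is silent on this row: treat it as potentially outdated

        return merged; // most listeners simply pass the merged row through
    }

    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
    {
        // no-op
    }

    public void close()
    {
        // nothing to release in this sketch
    }
};

ReplicaFilteringProtection additionally has to cache every version it sees so it can later replay each replica's response, which is why its real listener is considerably larger.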
@@ -500,9 +505,12 @@ public abstract class UnfilteredRowIterators Row merged = merger.merge(partitionDeletion); if (merged == null) merged = Rows.EMPTY_STATIC_ROW; - if (listener != null) - listener.onMergedRows(merged, merger.mergedRows()); - return merged; + if (listener == null) + return merged; + + merged = listener.onMergedRows(merged, merger.mergedRows()); + // Note that onMergedRows can return null even though its input wasn't null + return merged == null ? Rows.EMPTY_STATIC_ROW : merged; } private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators) @@ -586,9 +594,15 @@ public abstract class UnfilteredRowIterators if (nextKind == Unfiltered.Kind.ROW) { Row merged = rowMerger.merge(markerMerger.activeDeletion()); - if (listener != null) - listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows()); - return merged; + if (listener == null) + return merged; + + merged = listener.onMergedRows(merged == null + ? BTreeRow.emptyRow(rowMerger.mergedClustering()) + : merged, + rowMerger.mergedRows()); + + return merged == null || merged.isEmpty() ? null : merged; } else { diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index eb56ed92ca9fb8c75824419a9648220a7b852f50..1f4803e1bccc3ceb5df230585e8716f4bafe1ec3 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -153,6 +153,7 @@ public class TableMetrics public final Meter readRepairRequests; public final Meter shortReadProtectionRequests; + public final Meter replicaSideFilteringProtectionRequests; public final Map<Sampler, TopKSampler<ByteBuffer>> samplers; /** @@ -649,8 +650,9 @@ public class TableMetrics casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose); casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit); - readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests")); - shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests")); + readRepairRequests = createTableMeter("ReadRepairRequests"); + shortReadProtectionRequests = createTableMeter("ShortReadProtectionRequests"); + replicaSideFilteringProtectionRequests = createTableMeter("ReplicaSideFilteringProtectionRequests"); } public void updateSSTableIterated(int count) @@ -758,6 +760,18 @@ public class TableMetrics return cfCounter; } + private Meter createTableMeter(final String name) + { + return createTableMeter(name, name); + } + + private Meter createTableMeter(final String name, final String alias) + { + Meter tableMeter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias)); + register(name, alias, tableMeter); + return tableMeter; + } + /** * Create a histogram-like interface that will register both a CF, keyspace and global level * histogram and forward any updates to both diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 522c57bbce79364074a1a9d98d18d95aeadd9acb..02d355e0aeb21386bd54074f4b754c14845e32b6 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -20,6 +20,7 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.*; import java.util.concurrent.TimeoutException; +import java.util.function.UnaryOperator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner;
@@ -80,17 +81,126 @@ public class DataResolver extends ResponseResolver public PartitionIterator resolve() { + if (!needsReplicaFilteringProtection()) + { + ResolveContext context = new ResolveContext(responses.size()); + return resolveWithReadRepair(context, + i -> shortReadProtectedResponse(i, context), + UnaryOperator.identity()); + } + + return resolveWithReplicaFilteringProtection(); + } + + private boolean needsReplicaFilteringProtection() + { + return !command.rowFilter().isEmpty(); + } + + private class ResolveContext + { + private final InetAddress[] sources; + private final DataLimits.Counter mergedResultCounter; + + private ResolveContext(int count) + { + assert count <= responses.size(); + this.sources = new InetAddress[count]; + for (int i = 0; i < count; i++) + sources[i] = responses.get(i).from; + this.mergedResultCounter = command.limits().newCounter(command.nowInSec(), + true, + command.selectsFullPartition(), + enforceStrictLiveness); + } + + private boolean needShortReadProtection() + { + // If we have only one result, there is no read repair to do and we can't get short reads + // Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a + // partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have a limit, + // don't bother protecting against short reads. + return sources.length > 1 && !command.limits().isUnlimited(); + } + } + + @FunctionalInterface + private interface ResponseProvider + { + UnfilteredPartitionIterator getResponse(int i); + } + + private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context) + { + UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command); + + return context.needShortReadProtection() + ? extendWithShortReadProtection(originalResponse, context.sources[i], context.mergedResultCounter) + : originalResponse; + } + + private PartitionIterator resolveWithReadRepair(ResolveContext context, + ResponseProvider responseProvider, + UnaryOperator<PartitionIterator> preCountFilter) + { + return resolveInternal(context, new RepairMergeListener(context.sources), responseProvider, preCountFilter); + } + + private PartitionIterator resolveWithReplicaFilteringProtection() + { + // Protecting against inconsistent replica filtering (some replica returning a row that is outdated, but that + // wouldn't be removed by normal reconciliation because up-to-date replicas have filtered out the up-to-date version + // of that row) works in 3 steps: + // 1) we read the full response just to collect the rows that may be outdated (the ones we got from some + // replicas but not from others; it could be that those other replicas have filtered out a more + // up-to-date result). In doing so, we do not count any such "potentially outdated" rows towards the + // query limit. This simulates the worst case scenario where all those "potentially outdated" rows are + // indeed outdated, and thus makes sure we are guaranteed to read enough results (thanks to short read + // protection). + // 2) we query all the replicas/rows we need to determine whether those "potentially outdated" rows are outdated + // or not.
+ // 3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair, + // but where for each replica we use its original response _plus_ the additional rows queried in the + // previous step (and apply the command#rowFilter() on the full result). Since the first phase has + // pessimistically collected enough results for the case where all potentially outdated results are indeed + // outdated, we shouldn't need further short-read protection requests during this phase. + // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here // at the beginning of this method), so grab the response count once and use that through the method. int count = responses.size(); - List<UnfilteredPartitionIterator> iters = new ArrayList<>(count); - InetAddress[] sources = new InetAddress[count]; + // We need separate contexts, as each context has its own counter + ResolveContext firstPhaseContext = new ResolveContext(count); + ResolveContext secondPhaseContext = new ResolveContext(count); + ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, firstPhaseContext.sources); + PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext, + rfp.mergeController(), + i -> shortReadProtectedResponse(i, firstPhaseContext), + UnaryOperator.identity()); + + // Consume the first phase partitions to populate the replica filtering protection with both those materialized + // partitions and the primary keys to be fetched. + PartitionIterators.consume(firstPhasePartitions); + firstPhasePartitions.close(); + + // After reading the entire query results, the protection helper should have cached all the partitions, so we can + // clear the responses accumulator for the sake of memory usage, given that the second phase might take long if + // it needs to query replicas. + responses.clearUnsafe(); + + return resolveWithReadRepair(secondPhaseContext, + rfp::queryProtectedPartitions, + results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec())); + } + + private PartitionIterator resolveInternal(ResolveContext context, + UnfilteredPartitionIterators.MergeListener mergeListener, + ResponseProvider responseProvider, + UnaryOperator<PartitionIterator> preCountFilter) + { + int count = context.sources.length; + List<UnfilteredPartitionIterator> results = new ArrayList<>(count); for (int i = 0; i < count; i++) - { - MessageIn<ReadResponse> msg = responses.get(i); - iters.add(msg.payload.makeIterator(command)); - sources[i] = msg.from; - } + results.add(responseProvider.getResponse(i)); /* * Even though every response, individually, will honor the limit, it is possible that we will, after the merge, @@ -106,36 +216,14 @@ * See CASSANDRA-13747 for more details.
*/ - DataLimits.Counter mergedResultCounter = - command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); - - UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter); + UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener); FilteredPartitions filtered = - FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); - PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter); + FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); + PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter); return command.isForThrift() - ? counted - : Transformation.apply(counted, new EmptyPartitionsDiscarder()); - } - - private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, - InetAddress[] sources, - DataLimits.Counter mergedResultCounter) - { - // If we have only one results, there is no read repair to do and we can't get short reads - if (results.size() == 1) - return results.get(0); - - /* - * So-called short reads stems from nodes returning only a subset of the results they have due to the limit, - * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother. - */ - if (!command.limits().isUnlimited()) - for (int i = 0; i < results.size(); i++) - results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter)); - - return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources)); + ? counted + : Transformation.apply(counted, new EmptyPartitionsDiscarder()); } private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener @@ -294,13 +382,13 @@ public class DataResolver extends ResponseResolver } } - public void onMergedRows(Row merged, Row[] versions) + public Row onMergedRows(Row merged, Row[] versions) { // If a row was shadowed post merged, it must be by a partition level or range tombstone, and we handle // those cases directly in their respective methods (in other words, it would be inefficient to send a row // deletion as repair when we know we've already sent a partition level or range tombstone that covers it). if (merged.isEmpty()) - return; + return merged; Rows.diff(diffListener, merged, versions); for (int i = 0; i < currentRows.length; i++) { @@ -309,6 +397,8 @@ public class DataResolver extends ResponseResolver update(i).add(currentRows[i].build()); } Arrays.fill(currentRows, null); + + return merged; } private DeletionTime currentDeletion() diff --git a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java new file mode 100644 index 0000000000000000000000000000000000000000..36d51cc89c054f1b089bda767e766ce6d06c2d4a --- /dev/null +++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Columns; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionColumns; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.btree.BTreeSet; + +/** + * Helper in charge of collecting additional queries to be done on the coordinator to protect against invalid results + * being included due to replica-side filtering (secondary indexes or {@code ALLOW * FILTERING}). + *

+ * When using replica-side filtering with CL>ONE, a replica can send a stale result satisfying the filter, while updated + * replicas won't send a corresponding tombstone to discard that result during reconciliation. This helper identifies + * the rows in a replica response that don't have a corresponding row in other replica responses, and fetches them by + * primary key from the "silent" replicas in a second fetch round. + *

+ * See CASSANDRA-8272 and CASSANDRA-8273 for further details. + */ +class ReplicaFilteringProtection +{ + private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class); + + private final Keyspace keyspace; + private final ReadCommand command; + private final ConsistencyLevel consistency; + private final InetAddress[] sources; + private final TableMetrics tableMetrics; + + /** + * Per-source primary keys of the rows that might be outdated and therefore need to be fetched. + * For outdated static rows we use an empty builder to signal that the static row has to be queried. + */ + private final List<TreeMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch; + + /** + * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows. + */ + private final List<List<PartitionBuilder>> originalPartitions; + + ReplicaFilteringProtection(Keyspace keyspace, + ReadCommand command, + ConsistencyLevel consistency, + InetAddress[] sources) + { + this.keyspace = keyspace; + this.command = command; + this.consistency = consistency; + this.sources = sources; + this.rowsToFetch = new ArrayList<>(sources.length); + this.originalPartitions = new ArrayList<>(sources.length); + + for (InetAddress ignored : sources) + { + rowsToFetch.add(new TreeMap<>()); + originalPartitions.add(new ArrayList<>()); + } + + tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId); + } + + private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, DecoratedKey partitionKey) + { + return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> BTreeSet.builder(command.metadata().comparator)); + } + + /** + * Returns the protected results for the specified replica. These are generated by fetching the extra rows and merging + * them with the cached original filtered results for that replica. + * + * @param source the source + * @return the protected results for the specified replica + */ + UnfilteredPartitionIterator queryProtectedPartitions(int source) + { + UnfilteredPartitionIterator original = makeIterator(originalPartitions.get(source)); + SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.get(source); + + if (toFetch.isEmpty()) + return original; + + // TODO: this would be more efficient if we had multi-key queries internally + List<UnfilteredPartitionIterator> fetched = toFetch.keySet() + .stream() + .map(k -> querySourceOnKey(source, k)) + .collect(Collectors.toList()); + + return UnfilteredPartitionIterators.merge(Arrays.asList(original, UnfilteredPartitionIterators.concat(fetched)), + command.nowInSec(), null); + } + + private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey key) + { + BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key); + assert builder != null; // We're calling this on the result of rowsToFetch.get(i).keySet() + + InetAddress source = sources[i]; + NavigableSet<Clustering> clusterings = builder.build(); + tableMetrics.replicaSideFilteringProtectionRequests.mark(); + if (logger.isTraceEnabled()) + logger.trace("Requesting rows {} in partition {} from {} for replica-side filtering protection", + clusterings, key, source); + Tracing.trace("Requesting {} rows in partition {} from {} for replica-side filtering protection", + clusterings.size(), key, source); + + // build the read command, taking into account that we could be requesting only the static row + DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE;
+ ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed()); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + RowFilter.NONE, + limits, + key, + filter); + try + { + return executeReadCommand(cmd, source); + } + catch (ReadTimeoutException e) + { + int blockFor = consistency.blockFor(keyspace); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } + catch (UnavailableException e) + { + int blockFor = consistency.blockFor(keyspace); + throw new UnavailableException(consistency, blockFor, blockFor - 1); + } + } + + private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, InetAddress source) + { + DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1); + ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source)); + + if (StorageProxy.canDoLocalRequest(source)) + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); + else + MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler); + + // We don't call handler.get() because we want to preserve tombstones + handler.awaitResults(); + assert resolver.responses.size() == 1; + return resolver.responses.get(0).payload.makeIterator(command); + } + + /** + * Returns a merge listener that skips the merged rows for which any of the replicas doesn't have a version, + * pessimistically assuming that they are outdated. It is intended to be used during a first merge of per-replica + * query results, ensuring we fetch enough results from the replicas so that we don't miss any potentially + * outdated result. + *

+ * The listener will track both the accepted data and the primary keys of the rows that are considered outdated. + * That way, once the query results have been merged using this listener, further calls to + * {@link #queryProtectedPartitions(int)} will use the collected data to return a copy of the + * data originally collected from the specified replica, completed with the potentially outdated rows. + */ + UnfilteredPartitionIterators.MergeListener mergeController() + { + return (partitionKey, versions) -> { + + PartitionBuilder[] builders = new PartitionBuilder[sources.length]; + + for (int i = 0; i < sources.length; i++) + builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions)); + + return new UnfilteredRowIterators.MergeListener() + { + @Override + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + // cache the deletion time versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].setDeletionTime(versions[i]); + } + + @Override + public Row onMergedRows(Row merged, Row[] versions) + { + // cache the row versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].addRow(versions[i]); + + if (merged.isEmpty()) + return merged; + + boolean isPotentiallyOutdated = false; + boolean isStatic = merged.isStatic(); + for (int i = 0; i < versions.length; i++) + { + Row version = versions[i]; + if (version == null || (isStatic && version.isEmpty())) + { + isPotentiallyOutdated = true; + BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey); + // Note that for static rows, we shouldn't add the clustering to the clustering set (the + // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact + // we created a builder in the first place will act as a marker that the static row must be + // fetched, even if no other rows are added for this partition. + if (!isStatic) + toFetch.add(merged.clustering()); + } + } + + // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be + // an outdated result that is only present because other replicas have filtered out the up-to-date + // result), then we skip the row. In other words, the results of the initial merging of results by this + // protection assume the worst case scenario where every row that might be outdated actually is. + // This ensures that during this first phase (collecting the additional rows to fetch) we are guaranteed + // to look at enough data to ultimately fulfill the query limit. + return isPotentiallyOutdated ? null : merged;
+ } + + @Override + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + // cache the marker versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].addRangeTombstoneMarker(versions[i]); + } + + @Override + public void close() + { + for (int i = 0; i < sources.length; i++) + originalPartitions.get(i).add(builders[i]); + } + }; + }; + } + + private static PartitionColumns columns(List<UnfilteredRowIterator> versions) + { + Columns statics = Columns.NONE; + Columns regulars = Columns.NONE; + for (UnfilteredRowIterator iter : versions) + { + if (iter == null) + continue; + + PartitionColumns cols = iter.columns(); + statics = statics.mergeTo(cols.statics); + regulars = regulars.mergeTo(cols.regulars); + } + return new PartitionColumns(statics, regulars); + } + + private static EncodingStats stats(List<UnfilteredRowIterator> iterators) + { + EncodingStats stats = EncodingStats.NO_STATS; + for (UnfilteredRowIterator iter : iterators) + { + if (iter == null) + continue; + + stats = stats.mergeWith(iter.stats()); + } + return stats; + } + + private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> builders) + { + return new UnfilteredPartitionIterator() + { + final Iterator<PartitionBuilder> iterator = builders.iterator(); + + @Override + public boolean isForThrift() + { + return command.isForThrift(); + } + + @Override + public CFMetaData metadata() + { + return command.metadata(); + } + + @Override + public void close() + { + // nothing to do here + } + + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public UnfilteredRowIterator next() + { + return iterator.next().build(); + } + }; + } + + private class PartitionBuilder + { + private final DecoratedKey partitionKey; + private final PartitionColumns columns; + private final EncodingStats stats; + private DeletionTime deletionTime; + private Row staticRow = Rows.EMPTY_STATIC_ROW; + private final List<Unfiltered> contents = new ArrayList<>(); + + private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns columns, EncodingStats stats) + { + this.partitionKey = partitionKey; + this.columns = columns; + this.stats = stats; + } + + private void setDeletionTime(DeletionTime deletionTime) + { + this.deletionTime = deletionTime; + } + + private void addRow(Row row) + { + if (row == null) + return; + + if (row.isStatic()) + staticRow = row; + else + contents.add(row); + } + + private void addRangeTombstoneMarker(RangeTombstoneMarker marker) + { + if (marker != null) + contents.add(marker); + } + + private UnfilteredRowIterator build() + { + return new UnfilteredRowIterator() + { + final Iterator<Unfiltered> iterator = contents.iterator(); + + @Override + public DeletionTime partitionLevelDeletion() + { + return deletionTime; + } + + @Override + public EncodingStats stats() + { + return stats; + } + + @Override + public CFMetaData metadata() + { + return command.metadata(); + } + + @Override + public boolean isReverseOrder() + { + return command.isReversed(); + } + + @Override + public PartitionColumns columns() + { + return columns; + } + + @Override + public DecoratedKey partitionKey() + { + return partitionKey; + } + + @Override + public Row staticRow() + { + return staticRow; + } + + @Override + public void close() + { + // nothing to do here + } + + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public Unfiltered next() + { + return iterator.next(); + } + }; + } + } +} \ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java index e80faca806cb390909a99b1b36ccc41e6b16e0b4..ca9bb0982f1a5dc9e4979cd36a91955022089f64 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java @@ -18,6 +18,7 @@ */ package org.apache.cassandra.utils.concurrent; +import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -135,4 +136,16 @@ public class Accumulator<E> implements Iterable<E> throw new IndexOutOfBoundsException(); return (E) values[i]; } + + /** + * Removes all of the elements from this accumulator. + * + * This method is not thread-safe when used concurrently with {@link #add(Object)}. + */ + public void clearUnsafe() + { + nextIndexUpdater.set(this, 0); + presentCountUpdater.set(this, 0); + Arrays.fill(values, null); + } } diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java index 28423740ed5a6a04f771e7b3c4669591b719ae2f..33daca747b2c484df2989fc683bb84e0ca2b7c14 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java @@ -81,15 +81,7 @@ public class AccumulatorTest assertEquals("2", accu.get(1)); assertEquals("4", accu.get(2)); - try - { - assertEquals(null, accu.get(3)); - fail(); - } - catch (IndexOutOfBoundsException e) - { - // Expected - } + assertOutOfBounds(accu, 3); accu.add("0"); @@ -103,4 +95,48 @@ public class AccumulatorTest assertEquals("0", iter.next()); assertFalse(iter.hasNext()); } + + @Test + public void testClearUnsafe() + { + Accumulator<String> accu = new Accumulator<>(3); + + accu.add("1"); + accu.add("2"); + accu.add("3"); + + accu.clearUnsafe(); + + assertEquals(0, accu.size()); + assertFalse(accu.iterator().hasNext()); + assertOutOfBounds(accu, 0); + + accu.add("4"); + accu.add("5"); + + assertEquals(2, accu.size()); + + assertEquals("4", accu.get(0)); + assertEquals("5", accu.get(1)); + assertOutOfBounds(accu, 2); + + Iterator<String> iter = accu.iterator(); + assertTrue(iter.hasNext()); + assertEquals("4", iter.next()); + assertEquals("5", iter.next()); + assertFalse(iter.hasNext()); + } + + private static void assertOutOfBounds(Accumulator<String> accumulator, int index) + { + try + { + assertNull(accumulator.get(index)); + fail(); + } + catch (IndexOutOfBoundsException e) + { + // Expected + } + } }
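Putting the pieces together, the toy scenario from the top of this patch can be extended with the same two-phase protection that DataResolver and ReplicaFilteringProtection implement: phase one records which replicas were silent about a filter match, phase two fetches those rows by primary key from the silent replicas, and only the re-filtered, reconciled result is returned. Again purely illustrative Java; the names are invented and the real implementation works on partition and row iterators, not maps.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProtectedFilterDemo
{
    static final class Versioned
    {
        final String value;
        final long ts;
        Versioned(String value, long ts) { this.value = value; this.ts = ts; }
    }

    public static void main(String[] args)
    {
        List<Map<Integer, Versioned>> replicas = new ArrayList<>();
        replicas.add(Collections.singletonMap(1, new Versioned("a", 1))); // stale replica
        replicas.add(Collections.singletonMap(1, new Versioned("b", 2))); // up-to-date replica

        String match = "a";

        // Phase 1: apply the filter per replica and collect "potentially outdated" keys:
        // keys some replica matched but at least one other replica said nothing about.
        List<Map<Integer, Versioned>> responses = new ArrayList<>();
        for (Map<Integer, Versioned> replica : replicas)
        {
            Map<Integer, Versioned> response = new HashMap<>();
            for (Map.Entry<Integer, Versioned> e : replica.entrySet())
                if (e.getValue().value.equals(match))
                    response.put(e.getKey(), e.getValue());
            responses.add(response);
        }
        Set<Integer> potentiallyOutdated = new HashSet<>();
        for (Map<Integer, Versioned> response : responses)
            for (Integer k : response.keySet())
                for (Map<Integer, Versioned> other : responses)
                    if (!other.containsKey(k))
                        potentiallyOutdated.add(k);

        // Phase 2: complete each response with the missing keys, fetched by primary key
        // (here a map lookup; in the patch, a ClusteringIndexNamesFilter read per partition).
        for (int i = 0; i < replicas.size(); i++)
            for (Integer k : potentiallyOutdated)
                if (!responses.get(i).containsKey(k) && replicas.get(i).containsKey(k))
                    responses.get(i).put(k, replicas.get(i).get(k));

        // Re-merge by timestamp, then re-apply the filter to the reconciled rows.
        Map<Integer, Versioned> merged = new HashMap<>();
        for (Map<Integer, Versioned> response : responses)
            for (Map.Entry<Integer, Versioned> e : response.entrySet())
                merged.merge(e.getKey(), e.getValue(), (x, y) -> x.ts >= y.ts ? x : y);
        for (Iterator<Versioned> it = merged.values().iterator(); it.hasNext(); )
            if (!it.next().value.equals(match))
                it.remove();

        System.out.println(merged.isEmpty() ? "no match (stale row correctly excluded)" : "match: " + merged.get(1).value);
    }
}

The extra round trip is only paid when the command carries a non-empty rowFilter(), which matches the needsReplicaFilteringProtection() guard in DataResolver above.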