Commit dd255ffa authored by Andrés de la Peña's avatar Andrés de la Peña

Fix replica-side filtering returning stale data with CL > 1

patch by Andres de la Peña; reviewed by Benjamin Lerer, Caleb Rackliffe and ZhaoYang for CASSANDRA-8272
parent c4064dd8
3.0.21 3.0.21
* Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
* Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805) * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
* Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667) * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
* EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790) * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
......
...@@ -195,6 +195,16 @@ public class DataRange ...@@ -195,6 +195,16 @@ public class DataRange
return clusteringIndexFilter.selectsAllPartition(); return clusteringIndexFilter.selectsAllPartition();
} }
/**
* Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*
* @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*/
public boolean isReversed()
{
return clusteringIndexFilter.isReversed();
}
/** /**
* The clustering index filter to use for the provided key. * The clustering index filter to use for the provided key.
* <p> * <p>
......
...@@ -206,6 +206,11 @@ public class PartitionRangeReadCommand extends ReadCommand ...@@ -206,6 +206,11 @@ public class PartitionRangeReadCommand extends ReadCommand
return DatabaseDescriptor.getRangeRpcTimeout(); return DatabaseDescriptor.getRangeRpcTimeout();
} }
public boolean isReversed()
{
return dataRange.isReversed();
}
public boolean selectsKey(DecoratedKey key) public boolean selectsKey(DecoratedKey key)
{ {
if (!dataRange().contains(key)) if (!dataRange().contains(key))
......
...@@ -330,6 +330,13 @@ public abstract class ReadCommand implements ReadQuery ...@@ -330,6 +330,13 @@ public abstract class ReadCommand implements ReadQuery
protected abstract int oldestUnrepairedTombstone(); protected abstract int oldestUnrepairedTombstone();
/**
* Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*
* @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*/
public abstract boolean isReversed();
public ReadResponse createResponse(UnfilteredPartitionIterator iterator) public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
{ {
// validate that the sequence of RT markers is correct: open is followed by close, deletion times for both // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
......
...@@ -415,6 +415,11 @@ public class SinglePartitionReadCommand extends ReadCommand ...@@ -415,6 +415,11 @@ public class SinglePartitionReadCommand extends ReadCommand
return DatabaseDescriptor.getReadRpcTimeout(); return DatabaseDescriptor.getReadRpcTimeout();
} }
public boolean isReversed()
{
return clusteringIndexFilter.isReversed();
}
public boolean selectsKey(DecoratedKey key) public boolean selectsKey(DecoratedKey key)
{ {
if (!this.partitionKey().equals(key)) if (!this.partitionKey().equals(key))
......
...@@ -199,11 +199,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte ...@@ -199,11 +199,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
{ {
} }
public void onMergedRows(Row merged, Row[] versions) public Row onMergedRows(Row merged, Row[] versions)
{ {
indexTransaction.start(); indexTransaction.start();
indexTransaction.onRowMerge(merged, versions); indexTransaction.onRowMerge(merged, versions);
indexTransaction.commit(); indexTransaction.commit();
return merged;
} }
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions) public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
......
...@@ -126,6 +126,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> ...@@ -126,6 +126,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return false; return false;
} }
protected abstract Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec);
/** /**
* Filters the provided iterator so that only the row satisfying the expression of this filter * Filters the provided iterator so that only the row satisfying the expression of this filter
* are included in the resulting iterator. * are included in the resulting iterator.
...@@ -134,7 +136,23 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> ...@@ -134,7 +136,23 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
* @param nowInSec the time of query in seconds. * @param nowInSec the time of query in seconds.
* @return the filtered iterator. * @return the filtered iterator.
*/ */
public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec); public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
{
return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(iter.metadata(), nowInSec));
}
/**
* Filters the provided iterator so that only the row satisfying the expression of this filter
* are included in the resulting iterator.
*
* @param iter the iterator to filter
* @param nowInSec the time of query in seconds.
* @return the filtered iterator.
*/
public PartitionIterator filter(PartitionIterator iter, CFMetaData metadata, int nowInSec)
{
return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(metadata, nowInSec));
}
/** /**
* Whether the provided row in the provided partition satisfies this filter. * Whether the provided row in the provided partition satisfies this filter.
...@@ -263,20 +281,16 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> ...@@ -263,20 +281,16 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super(expressions); super(expressions);
} }
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
{ {
if (expressions.isEmpty())
return iter;
final CFMetaData metadata = iter.metadata();
long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count(); long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count();
final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0; final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0;
final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0; final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0;
class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator> return new Transformation<BaseRowIterator<?>>()
{ {
DecoratedKey pk; DecoratedKey pk;
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
{ {
// The filter might be on static columns, so need to check static row first. // The filter might be on static columns, so need to check static row first.
if (filterStaticColumns && applyToRow(partition.staticRow()) == null) if (filterStaticColumns && applyToRow(partition.staticRow()) == null)
...@@ -286,7 +300,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> ...@@ -286,7 +300,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
} }
pk = partition.partitionKey(); pk = partition.partitionKey();
UnfilteredRowIterator iterator = Transformation.apply(partition, this); BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
? Transformation.apply((UnfilteredRowIterator) partition, this)
: Transformation.apply((RowIterator) partition, this);
if (filterNonStaticColumns && !iterator.hasNext()) if (filterNonStaticColumns && !iterator.hasNext())
{ {
...@@ -308,9 +324,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> ...@@ -308,9 +324,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return null; return null;
return row; return row;
} }
} };
return Transformation.apply(iter, new IsSatisfiedFilter());
} }
protected RowFilter withNewExpressions(List<Expression> expressions) protected RowFilter withNewExpressions(List<Expression> expressions)
...@@ -326,36 +340,47 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> ...@@ -326,36 +340,47 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super(expressions); super(expressions);
} }
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec) protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
{ {
if (expressions.isEmpty()) // Thrift does not filter rows, it filters entire partition if any of the expression is not
return iter; // satisfied, which forces us to materialize the result (in theory we could materialize only
// what we need which might or might not be everything, but we keep it simple since in practice
class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator> // it's not worth that it has ever been).
return new Transformation<BaseRowIterator<?>>()
{ {
@Override protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) {
return partition instanceof UnfilteredRowIterator ? applyTo((UnfilteredRowIterator) partition)
: applyTo((RowIterator) partition);
}
private UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
{
ImmutableBTreePartition result = ImmutableBTreePartition.create(partition);
partition.close();
return accepts(result) ? result.unfilteredIterator() : null;
}
private RowIterator applyTo(RowIterator partition)
{ {
// Thrift does not filter rows, it filters entire partition if any of the expression is not FilteredPartition result = FilteredPartition.create(partition);
// satisfied, which forces us to materialize the result (in theory we could materialize only return accepts(result) ? result.rowIterator() : null;
// what we need which might or might not be everything, but we keep it simple since in practice }
// it's not worth that it has ever been).
ImmutableBTreePartition result = ImmutableBTreePartition.create(iter);
iter.close();
private boolean accepts(ImmutableBTreePartition result)
{
// The partition needs to have a row for every expression, and the expression needs to be valid. // The partition needs to have a row for every expression, and the expression needs to be valid.
for (Expression expr : expressions) for (Expression expr : expressions)
{ {
assert expr instanceof ThriftExpression; assert expr instanceof ThriftExpression;
Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes)); Row row = result.getRow(makeCompactClustering(metadata, expr.column().name.bytes));
if (row == null || !expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row)) if (row == null || !expr.isSatisfiedBy(metadata, result.partitionKey(), row))
return null; return false;
} }
// If we get there, it means all expressions where satisfied, so return the original result // If we get there, it means all expressions where satisfied, so return the original result
return result.unfilteredIterator(); return true;
} }
} };
return Transformation.apply(iter, new IsSatisfiedThriftFilter());
} }
protected RowFilter withNewExpressions(List<Expression> expressions) protected RowFilter withNewExpressions(List<Expression> expressions)
......
...@@ -27,6 +27,7 @@ import org.apache.cassandra.db.*; ...@@ -27,6 +27,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.FilteredPartitions; import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputPlus;
...@@ -47,7 +48,7 @@ public abstract class UnfilteredPartitionIterators ...@@ -47,7 +48,7 @@ public abstract class UnfilteredPartitionIterators
public interface MergeListener public interface MergeListener
{ {
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions); public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
public void close(); public default void close() {}
} }
@SuppressWarnings("resource") // The created resources are returned right away @SuppressWarnings("resource") // The created resources are returned right away
...@@ -77,6 +78,24 @@ public abstract class UnfilteredPartitionIterators ...@@ -77,6 +78,24 @@ public abstract class UnfilteredPartitionIterators
return Transformation.apply(toReturn, new Close()); return Transformation.apply(toReturn, new Close());
} }
public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
{
if (iterators.size() == 1)
return iterators.get(0);
class Extend implements MorePartitions<UnfilteredPartitionIterator>
{
int i = 1;
public UnfilteredPartitionIterator moreContents()
{
if (i >= iterators.size())
return null;
return iterators.get(i++);
}
}
return MorePartitions.extend(iterators.get(0), new Extend());
}
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec) public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{ {
return FilteredPartitions.filter(iterator, nowInSec); return FilteredPartitions.filter(iterator, nowInSec);
...@@ -84,7 +103,6 @@ public abstract class UnfilteredPartitionIterators ...@@ -84,7 +103,6 @@ public abstract class UnfilteredPartitionIterators
public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener) public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener)
{ {
assert listener != null;
assert !iterators.isEmpty(); assert !iterators.isEmpty();
final boolean isForThrift = iterators.get(0).isForThrift(); final boolean isForThrift = iterators.get(0).isForThrift();
...@@ -109,7 +127,9 @@ public abstract class UnfilteredPartitionIterators ...@@ -109,7 +127,9 @@ public abstract class UnfilteredPartitionIterators
protected UnfilteredRowIterator getReduced() protected UnfilteredRowIterator getReduced()
{ {
UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge); UnfilteredRowIterators.MergeListener rowListener = listener == null
? null
: listener.getRowMergeListener(partitionKey, toMerge);
// Replace nulls by empty iterators // Replace nulls by empty iterators
for (int i = 0; i < toMerge.size(); i++) for (int i = 0; i < toMerge.size(); i++)
...@@ -153,7 +173,9 @@ public abstract class UnfilteredPartitionIterators ...@@ -153,7 +173,9 @@ public abstract class UnfilteredPartitionIterators
public void close() public void close()
{ {
merged.close(); merged.close();
listener.close();
if (listener != null)
listener.close();
} }
}; };
} }
......
...@@ -72,12 +72,17 @@ public abstract class UnfilteredRowIterators ...@@ -72,12 +72,17 @@ public abstract class UnfilteredRowIterators
* particular, this may be called in cases where there is no row in the merged output (if a source has a row * particular, this may be called in cases where there is no row in the merged output (if a source has a row
* that is shadowed by another source range tombstone or partition level deletion). * that is shadowed by another source range tombstone or partition level deletion).
* *
* @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a * @param merged the result of the merge. This cannot be {@code null} (so that listener can always access the
* placeholder for when at least one source has a row, but that row is shadowed in the merged output. * clustering from this safely)but can be empty, in which case this is a placeholder for when at least one
* source has a row, but that row is shadowed in the merged output.
* @param versions for each source, the row in that source corresponding to {@code merged}. This can be * @param versions for each source, the row in that source corresponding to {@code merged}. This can be
* {@code null} for some sources if the source has not such row. * {@code null} for some sources if the source has not such row.
* @return the row to use as result of the merge (can be {@code null}). Most implementations should simply
* return {@code merged}, but this allows some implementations to impact the merge result if necessary. If this
* returns either {@code null} or an empty row, then the row is skipped from the merge result. If this returns a
* non {@code null} result, then the returned row <b>must</b> have the same clustering than {@code merged}.
*/ */
public void onMergedRows(Row merged, Row[] versions); public Row onMergedRows(Row merged, Row[] versions);
/** /**
* Called once for every range tombstone marker participating in the merge. * Called once for every range tombstone marker participating in the merge.
...@@ -500,9 +505,12 @@ public abstract class UnfilteredRowIterators ...@@ -500,9 +505,12 @@ public abstract class UnfilteredRowIterators
Row merged = merger.merge(partitionDeletion); Row merged = merger.merge(partitionDeletion);
if (merged == null) if (merged == null)
merged = Rows.EMPTY_STATIC_ROW; merged = Rows.EMPTY_STATIC_ROW;
if (listener != null) if (listener == null)
listener.onMergedRows(merged, merger.mergedRows()); return merged;
return merged;
merged = listener.onMergedRows(merged, merger.mergedRows());
// Note that onMergedRows can have returned null even though his input wasn't null
return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
} }
private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators) private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
...@@ -586,9 +594,15 @@ public abstract class UnfilteredRowIterators ...@@ -586,9 +594,15 @@ public abstract class UnfilteredRowIterators
if (nextKind == Unfiltered.Kind.ROW) if (nextKind == Unfiltered.Kind.ROW)
{ {
Row merged = rowMerger.merge(markerMerger.activeDeletion()); Row merged = rowMerger.merge(markerMerger.activeDeletion());
if (listener != null) if (listener == null)
listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows()); return merged;
return merged;
merged = listener.onMergedRows(merged == null
? BTreeRow.emptyRow(rowMerger.mergedClustering())
: merged,
rowMerger.mergedRows());
return merged == null || merged.isEmpty() ? null : merged;
} }
else else
{ {
......
...@@ -153,6 +153,7 @@ public class TableMetrics ...@@ -153,6 +153,7 @@ public class TableMetrics
public final Meter readRepairRequests; public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests; public final Meter shortReadProtectionRequests;
public final Meter replicaSideFilteringProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers; public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/** /**
...@@ -649,8 +650,9 @@ public class TableMetrics ...@@ -649,8 +650,9 @@ public class TableMetrics
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose); casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit); casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests")); readRepairRequests = createTableMeter("ReadRepairRequests");
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests")); shortReadProtectionRequests = createTableMeter("ShortReadProtectionRequests");
replicaSideFilteringProtectionRequests = createTableMeter("ReplicaSideFilteringProtectionRequests");
} }
public void updateSSTableIterated(int count) public void updateSSTableIterated(int count)
...@@ -758,6 +760,18 @@ public class TableMetrics ...@@ -758,6 +760,18 @@ public class TableMetrics
return cfCounter; return cfCounter;
} }
private Meter createTableMeter(final String name)
{
return createTableMeter(name, name);
}
private Meter createTableMeter(final String name, final String alias)
{
Meter tableMeter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
register(name, alias, tableMeter);
return tableMeter;
}
/** /**
* Create a histogram-like interface that will register both a CF, keyspace and global level * Create a histogram-like interface that will register both a CF, keyspace and global level
* histogram and forward any updates to both * histogram and forward any updates to both
......
...@@ -20,6 +20,7 @@ package org.apache.cassandra.service; ...@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.UnaryOperator;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
...@@ -80,17 +81,126 @@ public class DataResolver extends ResponseResolver ...@@ -80,17 +81,126 @@ public class DataResolver extends ResponseResolver
public PartitionIterator resolve() public PartitionIterator resolve()
{ {
if (!needsReplicaFilteringProtection())
{
ResolveContext context = new ResolveContext(responses.size());
return resolveWithReadRepair(context,
i -> shortReadProtectedResponse(i, context),
UnaryOperator.identity());
}
return resolveWithReplicaFilteringProtection();
}
private boolean needsReplicaFilteringProtection()
{
return !command.rowFilter().isEmpty();
}
private class ResolveContext
{
private final InetAddress[] sources;
private final DataLimits.Counter mergedResultCounter;
private ResolveContext(int count)
{
assert count <= responses.size();
this.sources = new InetAddress[count];
for (int i = 0; i < count; i++)
sources[i] = responses.get(i).from;
this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
true,
command.selectsFullPartition(),
enforceStrictLiveness);
}
private boolean needShortReadProtection()
{
// If we have only one result, there is no read repair to do and we can't get short reads
// Also, so-called "short reads" stems from nodes returning only a subset of the results they have for a
// partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have limit,
// don't bother protecting against short reads.
return sources.length > 1 && !command.limits().isUnlimited();
}
}
@FunctionalInterface
private interface ResponseProvider