Commit 69ea5ffd authored by Andrés de la Peña

Merge branch 'cassandra-3.11' into trunk

# Conflicts:
#	CHANGES.txt
#	src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
#	src/java/org/apache/cassandra/db/ReadCommand.java
#	src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
#	src/java/org/apache/cassandra/db/filter/RowFilter.java
#	src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
#	src/java/org/apache/cassandra/metrics/TableMetrics.java
#	src/java/org/apache/cassandra/service/DataResolver.java
#	src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
parents da8fe7a2 7014cd5a
......@@ -39,6 +39,7 @@
Merged from 3.11:
* Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503)
Merged from 3.0:
* Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
* Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
* EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
* Fix index queries on partition key columns when some partitions contain only static data (CASSANDRA-13666)
......
......@@ -195,6 +195,16 @@ public class DataRange
return clusteringIndexFilter.selectsAllPartition();
}
/**
* Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*
* @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*/
public boolean isReversed()
{
return clusteringIndexFilter.isReversed();
}
/**
* The clustering index filter to use for the provided key.
* <p>
......
......@@ -240,6 +240,11 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
return DatabaseDescriptor.getRangeRpcTimeout(unit);
}
public boolean isReversed()
{
return dataRange.isReversed();
}
public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
{
return StorageProxy.getRangeSlice(this, consistency, queryStartNanoTime);
......
......@@ -361,6 +361,12 @@ public abstract class ReadCommand extends AbstractReadQuery
return oldestUnrepairedTombstone;
}
/**
* Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*
* @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*/
public abstract boolean isReversed();
@SuppressWarnings("resource")
public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
......
......@@ -363,6 +363,11 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
return DatabaseDescriptor.getReadRpcTimeout(unit);
}
public boolean isReversed()
{
return clusteringIndexFilter.isReversed();
}
@Override
public SinglePartitionReadCommand forPaging(Clustering lastReturned, DataLimits limits)
{
......
......@@ -206,11 +206,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
{
}
public void onMergedRows(Row merged, Row[] versions)
public Row onMergedRows(Row merged, Row[] versions)
{
indexTransaction.start();
indexTransaction.onRowMerge(merged, versions);
indexTransaction.commit();
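// the index transaction doesn't alter the merge result, so compaction returns the merged row as-is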
return merged;
}
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
......
......@@ -32,6 +32,7 @@ import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
......@@ -128,6 +129,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return false;
}
protected abstract Transformation<BaseRowIterator<?>> filter(TableMetadata metadata, int nowInSec);
/**
* Filters the provided iterator so that only the rows satisfying the expressions of this filter
* are included in the resulting iterator.
......@@ -136,7 +139,23 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
* @param nowInSec the time of query in seconds.
* @return the filtered iterator.
*/
public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
{
return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(iter.metadata(), nowInSec));
}
/**
* Filters the provided iterator so that only the rows satisfying the expressions of this filter
* are included in the resulting iterator.
*
* @param iter the iterator to filter
* @param nowInSec the time of query in seconds.
* @return the filtered iterator.
*/
public PartitionIterator filter(PartitionIterator iter, TableMetadata metadata, int nowInSec)
{
return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(metadata, nowInSec));
}
/**
* Whether the provided row in the provided partition satisfies this filter.
......@@ -256,13 +275,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super(expressions);
}
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
protected Transformation<BaseRowIterator<?>> filter(TableMetadata metadata, int nowInSec)
{
if (expressions.isEmpty())
return iter;
final TableMetadata metadata = iter.metadata();
List<Expression> partitionLevelExpressions = new ArrayList<>();
List<Expression> rowLevelExpressions = new ArrayList<>();
for (Expression e: expressions)
......@@ -276,12 +290,12 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
long numberOfRegularColumnExpressions = rowLevelExpressions.size();
final boolean filterNonStaticColumns = numberOfRegularColumnExpressions > 0;
class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator>
return new Transformation<BaseRowIterator<?>>()
{
DecoratedKey pk;
@SuppressWarnings("resource")
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
{
pk = partition.partitionKey();
......@@ -293,7 +307,10 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return null;
}
UnfilteredRowIterator iterator = Transformation.apply(partition, this);
BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
? Transformation.apply((UnfilteredRowIterator) partition, this)
: Transformation.apply((RowIterator) partition, this);
if (filterNonStaticColumns && !iterator.hasNext())
{
iterator.close();
......@@ -315,9 +332,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return row;
}
}
return Transformation.apply(iter, new IsSatisfiedFilter());
};
}
protected RowFilter withNewExpressions(List<Expression> expressions)
......
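The hunk above replaces the abstract filter(UnfilteredPartitionIterator, int) extension point with a protected Transformation factory, so the same per-row logic is reused by both public overloads: the existing one for unfiltered (replica-side) iterators and the new one for already-filtered PartitionIterators, which resolveWithReplicaFilteringProtection in DataResolver uses to re-apply the command's row filter in its second phase. A simplified, self-contained sketch of that template-method shape (hypothetical names, not Cassandra code):

import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical analogue: the subclass supplies one row predicate (standing in for the
// Transformation built by filter(metadata, nowInSec)), and the base class applies it to
// both iterator flavours instead of each subclass re-implementing the filtering twice.
abstract class RowFilterSketch
{
    protected abstract Predicate<String> rowPredicate();

    /** Analogue of filter(UnfilteredPartitionIterator, int). */
    public Iterator<String> filterUnfiltered(List<String> rows)
    {
        return rows.stream().filter(rowPredicate()).iterator();
    }

    /** Analogue of the new filter(PartitionIterator, TableMetadata, int). */
    public Iterator<String> filterFiltered(List<String> rows)
    {
        return rows.stream().filter(rowPredicate()).iterator();
    }

    public static void main(String[] args)
    {
        RowFilterSketch cqlFilter = new RowFilterSketch()
        {
            protected Predicate<String> rowPredicate() { return row -> row.endsWith("=42"); }
        };
        cqlFilter.filterFiltered(List.of("a=42", "b=7", "c=42"))
                 .forEachRemaining(System.out::println); // prints a=42 and c=42
    }
}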
......@@ -83,6 +83,18 @@ public abstract class PartitionIterators
return new SingletonPartitionIterator(iterator);
}
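/**
 * Consumes all rows of all partitions of the provided iterator, discarding them. Each partition's
 * {@code RowIterator} is closed as it is exhausted; closing the top-level iterator remains the
 * caller's responsibility.
 */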
public static void consume(PartitionIterator iterator)
{
while (iterator.hasNext())
{
try (RowIterator partition = iterator.next())
{
while (partition.hasNext())
partition.next();
}
}
}
/**
* Wraps the provided iterator so it logs the returned rows for debugging purposes.
* <p>
......
......@@ -46,17 +46,9 @@ public abstract class UnfilteredPartitionIterators
public interface MergeListener
{
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
public void close();
public default void close() {}
public static MergeListener NOOP = new MergeListener()
{
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
{
return UnfilteredRowIterators.MergeListener.NOOP;
}
public void close() {}
};
public static MergeListener NOOP = (partitionKey, versions) -> UnfilteredRowIterators.MergeListener.NOOP;
}
@SuppressWarnings("resource") // The created resources are returned right away
......@@ -112,7 +104,6 @@ public abstract class UnfilteredPartitionIterators
@SuppressWarnings("resource")
public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final MergeListener listener)
{
assert listener != null;
assert !iterators.isEmpty();
final TableMetadata metadata = iterators.get(0).metadata();
......@@ -137,7 +128,9 @@ public abstract class UnfilteredPartitionIterators
@SuppressWarnings("resource")
protected UnfilteredRowIterator getReduced()
{
UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge);
UnfilteredRowIterators.MergeListener rowListener = listener == null
? null
: listener.getRowMergeListener(partitionKey, toMerge);
// Make a single empty iterator object to merge; we don't need toMerge.size() copies
UnfilteredRowIterator empty = null;
......@@ -185,7 +178,9 @@ public abstract class UnfilteredPartitionIterators
public void close()
{
merged.close();
listener.close();
if (listener != null)
listener.close();
}
};
}
......
......@@ -70,12 +70,17 @@ public abstract class UnfilteredRowIterators
* particular, this may be called in cases where there is no row in the merged output (if a source has a row
* that is shadowed by another source range tombstone or partition level deletion).
*
* @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
* placeholder for when at least one source has a row, but that row is shadowed in the merged output.
* @param merged the result of the merge. This cannot be {@code null} (so that the listener can always access the
* clustering from this safely) but can be empty, in which case this is a placeholder for when at least one
* source has a row, but that row is shadowed in the merged output.
* @param versions for each source, the row in that source corresponding to {@code merged}. This can be
* {@code null} for some sources if the source has no such row.
* @return the row to use as the result of the merge (can be {@code null}). Most implementations should simply
* return {@code merged}, but this allows some implementations to impact the merge result if necessary. If this
* returns either {@code null} or an empty row, then the row is skipped from the merge result. If this returns a
* non-{@code null} result, then the returned row <b>must</b> have the same clustering as {@code merged}.
*/
public void onMergedRows(Row merged, Row[] versions);
public Row onMergedRows(Row merged, Row[] versions);
/**
* Called once for every range tombstone marker participating in the merge.
......@@ -98,7 +103,7 @@ public abstract class UnfilteredRowIterators
{
public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) {}
public void onMergedRows(Row merged, Row[] versions) {}
public Row onMergedRows(Row merged, Row[] versions) {return merged;}
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) {}
......@@ -489,9 +494,12 @@ public abstract class UnfilteredRowIterators
Row merged = merger.merge(partitionDeletion);
if (merged == null)
merged = Rows.EMPTY_STATIC_ROW;
if (listener != null)
listener.onMergedRows(merged, merger.mergedRows());
return merged;
if (listener == null)
return merged;
merged = listener.onMergedRows(merged, merger.mergedRows());
// Note that onMergedRows can have returned null even though its input wasn't null
return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
}
private static RegularAndStaticColumns collectColumns(List<UnfilteredRowIterator> iterators)
......@@ -567,9 +575,15 @@ public abstract class UnfilteredRowIterators
if (nextKind == Unfiltered.Kind.ROW)
{
Row merged = rowMerger.merge(markerMerger.activeDeletion());
if (listener != null)
listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows());
return merged;
if (listener == null)
return merged;
merged = listener.onMergedRows(merged == null
? BTreeRow.emptyRow(rowMerger.mergedClustering())
: merged,
rowMerger.mergedRows());
return merged == null || merged.isEmpty() ? null : merged;
}
else
{
......
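As the javadoc above now spells out, onMergedRows has become a hook rather than a pure observer: a listener may return the merged row unchanged, replace it (keeping the same clustering), or return null or an empty row to drop it, and the merge sites additionally tolerate a null listener. A minimal, self-contained sketch of that contract (hypothetical types, not Cassandra code):

// Mirrors the merge-site handling shown above for regular rows: a null listener is allowed,
// and a null or empty return value removes the row from the merged output.
public class OnMergedRowsContractSketch
{
    interface Listener
    {
        String onMergedRows(String merged, String[] versions);
    }

    static String applyListener(Listener listener, String merged, String[] versions)
    {
        if (listener == null)
            return merged;
        String returned = listener.onMergedRows(merged, versions);
        return (returned == null || returned.isEmpty()) ? null : returned;
    }

    public static void main(String[] args)
    {
        String[] versions = { "row@replica1", null };  // null: that source had no such row
        Listener keep = (merged, v) -> merged;         // the common case: keep the reconciled row
        Listener drop = (merged, v) -> null;           // a filtering listener may drop the row entirely
        System.out.println(applyListener(keep, "merged-row", versions)); // merged-row
        System.out.println(applyListener(drop, "merged-row", versions)); // null -> row skipped
    }
}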
......@@ -329,6 +329,7 @@ public class TableMetrics
public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Meter replicaSideFilteringProtectionRequests;
public final EnumMap<SamplerType, Sampler<?>> samplers;
/**
......@@ -949,8 +950,9 @@ public class TableMetrics
return 0.0;
});
readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
readRepairRequests = createTableMeter("ReadRepairRequests");
shortReadProtectionRequests = createTableMeter("ShortReadProtectionRequests");
replicaSideFilteringProtectionRequests = createTableMeter("ReplicaSideFilteringProtectionRequests");
confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
......@@ -1075,6 +1077,18 @@ public class TableMetrics
return cfCounter;
}
private Meter createTableMeter(final String name)
{
return createTableMeter(name, name);
}
private Meter createTableMeter(final String name, final String alias)
{
Meter tableMeter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
register(name, alias, tableMeter);
return tableMeter;
}
/**
* Computes the compression ratio for the specified SSTables
*
......
......@@ -21,16 +21,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.UnaryOperator;
import com.google.common.base.Joiner;
import com.google.common.collect.Collections2;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
......@@ -41,9 +43,11 @@ import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
import org.apache.cassandra.db.transform.Filter;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.index.sasi.SASIIndex;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
......@@ -74,7 +78,6 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
return !responses.isEmpty();
}
@SuppressWarnings("resource")
public PartitionIterator resolve()
{
// We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
......@@ -82,10 +85,7 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
Collection<Message<ReadResponse>> messages = responses.snapshot();
assert !any(messages, msg -> msg.payload.isDigestResponse());
E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from()), false);
List<UnfilteredPartitionIterator> iters = new ArrayList<>(
Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
assert replicas.size() == iters.size();
E replicas = replicaPlan().candidates().select(transform(messages, Message::from), false);
// If requested, inspect each response for a digest of the replica's repaired data set
RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
......@@ -103,6 +103,157 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
});
}
if (!needsReplicaFilteringProtection())
{
ResolveContext context = new ResolveContext(replicas);
return resolveWithReadRepair(context,
i -> shortReadProtectedResponse(i, context),
UnaryOperator.identity(),
repairedDataTracker);
}
return resolveWithReplicaFilteringProtection(replicas, repairedDataTracker);
}
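/**
 * Whether the replica filtering protection applied in {@link #resolveWithReplicaFilteringProtection} is needed:
 * it is skipped when the command carries no row filter and, for custom index queries, when the index
 * implementation is SASI.
 */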
private boolean needsReplicaFilteringProtection()
{
if (command.rowFilter().isEmpty())
return false;
IndexMetadata indexDef = command.indexMetadata();
if (indexDef != null && indexDef.isCustom())
{
String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
return !SASIIndex.class.getName().equals(className);
}
return true;
}
private class ResolveContext
{
private final E replicas;
private final DataLimits.Counter mergedResultCounter;
private ResolveContext(E replicas)
{
this.replicas = replicas;
this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
true,
command.selectsFullPartition(),
enforceStrictLiveness);
}
private boolean needsReadRepair()
{
return replicas.size() > 1;
}
private boolean needShortReadProtection()
{
// If we have only one result, there is no read repair to do and we can't get short reads
// Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a
// partition due to the limit, but that subset not being enough post-reconciliation. So if there is no limit,
// don't bother protecting against short reads.
return replicas.size() > 1 && !command.limits().isUnlimited();
}
}
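The comment in needShortReadProtection is easiest to see with a concrete limit: every replica truncates its response to the query limit before the coordinator reconciles, so rows that one replica knows are deleted can leave the merged result with fewer live rows than requested, and only then is it worth going back to the replicas for more. A toy, self-contained illustration (hypothetical data, not Cassandra code; tombstones modelled as empty strings):

import java.util.LinkedHashMap;
import java.util.Map;

// The client asks for LIMIT 2; each replica sends only its first 2 rows.
public class ShortReadSketch
{
    public static void main(String[] args)
    {
        Map<String, String> replicaA = new LinkedHashMap<>();
        replicaA.put("k1", "a");   // live on A
        replicaA.put("k2", "b");   // live on A
        Map<String, String> replicaB = new LinkedHashMap<>();
        replicaB.put("k1", "");    // tombstone: B knows k1 was deleted
        replicaB.put("k2", "");    // tombstone: B knows k2 was deleted
        // Both replicas also store a live row k3, but it falls beyond their local LIMIT 2.

        int live = 0;
        for (String key : replicaA.keySet())
            if (!"".equals(replicaA.get(key)) && !"".equals(replicaB.get(key)))
                live++;

        // 0 live rows survive reconciliation although k3 exists on both replicas:
        // short read protection must go back to the replicas and ask for more rows.
        System.out.println("live rows after reconciliation: " + live);
    }
}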
@FunctionalInterface
private interface ResponseProvider
{
UnfilteredPartitionIterator getResponse(int i);
}
private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context)
{
UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);
return context.needShortReadProtection()
? ShortReadProtection.extend(context.replicas.get(i),
originalResponse,
command,
context.mergedResultCounter,
queryStartNanoTime,
enforceStrictLiveness)
: originalResponse;
}
private PartitionIterator resolveWithReadRepair(ResolveContext context,
ResponseProvider responseProvider,
UnaryOperator<PartitionIterator> preCountFilter,
RepairedDataTracker repairedDataTracker)
{
UnfilteredPartitionIterators.MergeListener listener = null;
if (context.needsReadRepair())
{
P sources = replicaPlan.getWithContacts(context.replicas);
listener = wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker);
}
return resolveInternal(context, listener, responseProvider, preCountFilter);
}
@SuppressWarnings("resource")
private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, RepairedDataTracker repairedDataTracker)
{
// Protecting against inconsistent replica filtering (some replicas returning a row that is outdated but that
// wouldn't be removed by normal reconciliation because the up-to-date replicas have filtered out the up-to-date
// version of that row) works in 3 steps:
// 1) we read the full response just to collect rows that may be outdated (the ones we got from some
// replicas but got no response for from others; those other replicas may have filtered out a more
// up-to-date result). In doing so, we do not count any such "potentially outdated" rows towards the
// query limit. This simulates the worst-case scenario where all those "potentially outdated" rows are
// indeed outdated, and thus makes sure we are guaranteed to read enough results (thanks to short read
// protection).
// 2) we query all the replicas/rows we need to determine whether those "potentially outdated" rows are
// actually outdated or not.
// 3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair,
// but where for each replica we use its original response _plus_ the additional rows queried in the
// previous step (and apply the command#rowFilter() on the full result). Since the first phase has
// pessimistically collected enough results for the case where all potentially outdated results are indeed
// outdated, we shouldn't need further short-read protection requests during this phase.
// We need separate contexts, as each context has its own counter
ResolveContext firstPhaseContext = new ResolveContext(replicas);
ResolveContext secondPhaseContext = new ResolveContext(replicas);
ReplicaFilteringProtection<E> rfp = new ReplicaFilteringProtection<>(replicaPlan().keyspace(),
command,
replicaPlan().consistencyLevel(),
queryStartNanoTime,
firstPhaseContext.replicas);
PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
rfp.mergeController(),
i -> shortReadProtectedResponse(i, firstPhaseContext),
UnaryOperator.identity());
// Consume the first phase partitions to populate the replica filtering protection with both those materialized
// partitions and the primary keys to be fetched.
PartitionIterators.consume(firstPhasePartitions);
firstPhasePartitions.close();
// After reading the entire query result, the protection helper should have cached all the partitions, so we can
// clear the responses accumulator to reduce memory usage, given that the second phase might take long if
// it needs to query replicas.
responses.clearUnsafe();
return resolveWithReadRepair(secondPhaseContext,
rfp::queryProtectedPartitions,
results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()),
repairedDataTracker);
}
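The three steps above are easiest to follow with a concrete scenario: one replica still holds a stale row that matches the filter, while an up-to-date replica has already filtered the newer version out, so it returns nothing and plain reconciliation never sees the conflict. A toy, self-contained sketch of that scenario and of what the second phase changes (hypothetical data, not Cassandra code):

// Query: SELECT ... WHERE v = 'old' at a consistency level touching both replicas.
public class ReplicaFilteringSketch
{
    static class Cell
    {
        final String value;
        final long timestamp;
        Cell(String value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    public static void main(String[] args)
    {
        Cell staleOnA = new Cell("old", 1); // replica A missed the update
        Cell freshOnB = new Cell("new", 2); // replica B has the newer value

        // Without protection: each replica filters locally, so B returns nothing, the coordinator
        // has nothing to reconcile A's match against, and the stale row is returned to the client.
        boolean unprotectedIncludesRow = staleOnA.value.equals("old");
        System.out.println("unprotected: row returned = " + unprotectedIncludesRow); // true

        // With protection: phase 1 flags A's match as potentially outdated, phase 2 fetches the same
        // primary key from B regardless of the filter, reconciliation keeps the newest cell, and the
        // row filter is re-applied to the merged result.
        Cell reconciled = staleOnA.timestamp >= freshOnB.timestamp ? staleOnA : freshOnB;
        boolean protectedIncludesRow = reconciled.value.equals("old");
        System.out.println("protected:   row returned = " + protectedIncludesRow); // false
    }
}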
@SuppressWarnings("resource")
private PartitionIterator resolveInternal(ResolveContext context,
UnfilteredPartitionIterators.MergeListener mergeListener,
ResponseProvider responseProvider,
UnaryOperator<PartitionIterator> preCountFilter)
{
int count = context.replicas.size();
List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
for (int i = 0; i < count; i++)
results.add(responseProvider.getResponse(i));
/*
* Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
* have more rows than the client requested. To make sure that we still conform to the original limit,
......@@ -116,15 +267,10 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
*
* See CASSANDRA-13747 for more details.
*/
DataLimits.Counter mergedResultCounter =
command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
replicaPlan.getWithContacts(replicas),
mergedResultCounter,
repairedDataTracker);
UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, mergeListener);
FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter);
return Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
......@@ -133,27 +279,6 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
return RepairedDataVerifier.verifier(command);
}
private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
P sources,
DataLimits.Counter mergedResultCounter,
RepairedDataTracker repairedDataTracker)
{
// If we have only one result, there is no read repair to do, we can't get short
// reads, and we can't make a comparison between repaired data sets
if (results.size() == 1)
return results.get(0);