Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Open sidebar
cld
systems
cassandra
Commits
7014cd5a
Commit
7014cd5a
authored
Jun 05, 2020
by
Andrés de la Peña
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'cassandra-3.0' into cassandra-3.11
# Conflicts: # CHANGES.txt # src/java/org/apache/cassandra/db/filter/RowFilter.java
parents
056c9eff
dd255ffa
Changes
14
Hide whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
800 additions
and
94 deletions
+800
-94
CHANGES.txt
CHANGES.txt
+1
-0
src/java/org/apache/cassandra/db/DataRange.java
src/java/org/apache/cassandra/db/DataRange.java
+10
-0
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
...va/org/apache/cassandra/db/PartitionRangeReadCommand.java
+5
-0
src/java/org/apache/cassandra/db/ReadCommand.java
src/java/org/apache/cassandra/db/ReadCommand.java
+7
-0
src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
...a/org/apache/cassandra/db/SinglePartitionReadCommand.java
+5
-0
src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
...rg/apache/cassandra/db/compaction/CompactionIterator.java
+2
-1
src/java/org/apache/cassandra/db/filter/RowFilter.java
src/java/org/apache/cassandra/db/filter/RowFilter.java
+59
-33
src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
...cassandra/db/partitions/UnfilteredPartitionIterators.java
+7
-4
src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
.../org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+23
-9
src/java/org/apache/cassandra/metrics/TableMetrics.java
src/java/org/apache/cassandra/metrics/TableMetrics.java
+16
-2
src/java/org/apache/cassandra/service/DataResolver.java
src/java/org/apache/cassandra/service/DataResolver.java
+139
-36
src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
.../apache/cassandra/service/ReplicaFilteringProtection.java
+468
-0
src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
...va/org/apache/cassandra/utils/concurrent/Accumulator.java
+13
-0
test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
...rg/apache/cassandra/utils/concurrent/AccumulatorTest.java
+45
-9
No files found.
CHANGES.txt
View file @
7014cd5a
...
...
@@ -2,6 +2,7 @@
* Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503)
* Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
Merged from 3.0:
* Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
* Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
* Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
* EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
...
...
src/java/org/apache/cassandra/db/DataRange.java
View file @
7014cd5a
...
...
@@ -195,6 +195,16 @@ public class DataRange
return
clusteringIndexFilter
.
selectsAllPartition
();
}
/**
* Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*
* @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*/
public
boolean
isReversed
()
{
return
clusteringIndexFilter
.
isReversed
();
}
/**
* The clustering index filter to use for the provided key.
* <p>
...
...
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
View file @
7014cd5a
...
...
@@ -241,6 +241,11 @@ public class PartitionRangeReadCommand extends ReadCommand
return
DatabaseDescriptor
.
getRangeRpcTimeout
();
}
public
boolean
isReversed
()
{
return
dataRange
.
isReversed
();
}
public
boolean
selectsKey
(
DecoratedKey
key
)
{
if
(!
dataRange
().
contains
(
key
))
...
...
src/java/org/apache/cassandra/db/ReadCommand.java
View file @
7014cd5a
...
...
@@ -342,6 +342,13 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
protected
abstract
int
oldestUnrepairedTombstone
();
/**
* Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*
* @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*/
public
abstract
boolean
isReversed
();
public
ReadResponse
createResponse
(
UnfilteredPartitionIterator
iterator
)
{
// validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
...
...
src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
View file @
7014cd5a
...
...
@@ -433,6 +433,11 @@ public class SinglePartitionReadCommand extends ReadCommand
return
DatabaseDescriptor
.
getReadRpcTimeout
();
}
public
boolean
isReversed
()
{
return
clusteringIndexFilter
.
isReversed
();
}
public
boolean
selectsKey
(
DecoratedKey
key
)
{
if
(!
this
.
partitionKey
().
equals
(
key
))
...
...
src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
View file @
7014cd5a
...
...
@@ -205,11 +205,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
{
}
public
void
onMergedRows
(
Row
merged
,
Row
[]
versions
)
public
Row
onMergedRows
(
Row
merged
,
Row
[]
versions
)
{
indexTransaction
.
start
();
indexTransaction
.
onRowMerge
(
merged
,
versions
);
indexTransaction
.
commit
();
return
merged
;
}
public
void
onMergedRangeTombstoneMarkers
(
RangeTombstoneMarker
mergedMarker
,
RangeTombstoneMarker
[]
versions
)
...
...
src/java/org/apache/cassandra/db/filter/RowFilter.java
View file @
7014cd5a
...
...
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap;
import
java.util.concurrent.atomic.AtomicInteger
;
import
com.google.common.base.Objects
;
import
com.google.common.collect.Iterables
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -35,7 +34,9 @@ import org.apache.cassandra.cql3.Operator;
import
org.apache.cassandra.db.*
;
import
org.apache.cassandra.db.context.*
;
import
org.apache.cassandra.db.marshal.*
;
import
org.apache.cassandra.db.partitions.FilteredPartition
;
import
org.apache.cassandra.db.partitions.ImmutableBTreePartition
;
import
org.apache.cassandra.db.partitions.PartitionIterator
;
import
org.apache.cassandra.db.partitions.UnfilteredPartitionIterator
;
import
org.apache.cassandra.db.rows.*
;
import
org.apache.cassandra.db.transform.Transformation
;
...
...
@@ -142,6 +143,21 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return
false
;
}
protected
abstract
Transformation
<
BaseRowIterator
<?>>
filter
(
CFMetaData
metadata
,
int
nowInSec
);
/**
* Filters the provided iterator so that only the row satisfying the expression of this filter
* are included in the resulting iterator.
*
* @param iter the iterator to filter
* @param nowInSec the time of query in seconds.
* @return the filtered iterator.
*/
public
UnfilteredPartitionIterator
filter
(
UnfilteredPartitionIterator
iter
,
int
nowInSec
)
{
return
expressions
.
isEmpty
()
?
iter
:
Transformation
.
apply
(
iter
,
filter
(
iter
.
metadata
(),
nowInSec
));
}
/**
* Filters the provided iterator so that only the row satisfying the expression of this filter
* are included in the resulting iterator.
...
...
@@ -150,7 +166,10 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
* @param nowInSec the time of query in seconds.
* @return the filtered iterator.
*/
public
abstract
UnfilteredPartitionIterator
filter
(
UnfilteredPartitionIterator
iter
,
int
nowInSec
);
public
PartitionIterator
filter
(
PartitionIterator
iter
,
CFMetaData
metadata
,
int
nowInSec
)
{
return
expressions
.
isEmpty
()
?
iter
:
Transformation
.
apply
(
iter
,
filter
(
metadata
,
nowInSec
));
}
/**
* Whether the provided row in the provided partition satisfies this filter.
...
...
@@ -284,13 +303,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super
(
expressions
);
}
p
ublic
UnfilteredPartition
Iterator
filter
(
UnfilteredPartitionIterator
iter
,
int
nowInSec
)
p
rotected
Transformation
<
BaseRow
Iterator
<?>>
filter
(
CFMetaData
metadata
,
int
nowInSec
)
{
if
(
expressions
.
isEmpty
())
return
iter
;
final
CFMetaData
metadata
=
iter
.
metadata
();
List
<
Expression
>
partitionLevelExpressions
=
new
ArrayList
<>();
List
<
Expression
>
rowLevelExpressions
=
new
ArrayList
<>();
for
(
Expression
e:
expressions
)
...
...
@@ -304,10 +318,10 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
long
numberOfRegularColumnExpressions
=
rowLevelExpressions
.
size
();
final
boolean
filterNonStaticColumns
=
numberOfRegularColumnExpressions
>
0
;
class
IsSatisfiedFilter
extends
Transformation
<
Unfiltered
RowIterator
>
return
new
Transformation
<
Base
RowIterator
<?>>()
{
DecoratedKey
pk
;
p
ublic
Unfiltered
RowIterator
applyToPartition
(
Unfiltered
RowIterator
partition
)
p
rotected
Base
RowIterator
<?>
applyToPartition
(
Base
RowIterator
<?>
partition
)
{
pk
=
partition
.
partitionKey
();
...
...
@@ -319,7 +333,10 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return
null
;
}
UnfilteredRowIterator
iterator
=
Transformation
.
apply
(
partition
,
this
);
BaseRowIterator
<?>
iterator
=
partition
instanceof
UnfilteredRowIterator
?
Transformation
.
apply
((
UnfilteredRowIterator
)
partition
,
this
)
:
Transformation
.
apply
((
RowIterator
)
partition
,
this
);
if
(
filterNonStaticColumns
&&
!
iterator
.
hasNext
())
{
iterator
.
close
();
...
...
@@ -341,9 +358,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return
row
;
}
}
return
Transformation
.
apply
(
iter
,
new
IsSatisfiedFilter
());
};
}
protected
RowFilter
withNewExpressions
(
List
<
Expression
>
expressions
)
...
...
@@ -359,36 +374,47 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super
(
expressions
);
}
p
ublic
UnfilteredPartition
Iterator
filter
(
UnfilteredPartitionIterator
iter
,
final
int
nowInSec
)
p
rotected
Transformation
<
BaseRow
Iterator
<?>>
filter
(
CFMetaData
metadata
,
int
nowInSec
)
{
if
(
expressions
.
isEmpty
())
return
iter
;
class
IsSatisfiedThriftFilter
extends
Transformation
<
UnfilteredRowIterator
>
// Thrift does not filter rows, it filters entire partition if any of the expression is not
// satisfied, which forces us to materialize the result (in theory we could materialize only
// what we need which might or might not be everything, but we keep it simple since in practice
// it's not worth that it has ever been).
return
new
Transformation
<
BaseRowIterator
<?>>()
{
@Override
public
UnfilteredRowIterator
applyToPartition
(
UnfilteredRowIterator
iter
)
protected
BaseRowIterator
<?>
applyToPartition
(
BaseRowIterator
<?>
partition
)
{
// Thrift does not filter rows, it filters entire partition if any of the expression is not
// satisfied, which forces us to materialize the result (in theory we could materialize only
// what we need which might or might not be everything, but we keep it simple since in practice
// it's not worth that it has ever been).
ImmutableBTreePartition
result
=
ImmutableBTreePartition
.
create
(
iter
);
iter
.
close
();
return
partition
instanceof
UnfilteredRowIterator
?
applyTo
((
UnfilteredRowIterator
)
partition
)
:
applyTo
((
RowIterator
)
partition
);
}
private
UnfilteredRowIterator
applyTo
(
UnfilteredRowIterator
partition
)
{
ImmutableBTreePartition
result
=
ImmutableBTreePartition
.
create
(
partition
);
partition
.
close
();
return
accepts
(
result
)
?
result
.
unfilteredIterator
()
:
null
;
}
private
RowIterator
applyTo
(
RowIterator
partition
)
{
FilteredPartition
result
=
FilteredPartition
.
create
(
partition
);
return
accepts
(
result
)
?
result
.
rowIterator
()
:
null
;
}
private
boolean
accepts
(
ImmutableBTreePartition
result
)
{
// The partition needs to have a row for every expression, and the expression needs to be valid.
for
(
Expression
expr
:
expressions
)
{
assert
expr
instanceof
ThriftExpression
;
Row
row
=
result
.
getRow
(
makeCompactClustering
(
iter
.
metadata
()
,
expr
.
column
().
name
.
bytes
));
if
(
row
==
null
||
!
expr
.
isSatisfiedBy
(
iter
.
metadata
(),
iter
.
partitionKey
(),
row
))
return
null
;
Row
row
=
result
.
getRow
(
makeCompactClustering
(
metadata
,
expr
.
column
().
name
.
bytes
));
if
(
row
==
null
||
!
expr
.
isSatisfiedBy
(
metadata
,
result
.
partitionKey
(),
row
))
return
false
;
}
// If we get there, it means all expressions where satisfied, so return the original result
return
result
.
unfilteredIterator
()
;
return
true
;
}
}
return
Transformation
.
apply
(
iter
,
new
IsSatisfiedThriftFilter
());
};
}
protected
RowFilter
withNewExpressions
(
List
<
Expression
>
expressions
)
...
...
src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
View file @
7014cd5a
...
...
@@ -48,7 +48,7 @@ public abstract class UnfilteredPartitionIterators
public
interface
MergeListener
{
public
UnfilteredRowIterators
.
MergeListener
getRowMergeListener
(
DecoratedKey
partitionKey
,
List
<
UnfilteredRowIterator
>
versions
);
public
void
close
()
;
public
default
void
close
()
{}
}
@SuppressWarnings
(
"resource"
)
// The created resources are returned right away
...
...
@@ -103,7 +103,6 @@ public abstract class UnfilteredPartitionIterators
public
static
UnfilteredPartitionIterator
merge
(
final
List
<?
extends
UnfilteredPartitionIterator
>
iterators
,
final
int
nowInSec
,
final
MergeListener
listener
)
{
assert
listener
!=
null
;
assert
!
iterators
.
isEmpty
();
final
boolean
isForThrift
=
iterators
.
get
(
0
).
isForThrift
();
...
...
@@ -128,7 +127,9 @@ public abstract class UnfilteredPartitionIterators
protected
UnfilteredRowIterator
getReduced
()
{
UnfilteredRowIterators
.
MergeListener
rowListener
=
listener
.
getRowMergeListener
(
partitionKey
,
toMerge
);
UnfilteredRowIterators
.
MergeListener
rowListener
=
listener
==
null
?
null
:
listener
.
getRowMergeListener
(
partitionKey
,
toMerge
);
// Replace nulls by empty iterators
for
(
int
i
=
0
;
i
<
toMerge
.
size
();
i
++)
...
...
@@ -172,7 +173,9 @@ public abstract class UnfilteredPartitionIterators
public
void
close
()
{
merged
.
close
();
listener
.
close
();
if
(
listener
!=
null
)
listener
.
close
();
}
};
}
...
...
src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
View file @
7014cd5a
...
...
@@ -72,12 +72,17 @@ public abstract class UnfilteredRowIterators
* particular, this may be called in cases where there is no row in the merged output (if a source has a row
* that is shadowed by another source range tombstone or partition level deletion).
*
* @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
* placeholder for when at least one source has a row, but that row is shadowed in the merged output.
* @param merged the result of the merge. This cannot be {@code null} (so that listener can always access the
* clustering from this safely)but can be empty, in which case this is a placeholder for when at least one
* source has a row, but that row is shadowed in the merged output.
* @param versions for each source, the row in that source corresponding to {@code merged}. This can be
* {@code null} for some sources if the source has not such row.
* @return the row to use as result of the merge (can be {@code null}). Most implementations should simply
* return {@code merged}, but this allows some implementations to impact the merge result if necessary. If this
* returns either {@code null} or an empty row, then the row is skipped from the merge result. If this returns a
* non {@code null} result, then the returned row <b>must</b> have the same clustering than {@code merged}.
*/
public
void
onMergedRows
(
Row
merged
,
Row
[]
versions
);
public
Row
onMergedRows
(
Row
merged
,
Row
[]
versions
);
/**
* Called once for every range tombstone marker participating in the merge.
...
...
@@ -491,9 +496,12 @@ public abstract class UnfilteredRowIterators
Row
merged
=
merger
.
merge
(
partitionDeletion
);
if
(
merged
==
null
)
merged
=
Rows
.
EMPTY_STATIC_ROW
;
if
(
listener
!=
null
)
listener
.
onMergedRows
(
merged
,
merger
.
mergedRows
());
return
merged
;
if
(
listener
==
null
)
return
merged
;
merged
=
listener
.
onMergedRows
(
merged
,
merger
.
mergedRows
());
// Note that onMergedRows can have returned null even though his input wasn't null
return
merged
==
null
?
Rows
.
EMPTY_STATIC_ROW
:
merged
;
}
private
static
PartitionColumns
collectColumns
(
List
<
UnfilteredRowIterator
>
iterators
)
...
...
@@ -577,9 +585,15 @@ public abstract class UnfilteredRowIterators
if
(
nextKind
==
Unfiltered
.
Kind
.
ROW
)
{
Row
merged
=
rowMerger
.
merge
(
markerMerger
.
activeDeletion
());
if
(
listener
!=
null
)
listener
.
onMergedRows
(
merged
==
null
?
BTreeRow
.
emptyRow
(
rowMerger
.
mergedClustering
())
:
merged
,
rowMerger
.
mergedRows
());
return
merged
;
if
(
listener
==
null
)
return
merged
;
merged
=
listener
.
onMergedRows
(
merged
==
null
?
BTreeRow
.
emptyRow
(
rowMerger
.
mergedClustering
())
:
merged
,
rowMerger
.
mergedRows
());
return
merged
==
null
||
merged
.
isEmpty
()
?
null
:
merged
;
}
else
{
...
...
src/java/org/apache/cassandra/metrics/TableMetrics.java
View file @
7014cd5a
...
...
@@ -203,6 +203,7 @@ public class TableMetrics
public
final
Meter
readRepairRequests
;
public
final
Meter
shortReadProtectionRequests
;
public
final
Meter
replicaSideFilteringProtectionRequests
;
public
final
Map
<
Sampler
,
TopKSampler
<
ByteBuffer
>>
samplers
;
/**
...
...
@@ -698,8 +699,9 @@ public class TableMetrics
casPropose
=
new
LatencyMetrics
(
factory
,
"CasPropose"
,
cfs
.
keyspace
.
metric
.
casPropose
);
casCommit
=
new
LatencyMetrics
(
factory
,
"CasCommit"
,
cfs
.
keyspace
.
metric
.
casCommit
);
readRepairRequests
=
Metrics
.
meter
(
factory
.
createMetricName
(
"ReadRepairRequests"
));
shortReadProtectionRequests
=
Metrics
.
meter
(
factory
.
createMetricName
(
"ShortReadProtectionRequests"
));
readRepairRequests
=
createTableMeter
(
"ReadRepairRequests"
);
shortReadProtectionRequests
=
createTableMeter
(
"ShortReadProtectionRequests"
);
replicaSideFilteringProtectionRequests
=
createTableMeter
(
"ReplicaSideFilteringProtectionRequests"
);
}
public
void
updateSSTableIterated
(
int
count
)
...
...
@@ -807,6 +809,18 @@ public class TableMetrics
return
cfCounter
;
}
private
Meter
createTableMeter
(
final
String
name
)
{
return
createTableMeter
(
name
,
name
);
}
private
Meter
createTableMeter
(
final
String
name
,
final
String
alias
)
{
Meter
tableMeter
=
Metrics
.
meter
(
factory
.
createMetricName
(
name
),
aliasFactory
.
createMetricName
(
alias
));
register
(
name
,
alias
,
tableMeter
);
return
tableMeter
;
}
/**
* Computes the compression ratio for the specified SSTables
*
...
...
src/java/org/apache/cassandra/service/DataResolver.java
View file @
7014cd5a
...
...
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
import
java.net.InetAddress
;
import
java.util.*
;
import
java.util.concurrent.TimeoutException
;
import
java.util.function.UnaryOperator
;
import
com.google.common.annotations.VisibleForTesting
;
import
com.google.common.base.Joiner
;
...
...
@@ -28,6 +29,7 @@ import com.google.common.collect.Iterables;
import
org.apache.cassandra.concurrent.Stage
;
import
org.apache.cassandra.concurrent.StageManager
;
import
org.apache.cassandra.config.*
;
import
org.apache.cassandra.cql3.statements.IndexTarget
;
import
org.apache.cassandra.db.*
;
import
org.apache.cassandra.db.filter.*
;
import
org.apache.cassandra.db.filter.DataLimits.Counter
;
...
...
@@ -38,7 +40,9 @@ import org.apache.cassandra.dht.AbstractBounds;
import
org.apache.cassandra.dht.ExcludingBounds
;
import
org.apache.cassandra.dht.Range
;
import
org.apache.cassandra.exceptions.ReadTimeoutException
;
import
org.apache.cassandra.index.sasi.SASIIndex
;
import
org.apache.cassandra.net.*
;
import
org.apache.cassandra.schema.IndexMetadata
;
import
org.apache.cassandra.tracing.Tracing
;
import
org.apache.cassandra.utils.FBUtilities
;
...
...
@@ -81,17 +85,136 @@ public class DataResolver extends ResponseResolver
public
PartitionIterator
resolve
()
{
if
(!
needsReplicaFilteringProtection
())
{
ResolveContext
context
=
new
ResolveContext
(
responses
.
size
());
return
resolveWithReadRepair
(
context
,
i
->
shortReadProtectedResponse
(
i
,
context
),
UnaryOperator
.
identity
());
}
return
resolveWithReplicaFilteringProtection
();
}
private
boolean
needsReplicaFilteringProtection
()
{
if
(
command
.
rowFilter
().
isEmpty
())
return
false
;
IndexMetadata
indexDef
=
command
.
indexMetadata
();
if
(
indexDef
!=
null
&&
indexDef
.
isCustom
())
{
String
className
=
indexDef
.
options
.
get
(
IndexTarget
.
CUSTOM_INDEX_OPTION_NAME
);
return
!
SASIIndex
.
class
.
getName
().
equals
(
className
);
}
return
true
;
}
private
class
ResolveContext
{
private
final
InetAddress
[]
sources
;
private
final
DataLimits
.
Counter
mergedResultCounter
;
private
ResolveContext
(
int
count
)
{
assert
count
<=
responses
.
size
();
this
.
sources
=
new
InetAddress
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
sources
[
i
]
=
responses
.
get
(
i
).
from
;
this
.
mergedResultCounter
=
command
.
limits
().
newCounter
(
command
.
nowInSec
(),
true
,
command
.
selectsFullPartition
(),
enforceStrictLiveness
);
}
private
boolean
needShortReadProtection
()
{
// If we have only one result, there is no read repair to do and we can't get short reads
// Also, so-called "short reads" stems from nodes returning only a subset of the results they have for a
// partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have limit,
// don't bother protecting against short reads.
return
sources
.
length
>
1
&&
!
command
.
limits
().
isUnlimited
();
}
}
@FunctionalInterface
private
interface
ResponseProvider
{
UnfilteredPartitionIterator
getResponse
(
int
i
);
}
private
UnfilteredPartitionIterator
shortReadProtectedResponse
(
int
i
,
ResolveContext
context
)
{
UnfilteredPartitionIterator
originalResponse
=
responses
.
get
(
i
).
payload
.
makeIterator
(
command
);
return
context
.
needShortReadProtection
()
?
extendWithShortReadProtection
(
originalResponse
,
context
.
sources
[
i
],
context
.
mergedResultCounter
)
:
originalResponse
;
}
private
PartitionIterator
resolveWithReadRepair
(
ResolveContext
context
,
ResponseProvider
responseProvider
,
UnaryOperator
<
PartitionIterator
>
preCountFilter
)
{
return
resolveInternal
(
context
,
new
RepairMergeListener
(
context
.
sources
),
responseProvider
,
preCountFilter
);
}
private
PartitionIterator
resolveWithReplicaFilteringProtection
()
{
// Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that
// wouldn't be removed by normal reconciliation because up-to-date replica have filtered the up-to-date version
// of that row) works in 3 steps:
// 1) we read the full response just to collect rows that may be outdated (the ones we got from some
// replica but didn't got any response for other; it could be those other replica have filtered a more
// up-to-date result). In doing so, we do not count any of such "potentially outdated" row towards the
// query limit. This simulate the worst case scenario where all those "potentially outdated" rows are
// indeed outdated, and thus make sure we are guaranteed to read enough results (thanks to short read
// protection).
// 2) we query all the replica/rows we need to rule out whether those "potentially outdated" rows are outdated
// or not.
// 3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair,
// but where for each replica we use their original response _plus_ the additional rows queried in the
// previous step (and apply the command#rowFilter() on the full result). Since the first phase has
// pessimistically collected enough results for the case where all potentially outdated results are indeed
// outdated, we shouldn't need further short-read protection requests during this phase.
// We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
// at the beginning of this method), so grab the response count once and use that through the method.
int
count
=
responses
.
size
();
List
<
UnfilteredPartitionIterator
>
iters
=
new
ArrayList
<>(
count
);
InetAddress
[]
sources
=
new
InetAddress
[
count
];
// We need separate contexts, as each context has his own counter
ResolveContext
firstPhaseContext
=
new
ResolveContext
(
count
);
ResolveContext
secondPhaseContext
=
new
ResolveContext
(
count
);
ReplicaFilteringProtection
rfp
=
new
ReplicaFilteringProtection
(
keyspace
,
command
,
consistency
,
queryStartNanoTime
,
firstPhaseContext
.
sources
);
PartitionIterator
firstPhasePartitions
=
resolveInternal
(
firstPhaseContext
,
rfp
.
mergeController
(),
i
->
shortReadProtectedResponse
(
i
,
firstPhaseContext
),
UnaryOperator
.
identity
());
// Consume the first phase partitions to populate the replica filtering protection with both those materialized
// partitions and the primary keys to be fetched.
PartitionIterators
.
consume
(
firstPhasePartitions
);
firstPhasePartitions
.
close
();
// After reading the entire query results the protection helper should have cached all the partitions so we can
// clear the responses accumulator for the sake of memory usage, given that the second phase might take long if
// it needs to query replicas.
responses
.
clearUnsafe
();
return
resolveWithReadRepair
(
secondPhaseContext
,
rfp:
:
queryProtectedPartitions
,
results
->
command
.
rowFilter
().
filter
(
results
,
command
.
metadata
(),
command
.
nowInSec
()));
}
private
PartitionIterator
resolveInternal
(
ResolveContext
context
,
UnfilteredPartitionIterators
.
MergeListener
mergeListener
,
ResponseProvider
responseProvider
,
UnaryOperator
<
PartitionIterator
>
preCountFilter
)
{
int
count
=
context
.
sources
.
length
;
List
<
UnfilteredPartitionIterator
>
results
=
new
ArrayList
<>(
count
);
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
MessageIn
<
ReadResponse
>
msg
=
responses
.
get
(
i
);
iters
.
add
(
msg
.
payload
.
makeIterator
(
command
));
sources
[
i
]
=
msg
.
from
;
}
results
.
add
(
responseProvider
.
getResponse
(
i
));
/*
* Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
...
...
@@ -107,36 +230,14 @@ public class DataResolver extends ResponseResolver
* See CASSANDRA-13747 for more details.
*/
DataLimits
.
Counter
mergedResultCounter
=
command
.
limits
().
newCounter
(
command
.
nowInSec
(),
true
,
command
.
selectsFullPartition
(),
enforceStrictLiveness
);
UnfilteredPartitionIterator
merged
=
mergeWithShortReadProtection
(
iters
,
sources
,
mergedResultCounter
);
UnfilteredPartitionIterator
merged
=
UnfilteredPartitionIterators
.
merge
(
results
,
command
.
nowInSec
(),
mergeListener
);
FilteredPartitions
filtered
=
FilteredPartitions
.
filter
(
merged
,
new
Filter
(
command
.
nowInSec
(),
command
.
metadata
().
enforceStrictLiveness
()));
PartitionIterator
counted
=
Transformation
.
apply
(
filtered
,
mergedResultCounter
);
FilteredPartitions
.
filter
(
merged
,
new
Filter
(
command
.
nowInSec
(),
command
.
metadata
().
enforceStrictLiveness
()));
PartitionIterator
counted
=
Transformation
.
apply
(
preCountFilter
.
apply
(
filtered
)
,
context
.
mergedResultCounter
);
return
command
.
isForThrift
()
?
counted
:
Transformation
.
apply
(
counted
,
new
EmptyPartitionsDiscarder
());
}
private
UnfilteredPartitionIterator
mergeWithShortReadProtection
(
List
<
UnfilteredPartitionIterator
>
results
,
InetAddress
[]
sources
,
DataLimits
.
Counter
mergedResultCounter
)
{
// If we have only one results, there is no read repair to do and we can't get short reads
if
(
results
.
size
()
==
1
)
return
results
.
get
(
0
);
/*
* So-called short reads stems from nodes returning only a subset of the results they have due to the limit,
* but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
*/
if
(!
command
.
limits
().
isUnlimited
())
for
(
int
i
=
0
;
i
<
results
.
size
();
i
++)
results
.
set
(
i
,
extendWithShortReadProtection
(
results
.
get
(
i
),
sources
[
i
],
mergedResultCounter
));
return
UnfilteredPartitionIterators
.
merge
(
results
,
command
.
nowInSec
(),
new
RepairMergeListener
(
sources
));
?
counted