Commit 4d1bdb12 authored by Sam Tunnicliffe

Improve handling of static rows in repaired data tracking

Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-15848
parent 04533e6c
4.0-alpha5
* Fix handling of fully purged static rows in repaired data tracking (CASSANDRA-15848)
* Prevent validation request submission from blocking ANTI_ENTROPY stage (CASSANDRA-15812)
* Add fqltool and auditlogviewer to rpm and deb packages (CASSANDRA-14712)
* Include DROPPED_COLUMNS in schema digest computation (CASSANDRA-15843)
@@ -173,17 +173,7 @@ class RepairedDataInfo
     protected Row applyToStatic(Row row)
     {
         if (repairedCounter.isDone())
             return row;

-        assert purger != null;
-        Row purged = purger.applyToRow(row);
-        if (!purged.isEmpty())
-        {
-            isFullyPurged = false;
-            purged.digest(getPerPartitionDigest());
-        }
-        return row;
+        return applyToRow(row);
     }

     protected Row applyToRow(Row row)
@@ -193,7 +183,7 @@ class RepairedDataInfo
         assert purger != null;
         Row purged = purger.applyToRow(row);
-        if (purged != null)
+        if (purged != null && !purged.isEmpty())
         {
             isFullyPurged = false;
             purged.digest(getPerPartitionDigest());
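
Taken together, the two hunks above route static rows through the same purge-and-digest path as regular rows, and tighten that path to skip a purged row that is empty as well as one that is null; previously a fully purged static row was still fed into the repaired data digest, so replicas with different compaction states could report spurious mismatches. A sketch of the post-patch methods, assuming applyToRow keeps the same overall shape as the removed applyToStatic body (any guards above the assert are elided here, and the original row is returned after digesting the purged one):

    protected Row applyToStatic(Row row)
    {
        if (repairedCounter.isDone())
            return row;
        // delegate so static rows get identical purge/empty handling to regular rows
        return applyToRow(row);
    }

    protected Row applyToRow(Row row)
    {
        assert purger != null;
        Row purged = purger.applyToRow(row);
        // an empty purged row must contribute nothing to the repaired data digest;
        // checking only for null previously let fully purged static rows through
        if (purged != null && !purged.isEmpty())
        {
            isFullyPurged = false;
            purged.digest(getPerPartitionDigest());
        }
        return row;
    }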
@@ -164,6 +164,7 @@ public class ReadCommandTest
         TableMetadata.Builder metadata6 =
             TableMetadata.builder(KEYSPACE, CF6)
                          .addPartitionKeyColumn("key", BytesType.instance)
+                         .addStaticColumn("s", AsciiType.instance)
                          .addClusteringColumn("col", AsciiType.instance)
                          .addRegularColumn("a", AsciiType.instance)
                          .addRegularColumn("b", AsciiType.instance)
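
For orientation, CF6 now carries a static column alongside its clustering and regular columns. Assuming the builder chain ends with the columns shown (KEYSPACE and CF6 are test constants, rendered below as placeholders), the table is shaped roughly like this:

    // Illustrative CQL equivalent of the builder above, not part of the patch:
    // CREATE TABLE <KEYSPACE>.<CF6> (
    //     key blob,
    //     s   ascii static,   -- the newly added static column
    //     col ascii,
    //     a   ascii,
    //     b   ascii,
    //     PRIMARY KEY (key, col)
    // );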
@@ -979,7 +980,8 @@ public class ReadCommandTest
         cfs.disableAutoCompaction();
         setGCGrace(cfs, 600);

-        // Partition with a single, fully deleted row
+        // Partition with a fully deleted static row and a single, fully deleted regular row
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key")).apply();
         RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key"), "cc").apply();
         cfs.forceBlockingFlush();
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
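
The two deleteRow calls differ only in whether clustering values are supplied: on a table with clustering columns, omitting them produces a deletion of the partition's static row, while supplying them deletes the matching regular row. A minimal sketch of the pattern, assuming RowUpdateBuilder behaves as the test comment describes:

    // no clustering values: deletes the static row of partition "key"
    RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key")).apply();
    // clustering value "cc": deletes the regular row at that clustering
    RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key"), "cc").apply();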
@@ -1029,7 +1031,8 @@ public class ReadCommandTest
         new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key-0")).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
         cfs.forceBlockingFlush();
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));

-        // Fully deleted partition in an unrepaired sstable, so not included in the intial digest
+        // Fully deleted partition (static and regular rows) in an unrepaired sstable, so not included in the initial digest
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1")).apply();
         RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1"), "cc").apply();
         cfs.forceBlockingFlush();
@@ -1064,8 +1067,9 @@ public class ReadCommandTest
         cfs.disableAutoCompaction();
         setGCGrace(cfs, 0);

-        // Partition with a single, fully deleted row which will be fully purged
+        // Partition with a fully deleted static row and a single, fully deleted regular row, both of which will be fully purged
         DecoratedKey key = Util.dk("key");
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key).apply();
         RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key, "cc").apply();
         cfs.forceBlockingFlush();
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Hex;

 import static org.apache.cassandra.Util.clustering;
 import static org.apache.cassandra.Util.dk;
@@ -170,6 +169,32 @@ public class RepairedDataInfoTest
         assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
     }

+    @Test
+    public void digestOfFullyPurgedPartition()
+    {
+        int deletionTime = nowInSec - cfs.metadata().params.gcGraceSeconds - 1;
+        DeletionTime deletion = new DeletionTime(((long)deletionTime * 1000), deletionTime);
+        Row staticRow = staticRow(nowInSec, deletion);
+        Row row = row(1, nowInSec, deletion);
+        UnfilteredRowIterator partition = partitionWithStaticRow(bytes(0), staticRow, row);
+
+        // The partition is fully purged, so nothing should be added to the digest
+        byte[] fromRepairedInfo = consume(partition);
+        assertEquals(0, fromRepairedInfo.length);
+    }
+
+    @Test
+    public void digestOfEmptyPartition()
+    {
+        // Static row is read greedily during transformation and if the underlying
+        // SSTableIterator doesn't contain the partition, an empty but non-null
+        // static row is read and digested.
+        UnfilteredRowIterator partition = partition(bytes(0));
+
+        // The partition is completely empty, so nothing should be added to the digest
+        byte[] fromRepairedInfo = consume(partition);
+        assertEquals(0, fromRepairedInfo.length);
+    }
+
     private RepairedDataInfo info()
     {
         return new RepairedDataInfo(DataLimits.NONE.newCounter(nowInSec, false, false, false));
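
Two details behind these new tests may not be obvious from the diff alone. In digestOfFullyPurgedPartition the deletion's local deletion time is deliberately set one second beyond gc_grace, making both tombstones purgeable at read time; in digestOfEmptyPartition the point is that the greedy static row read yields a non-null but empty row for an absent partition, which the new isEmpty() guards must keep out of the digest. A minimal sketch of both facts, assuming the standard Clustering/BTreeRow APIs (the values here are hypothetical, not taken from the test):

    // 1. Purge arithmetic: a tombstone is purgeable once its local deletion time
    //    falls outside the gc_grace window.
    int nowInSec = 1_000_000;                        // hypothetical read time (seconds)
    int gcGraceSeconds = 864_000;                    // hypothetical gc_grace_seconds
    int deletionTime = nowInSec - gcGraceSeconds - 1;
    assert deletionTime < nowInSec - gcGraceSeconds; // => purgeable

    // 2. An absent partition still produces a static row object from the greedy
    //    read; it is empty rather than null, so only an isEmpty() check catches it.
    Row emptyStatic = BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING);
    assert emptyStatic.isEmpty();                    // non-null, but contributes nothing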
@@ -182,7 +207,7 @@
                                Unfiltered... unfiltereds)
     {
         Digest perPartitionDigest = Digest.forRepairedDataTracking();
-        if (!staticRow.isEmpty())
+        if (staticRow != null && !staticRow.isEmpty())
             staticRow.digest(perPartitionDigest);
         perPartitionDigest.update(partitionKey);
         deletion.digest(perPartitionDigest);
@@ -232,6 +257,14 @@
         return builder.build();
     }

+    private Row staticRow(int nowInSec, DeletionTime deletion)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.STATIC_CLUSTERING);
+        builder.addRowDeletion(new Row.Deletion(deletion, false));
+        return builder.build();
+    }
+
     private Row row(int clustering, int value, int nowInSec)
     {
         Row.Builder builder = BTreeRow.unsortedBuilder();
@@ -240,6 +273,14 @@
         return builder.build();
     }

+    private Row row(int clustering, int nowInSec, DeletionTime deletion)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(clustering(metadata.comparator, Integer.toString(clustering)));
+        builder.addRowDeletion(new Row.Deletion(deletion, false));
+        return builder.build();
+    }
+
     private Row[] rows(int clusteringStart, int clusteringEnd, int nowInSec)
     {
         return IntStream.range(clusteringStart, clusteringEnd)
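
Both new helpers mark the row deleted via new Row.Deletion(deletion, false); the boolean is the shadowable flag, so false yields a regular (non-shadowable) row deletion, which is what the purger must fully remove once it falls outside gc_grace. An explanatory sketch of the two constructor forms (semantics per the Row.Deletion API; this snippet is not part of the patch):

    // regular deletion: shadowable flag false, as used in the helpers above
    Row.Deletion regular = new Row.Deletion(deletion, false);
    // shadowable deletion: can itself be shadowed by newer primary-key liveness
    // (used internally, e.g. by materialized views)
    Row.Deletion shadowable = new Row.Deletion(deletion, true);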