Commit 595dc612 authored by Bereng's avatar Bereng Committed by Andrés de la Peña

Improve handling of 2i initialization failures

patch by Berenguer Blasi; reviewed by Andres de la Peña for CASSANDRA-13606
parent bcc1174e
4.0-alpha5 4.0-alpha5
* Improve handling of 2i initialization failures (CASSANDRA-13606)
* Add completion_ratio column to sstable_tasks virtual table (CASANDRA-15759) * Add completion_ratio column to sstable_tasks virtual table (CASANDRA-15759)
* Add support for adding custom Verbs (CASSANDRA-15725) * Add support for adding custom Verbs (CASSANDRA-15725)
* Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783) * Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783)
......
...@@ -44,7 +44,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator; ...@@ -44,7 +44,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.concurrent.OpOrder;
/** /**
* Consisting of a top level Index interface and two sub-interfaces which handle read and write operations, * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations,
...@@ -136,6 +135,23 @@ import org.apache.cassandra.utils.concurrent.OpOrder; ...@@ -136,6 +135,23 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
*/ */
public interface Index public interface Index
{ {
/**
* Supported loads. An index could be badly initialized and support only reads i.e.
*/
public enum LoadType
{
READ, WRITE, ALL, NOOP;
public boolean supportsWrites()
{
return this == ALL || this == WRITE;
}
public boolean supportsReads()
{
return this == ALL || this == READ;
}
}
/* /*
* Helpers for building indexes from SSTable data * Helpers for building indexes from SSTable data
...@@ -180,7 +196,7 @@ public interface Index ...@@ -180,7 +196,7 @@ public interface Index
* single pass through the data. The singleton instance returned from the default method implementation builds * single pass through the data. The singleton instance returned from the default method implementation builds
* indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data. * indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data.
* *
* @return an instance of the index build taski helper. Index implementations which return <b>the same instance</b> * @return an instance of the index build task helper. Index implementations which return <b>the same instance</b>
* will be built using a single task. * will be built using a single task.
*/ */
default IndexBuildingSupport getBuildTaskSupport() default IndexBuildingSupport getBuildTaskSupport()
...@@ -188,6 +204,25 @@ public interface Index ...@@ -188,6 +204,25 @@ public interface Index
return INDEX_BUILDER_SUPPORT; return INDEX_BUILDER_SUPPORT;
} }
/**
* Same as {@code getBuildTaskSupport} but can be overloaded with a specific 'recover' logic different than the index building one
*/
default IndexBuildingSupport getRecoveryTaskSupport()
{
return getBuildTaskSupport();
}
/**
* Returns the type of operations supported by the index in case its building has failed and it's needing recovery.
*
* @param isInitialBuild {@code true} if the failure is for the initial build task on index creation, {@code false}
* if the failure is for a full rebuild or recovery.
*/
default LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
{
return isInitialBuild ? LoadType.WRITE : LoadType.ALL;
}
/** /**
* Return a task to perform any initialization work when a new index instance is created. * Return a task to perform any initialization work when a new index instance is created.
* This may involve costly operations such as (re)building the index, and is performed asynchronously * This may involve costly operations such as (re)building the index, and is performed asynchronously
......
...@@ -59,6 +59,7 @@ import org.apache.cassandra.db.lifecycle.View; ...@@ -59,6 +59,7 @@ import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.Index.IndexBuildingSupport;
import org.apache.cassandra.index.internal.CassandraIndex; import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.transactions.*; import org.apache.cassandra.index.transactions.*;
import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReader;
...@@ -145,6 +146,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -145,6 +146,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
*/ */
private final Set<String> queryableIndexes = Sets.newConcurrentHashSet(); private final Set<String> queryableIndexes = Sets.newConcurrentHashSet();
/**
* The indexes that are available for writing.
*/
private final Map<String, Index> writableIndexes = Maps.newConcurrentMap();
/** /**
* The count of pending index builds for each index. * The count of pending index builds for each index.
*/ */
...@@ -207,6 +213,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -207,6 +213,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
{ {
final Index index = createInstance(indexDef); final Index index = createInstance(indexDef);
index.register(this); index.register(this);
if (writableIndexes.put(index.getIndexMetadata().name, index) == null)
logger.info("Index [{}] registered and writable.", index.getIndexMetadata().name);
markIndexesBuilding(ImmutableSet.of(index), true, isNewCF); markIndexesBuilding(ImmutableSet.of(index), true, isNewCF);
...@@ -220,7 +228,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -220,7 +228,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
} }
catch (Throwable t) catch (Throwable t)
{ {
logAndMarkIndexesFailed(Collections.singleton(index), t); logAndMarkIndexesFailed(Collections.singleton(index), t, true);
throw t; throw t;
} }
} }
...@@ -239,7 +247,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -239,7 +247,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
@Override @Override
public void onFailure(Throwable t) public void onFailure(Throwable t)
{ {
logAndMarkIndexesFailed(Collections.singleton(index), t); logAndMarkIndexesFailed(Collections.singleton(index), t, true);
initialization.setException(t); initialization.setException(t);
} }
...@@ -273,13 +281,24 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -273,13 +281,24 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* Checks if the specified index is queryable. * Checks if the specified index is queryable.
* *
* @param index the index * @param index the index
* @return <code>true</code> if the specified index is registered, <code>false</code> otherwise * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
*/ */
public boolean isIndexQueryable(Index index) public boolean isIndexQueryable(Index index)
{ {
return queryableIndexes.contains(index.getIndexMetadata().name); return queryableIndexes.contains(index.getIndexMetadata().name);
} }
/**
* Checks if the specified index is writable.
*
* @param index the index
* @return <code>true</code> if the specified index is writable, <code>false</code> otherwise
*/
public boolean isIndexWritable(Index index)
{
return writableIndexes.containsKey(index.getIndexMetadata().name);
}
/** /**
* Checks if the specified index has any running build task. * Checks if the specified index has any running build task.
* *
...@@ -326,8 +345,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -326,8 +345,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
} }
/** /**
* Does a blocking full rebuild of the specifed indexes from all the sstables in the base table. * Does a blocking full rebuild/recovery of the specifed indexes from all the sstables in the base table.
* Note also that this method of (re)building indexes: * Note also that this method of (re)building/recovering indexes:
* a) takes a set of index *names* rather than Indexers * a) takes a set of index *names* rather than Indexers
* b) marks existing indexes removed prior to rebuilding * b) marks existing indexes removed prior to rebuilding
* c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run * c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run
...@@ -337,19 +356,40 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -337,19 +356,40 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
*/ */
public void rebuildIndexesBlocking(Set<String> indexNames) public void rebuildIndexesBlocking(Set<String> indexNames)
{ {
try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); // Get the set of indexes that require blocking build
Refs<SSTableReader> allSSTables = viewFragment.refs) Set<Index> toRebuild = indexes.values()
{ .stream()
Set<Index> toRebuild = indexes.values().stream()
.filter(index -> indexNames.contains(index.getIndexMetadata().name)) .filter(index -> indexNames.contains(index.getIndexMetadata().name))
.filter(Index::shouldBuildBlocking) .filter(Index::shouldBuildBlocking)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
if (toRebuild.isEmpty()) if (toRebuild.isEmpty())
{ {
logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames)); logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
return; return;
} }
// Optimistically mark the indexes as writable, so we don't miss incoming writes
boolean needsFlush = false;
for (Index index : toRebuild)
{
String name = index.getIndexMetadata().name;
if (writableIndexes.put(name, index) == null)
{
logger.info("Index [{}] became writable starting recovery.", name);
needsFlush = true;
}
}
// Once we are tracking new writes, flush any memtable contents to not miss them from the sstable-based rebuild
if (needsFlush)
baseCfs.forceBlockingFlush();
// Now that we are tracking new writes and we haven't left untracked contents on the memtables, we are ready to
// index the sstables
try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
Refs<SSTableReader> allSSTables = viewFragment.refs)
{
buildIndexesBlocking(allSSTables, toRebuild, true); buildIndexesBlocking(allSSTables, toRebuild, true);
} }
} }
...@@ -426,7 +466,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -426,7 +466,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
} }
/** /**
* Performs a blocking (re)indexing of the specified SSTables for the specified indexes. * Performs a blocking (re)indexing/recovery of the specified SSTables for the specified indexes.
*
* If the index doesn't support ALL {@link Index.LoadType} it performs a recovery {@link Index#getRecoveryTaskSupport()}
* instead of a build {@link Index#getBuildTaskSupport()}
* *
* @param sstables the SSTables to be (re)indexed * @param sstables the SSTables to be (re)indexed
* @param indexes the indexes to be (re)built for the specifed SSTables * @param indexes the indexes to be (re)built for the specifed SSTables
...@@ -443,15 +486,16 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -443,15 +486,16 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
markIndexesBuilding(indexes, isFullRebuild, false); markIndexesBuilding(indexes, isFullRebuild, false);
// Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed: // Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed:
final Set<Index> builtIndexes = new HashSet<>(); final Set<Index> builtIndexes = Sets.newConcurrentHashSet();
final Set<Index> unbuiltIndexes = new HashSet<>(); final Set<Index> unbuiltIndexes = Sets.newConcurrentHashSet();
// Any exception thrown during index building that could be suppressed by the finally block // Any exception thrown during index building that could be suppressed by the finally block
Exception accumulatedFail = null; Exception accumulatedFail = null;
try try
{ {
logger.info("Submitting index build of {} for data in {}", logger.info("Submitting index {} of {} for data in {}",
isFullRebuild ? "recovery" : "build",
indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")), indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
...@@ -459,7 +503,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -459,7 +503,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>(); Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
for (Index index : indexes) for (Index index : indexes)
{ {
Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>()); IndexBuildingSupport buildOrRecoveryTask = isFullRebuild
? index.getBuildTaskSupport()
: index.getRecoveryTaskSupport();
Set<Index> stored = byType.computeIfAbsent(buildOrRecoveryTask, i -> new HashSet<>());
stored.add(index); stored.add(index);
} }
...@@ -474,7 +521,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -474,7 +521,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
@Override @Override
public void onFailure(Throwable t) public void onFailure(Throwable t)
{ {
logAndMarkIndexesFailed(groupedIndexes, t); logAndMarkIndexesFailed(groupedIndexes, t, false);
unbuiltIndexes.addAll(groupedIndexes); unbuiltIndexes.addAll(groupedIndexes);
build.setException(t); build.setException(t);
} }
...@@ -507,7 +554,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -507,7 +554,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
Set<Index> failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes)); Set<Index> failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes));
if (!failedIndexes.isEmpty()) if (!failedIndexes.isEmpty())
{ {
logAndMarkIndexesFailed(failedIndexes, accumulatedFail); logAndMarkIndexesFailed(failedIndexes, accumulatedFail, false);
} }
// Flush all built indexes with an aynchronous callback to log the success or failure of the flush // Flush all built indexes with an aynchronous callback to log the success or failure of the flush
...@@ -571,8 +618,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -571,8 +618,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same * the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same
* lock, but this is fine as the work done while holding such lock is trivial. * lock, but this is fine as the work done while holding such lock is trivial.
* <p> * <p>
* {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should be always called after the * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index, boolean)} should be always called after
* rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt. * the rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
* *
* @param indexes the index to be marked as building * @param indexes the index to be marked as building
* @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
...@@ -620,7 +667,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -620,7 +667,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
{ {
String indexName = index.getIndexMetadata().name; String indexName = index.getIndexMetadata().name;
if (isFullRebuild) if (isFullRebuild)
queryableIndexes.add(indexName); {
if (queryableIndexes.add(indexName))
logger.info("Index [{}] became queryable after successful build.", indexName);
if (writableIndexes.put(indexName, index) == null)
logger.info("Index [{}] became writable after successful build.", indexName);
}
AtomicInteger counter = inProgressBuilds.get(indexName); AtomicInteger counter = inProgressBuilds.get(indexName);
if (counter != null) if (counter != null)
...@@ -640,10 +693,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -640,10 +693,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method. * {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method.
* *
* @param index the index to be marked as built * @param index the index to be marked as built
* @param isInitialBuild {@code true} if the index failed during its initial build, {@code false} otherwise
*/ */
private synchronized void markIndexFailed(Index index) private synchronized void markIndexFailed(Index index, boolean isInitialBuild)
{ {
String indexName = index.getIndexMetadata().name; String indexName = index.getIndexMetadata().name;
AtomicInteger counter = inProgressBuilds.get(indexName); AtomicInteger counter = inProgressBuilds.get(indexName);
if (counter != null) if (counter != null)
{ {
...@@ -655,17 +710,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -655,17 +710,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName); SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
needsFullRebuild.add(indexName); needsFullRebuild.add(indexName);
if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsWrites() && writableIndexes.remove(indexName) != null)
logger.info("Index [{}] became not-writable because of failed build.", indexName);
if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsReads() && queryableIndexes.remove(indexName))
logger.info("Index [{}] became not-queryable because of failed build.", indexName);
} }
} }
private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure) private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure, boolean isInitialBuild)
{ {
JVMStabilityInspector.inspectThrowable(indexBuildFailure); JVMStabilityInspector.inspectThrowable(indexBuildFailure);
if (indexBuildFailure != null) if (indexBuildFailure != null)
logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure); logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure);
else else
logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes)); logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes));
indexes.forEach(SecondaryIndexManager.this::markIndexFailed); indexes.forEach(i -> this.markIndexFailed(i, isInitialBuild));
} }
/** /**
...@@ -677,6 +738,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -677,6 +738,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
{ {
SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName); SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
queryableIndexes.remove(indexName); queryableIndexes.remove(indexName);
writableIndexes.remove(indexName);
needsFullRebuild.remove(indexName); needsFullRebuild.remove(indexName);
inProgressBuilds.remove(indexName); inProgressBuilds.remove(indexName);
} }
...@@ -1081,9 +1143,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -1081,9 +1143,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
index.validate(update); index.validate(update);
} }
/** /*
* IndexRegistry methods * IndexRegistry methods
*/ */
public void registerIndex(Index index) public void registerIndex(Index index)
{ {
String name = index.getIndexMetadata().name; String name = index.getIndexMetadata().name;
...@@ -1113,7 +1176,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -1113,7 +1176,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
return ImmutableSet.copyOf(indexes.values()); return ImmutableSet.copyOf(indexes.values());
} }
/** /*
* Handling of index updates. * Handling of index updates.
* Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
* during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances. * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
...@@ -1127,16 +1190,18 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -1127,16 +1190,18 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
if (!hasIndexes()) if (!hasIndexes())
return UpdateTransaction.NO_OP; return UpdateTransaction.NO_OP;
Index.Indexer[] indexers = indexes.values().stream() ArrayList<Index.Indexer> idxrs = new ArrayList<>();
.map(i -> i.indexerFor(update.partitionKey(), for (Index i : writableIndexes.values())
update.columns(), {
nowInSec, Index.Indexer idxr = i.indexerFor(update.partitionKey(), update.columns(), nowInSec, ctx, IndexTransaction.Type.UPDATE);
ctx, if (idxr != null)
IndexTransaction.Type.UPDATE)) idxrs.add(idxr);
.filter(Objects::nonNull) }
.toArray(Index.Indexer[]::new);
return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers); if (idxrs.size() == 0)
return UpdateTransaction.NO_OP;
else
return new WriteTimeTransaction(idxrs.toArray(new Index.Indexer[idxrs.size()]));
} }
/** /**
...@@ -1148,7 +1213,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -1148,7 +1213,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
int nowInSec) int nowInSec)
{ {
// the check for whether there are any registered indexes is already done in CompactionIterator // the check for whether there are any registered indexes is already done in CompactionIterator
return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, listIndexes()); return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, writableIndexes.values());
} }
/** /**
...@@ -1161,7 +1226,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -1161,7 +1226,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
if (!hasIndexes()) if (!hasIndexes())
return CleanupTransaction.NO_OP; return CleanupTransaction.NO_OP;
return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, listIndexes()); return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, writableIndexes.values());
} }
/** /**
...@@ -1283,7 +1348,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -1283,7 +1348,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
private IndexGCTransaction(DecoratedKey key, private IndexGCTransaction(DecoratedKey key,
RegularAndStaticColumns columns, RegularAndStaticColumns columns,
Keyspace keyspace, int versions, Keyspace keyspace,
int versions,
int nowInSec, int nowInSec,
Collection<Index> indexes) Collection<Index> indexes)
{ {
...@@ -1388,7 +1454,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum ...@@ -1388,7 +1454,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
private CleanupGCTransaction(DecoratedKey key, private CleanupGCTransaction(DecoratedKey key,
RegularAndStaticColumns columns, RegularAndStaticColumns columns,
Keyspace keyspace, int nowInSec, Keyspace keyspace,
int nowInSec,
Collection<Index> indexes) Collection<Index> indexes)
{ {
this.key = key; this.key = key;
......
...@@ -61,7 +61,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; ...@@ -61,7 +61,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.utils.concurrent.Refs;