diff --git a/CHANGES.txt b/CHANGES.txt index 9b62b0667d3ae47803a387e5ae65559627086a43..7fbd0b4993bb1771c09d9f4f3b93df26a2c8d441 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha5 + * Improve handling of 2i initialization failures (CASSANDRA-13606) * Add completion_ratio column to sstable_tasks virtual table (CASANDRA-15759) * Add support for adding custom Verbs (CASSANDRA-15725) * Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783) diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index b9f38ce52b6af4a955312b28233842d0dcc960ca..6d716be470d678d3b80b625a1478808e5913fb48 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -44,7 +44,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; -import org.apache.cassandra.utils.concurrent.OpOrder; /** * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations, @@ -136,6 +135,23 @@ import org.apache.cassandra.utils.concurrent.OpOrder; */ public interface Index { + /** + * Supported loads. An index could be badly initialized and support only reads i.e. + */ + public enum LoadType + { + READ, WRITE, ALL, NOOP; + + public boolean supportsWrites() + { + return this == ALL || this == WRITE; + } + + public boolean supportsReads() + { + return this == ALL || this == READ; + } + } /* * Helpers for building indexes from SSTable data @@ -180,13 +196,32 @@ public interface Index * single pass through the data. The singleton instance returned from the default method implementation builds * indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data. * - * @return an instance of the index build taski helper. Index implementations which return the same instance + * @return an instance of the index build task helper. Index implementations which return the same instance * will be built using a single task. */ default IndexBuildingSupport getBuildTaskSupport() { return INDEX_BUILDER_SUPPORT; } + + /** + * Same as {@code getBuildTaskSupport} but can be overloaded with a specific 'recover' logic different than the index building one + */ + default IndexBuildingSupport getRecoveryTaskSupport() + { + return getBuildTaskSupport(); + } + + /** + * Returns the type of operations supported by the index in case its building has failed and it's needing recovery. + * + * @param isInitialBuild {@code true} if the failure is for the initial build task on index creation, {@code false} + * if the failure is for a full rebuild or recovery. + */ + default LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild) + { + return isInitialBuild ? LoadType.WRITE : LoadType.ALL; + } /** * Return a task to perform any initialization work when a new index instance is created. diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 7f28c443322f613cf764e085ef173f40a9779a19..3822549c28d4d15d6e5974012bfc34b5b8688753 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -59,6 +59,7 @@ import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.Index.IndexBuildingSupport; import org.apache.cassandra.index.internal.CassandraIndex; import org.apache.cassandra.index.transactions.*; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -144,6 +145,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * The indexes that are available for querying. */ private final Set queryableIndexes = Sets.newConcurrentHashSet(); + + /** + * The indexes that are available for writing. + */ + private final Map writableIndexes = Maps.newConcurrentMap(); /** * The count of pending index builds for each index. @@ -207,6 +213,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { final Index index = createInstance(indexDef); index.register(this); + if (writableIndexes.put(index.getIndexMetadata().name, index) == null) + logger.info("Index [{}] registered and writable.", index.getIndexMetadata().name); markIndexesBuilding(ImmutableSet.of(index), true, isNewCF); @@ -220,7 +228,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum } catch (Throwable t) { - logAndMarkIndexesFailed(Collections.singleton(index), t); + logAndMarkIndexesFailed(Collections.singleton(index), t, true); throw t; } } @@ -239,7 +247,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum @Override public void onFailure(Throwable t) { - logAndMarkIndexesFailed(Collections.singleton(index), t); + logAndMarkIndexesFailed(Collections.singleton(index), t, true); initialization.setException(t); } @@ -273,12 +281,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * Checks if the specified index is queryable. * * @param index the index - * @return true if the specified index is registered, false otherwise + * @return true if the specified index is queryable, false otherwise */ public boolean isIndexQueryable(Index index) { return queryableIndexes.contains(index.getIndexMetadata().name); } + + /** + * Checks if the specified index is writable. + * + * @param index the index + * @return true if the specified index is writable, false otherwise + */ + public boolean isIndexWritable(Index index) + { + return writableIndexes.containsKey(index.getIndexMetadata().name); + } /** * Checks if the specified index has any running build task. @@ -326,8 +345,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum } /** - * Does a blocking full rebuild of the specifed indexes from all the sstables in the base table. - * Note also that this method of (re)building indexes: + * Does a blocking full rebuild/recovery of the specifed indexes from all the sstables in the base table. + * Note also that this method of (re)building/recovering indexes: * a) takes a set of index *names* rather than Indexers * b) marks existing indexes removed prior to rebuilding * c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run @@ -337,19 +356,40 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum */ public void rebuildIndexesBlocking(Set indexNames) { - try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); - Refs allSSTables = viewFragment.refs) + // Get the set of indexes that require blocking build + Set toRebuild = indexes.values() + .stream() + .filter(index -> indexNames.contains(index.getIndexMetadata().name)) + .filter(Index::shouldBuildBlocking) + .collect(Collectors.toSet()); + + if (toRebuild.isEmpty()) { - Set toRebuild = indexes.values().stream() - .filter(index -> indexNames.contains(index.getIndexMetadata().name)) - .filter(Index::shouldBuildBlocking) - .collect(Collectors.toSet()); - if (toRebuild.isEmpty()) + logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames)); + return; + } + + // Optimistically mark the indexes as writable, so we don't miss incoming writes + boolean needsFlush = false; + for (Index index : toRebuild) + { + String name = index.getIndexMetadata().name; + if (writableIndexes.put(name, index) == null) { - logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames)); - return; + logger.info("Index [{}] became writable starting recovery.", name); + needsFlush = true; } + } + + // Once we are tracking new writes, flush any memtable contents to not miss them from the sstable-based rebuild + if (needsFlush) + baseCfs.forceBlockingFlush(); + // Now that we are tracking new writes and we haven't left untracked contents on the memtables, we are ready to + // index the sstables + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); + Refs allSSTables = viewFragment.refs) + { buildIndexesBlocking(allSSTables, toRebuild, true); } } @@ -426,8 +466,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum } /** - * Performs a blocking (re)indexing of the specified SSTables for the specified indexes. + * Performs a blocking (re)indexing/recovery of the specified SSTables for the specified indexes. * + * If the index doesn't support ALL {@link Index.LoadType} it performs a recovery {@link Index#getRecoveryTaskSupport()} + * instead of a build {@link Index#getBuildTaskSupport()} + * * @param sstables the SSTables to be (re)indexed * @param indexes the indexes to be (re)built for the specifed SSTables * @param isFullRebuild True if this method is invoked as a full index rebuild, false otherwise @@ -443,15 +486,16 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum markIndexesBuilding(indexes, isFullRebuild, false); // Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed: - final Set builtIndexes = new HashSet<>(); - final Set unbuiltIndexes = new HashSet<>(); + final Set builtIndexes = Sets.newConcurrentHashSet(); + final Set unbuiltIndexes = Sets.newConcurrentHashSet(); // Any exception thrown during index building that could be suppressed by the finally block Exception accumulatedFail = null; try { - logger.info("Submitting index build of {} for data in {}", + logger.info("Submitting index {} of {} for data in {}", + isFullRebuild ? "recovery" : "build", indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")), sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); @@ -459,7 +503,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum Map> byType = new HashMap<>(); for (Index index : indexes) { - Set stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>()); + IndexBuildingSupport buildOrRecoveryTask = isFullRebuild + ? index.getBuildTaskSupport() + : index.getRecoveryTaskSupport(); + Set stored = byType.computeIfAbsent(buildOrRecoveryTask, i -> new HashSet<>()); stored.add(index); } @@ -474,7 +521,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum @Override public void onFailure(Throwable t) { - logAndMarkIndexesFailed(groupedIndexes, t); + logAndMarkIndexesFailed(groupedIndexes, t, false); unbuiltIndexes.addAll(groupedIndexes); build.setException(t); } @@ -507,7 +554,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum Set failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes)); if (!failedIndexes.isEmpty()) { - logAndMarkIndexesFailed(failedIndexes, accumulatedFail); + logAndMarkIndexesFailed(failedIndexes, accumulatedFail, false); } // Flush all built indexes with an aynchronous callback to log the success or failure of the flush @@ -571,8 +618,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same * lock, but this is fine as the work done while holding such lock is trivial. *

- * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should be always called after the - * rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt. + * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index, boolean)} should be always called after + * the rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt. * * @param indexes the index to be marked as building * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise @@ -620,7 +667,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { String indexName = index.getIndexMetadata().name; if (isFullRebuild) - queryableIndexes.add(indexName); + { + if (queryableIndexes.add(indexName)) + logger.info("Index [{}] became queryable after successful build.", indexName); + + if (writableIndexes.put(indexName, index) == null) + logger.info("Index [{}] became writable after successful build.", indexName); + } AtomicInteger counter = inProgressBuilds.get(indexName); if (counter != null) @@ -640,10 +693,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method. * * @param index the index to be marked as built + * @param isInitialBuild {@code true} if the index failed during its initial build, {@code false} otherwise */ - private synchronized void markIndexFailed(Index index) + private synchronized void markIndexFailed(Index index, boolean isInitialBuild) { String indexName = index.getIndexMetadata().name; + AtomicInteger counter = inProgressBuilds.get(indexName); if (counter != null) { @@ -655,17 +710,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName); needsFullRebuild.add(indexName); + + if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsWrites() && writableIndexes.remove(indexName) != null) + logger.info("Index [{}] became not-writable because of failed build.", indexName); + + if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsReads() && queryableIndexes.remove(indexName)) + logger.info("Index [{}] became not-queryable because of failed build.", indexName); } } - private void logAndMarkIndexesFailed(Set indexes, Throwable indexBuildFailure) + private void logAndMarkIndexesFailed(Set indexes, Throwable indexBuildFailure, boolean isInitialBuild) { JVMStabilityInspector.inspectThrowable(indexBuildFailure); if (indexBuildFailure != null) logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure); else logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes)); - indexes.forEach(SecondaryIndexManager.this::markIndexFailed); + indexes.forEach(i -> this.markIndexFailed(i, isInitialBuild)); } /** @@ -677,6 +738,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName); queryableIndexes.remove(indexName); + writableIndexes.remove(indexName); needsFullRebuild.remove(indexName); inProgressBuilds.remove(indexName); } @@ -1081,9 +1143,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum index.validate(update); } - /** + /* * IndexRegistry methods */ + public void registerIndex(Index index) { String name = index.getIndexMetadata().name; @@ -1113,7 +1176,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum return ImmutableSet.copyOf(indexes.values()); } - /** + /* * Handling of index updates. * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances. @@ -1126,17 +1189,19 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { if (!hasIndexes()) return UpdateTransaction.NO_OP; - - Index.Indexer[] indexers = indexes.values().stream() - .map(i -> i.indexerFor(update.partitionKey(), - update.columns(), - nowInSec, - ctx, - IndexTransaction.Type.UPDATE)) - .filter(Objects::nonNull) - .toArray(Index.Indexer[]::new); - - return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers); + + ArrayList idxrs = new ArrayList<>(); + for (Index i : writableIndexes.values()) + { + Index.Indexer idxr = i.indexerFor(update.partitionKey(), update.columns(), nowInSec, ctx, IndexTransaction.Type.UPDATE); + if (idxr != null) + idxrs.add(idxr); + } + + if (idxrs.size() == 0) + return UpdateTransaction.NO_OP; + else + return new WriteTimeTransaction(idxrs.toArray(new Index.Indexer[idxrs.size()])); } /** @@ -1148,7 +1213,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum int nowInSec) { // the check for whether there are any registered indexes is already done in CompactionIterator - return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, listIndexes()); + return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, writableIndexes.values()); } /** @@ -1161,7 +1226,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum if (!hasIndexes()) return CleanupTransaction.NO_OP; - return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, listIndexes()); + return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, writableIndexes.values()); } /** @@ -1283,7 +1348,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum private IndexGCTransaction(DecoratedKey key, RegularAndStaticColumns columns, - Keyspace keyspace, int versions, + Keyspace keyspace, + int versions, int nowInSec, Collection indexes) { @@ -1388,7 +1454,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum private CleanupGCTransaction(DecoratedKey key, RegularAndStaticColumns columns, - Keyspace keyspace, int nowInSec, + Keyspace keyspace, + int nowInSec, Collection indexes) { this.key = key; diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 58056b9d866b0334e2afceb1ef1fd8dfba0da788..f74a656b0d07cbf58a018c25bff6e43d21da97ea 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -61,7 +61,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Refs; import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; @@ -145,7 +144,7 @@ public abstract class CassandraIndex implements Index Clustering clustering, CellPath path, ByteBuffer cellValue); - + public ColumnMetadata getIndexedColumn() { return indexedColumn; diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index 07327ea6d6ca06d85817cd1eb3884588daeb174a..592499e46e433f9fe1cedac333c365a2d88b9f42 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi; import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import com.googlecode.concurrenttrees.common.Iterables; diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java index 8f3b97d5aabb76e73a1d3af2715fd9652ed9f856..95637801e41e710064d169125923b8b4091edb73 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java @@ -22,6 +22,8 @@ import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import com.google.common.collect.ImmutableSet; + import org.apache.commons.lang3.StringUtils; import org.junit.Test; @@ -1053,6 +1055,84 @@ public class SecondaryIndexTest extends CQLTester } } + @Test // A Bad init could leave an index only accepting reads + public void testReadOnlyIndex() throws Throwable + { + // On successful initialization both reads and writes go through + String tableName = createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))"); + String indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + ReadOnlyOnFailureIndex.class.getName() + "'"); + assertTrue(waitForIndex(keyspace(), tableName, indexName)); + execute("SELECT value FROM %s WHERE value = 1"); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1); + ReadOnlyOnFailureIndex index = (ReadOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName); + assertEquals(1, index.rowsInserted.size()); + + // Upon rebuild, both reads and writes still go through + getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName)); + assertEquals(1, index.rowsInserted.size()); + execute("SELECT value FROM %s WHERE value = 1"); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1); + assertEquals(2, index.rowsInserted.size()); + dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName)); + + // On bad initial build writes are not forwarded to the index + ReadOnlyOnFailureIndex.failInit = true; + indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + ReadOnlyOnFailureIndex.class.getName() + "'"); + index = (ReadOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName); + assertTrue(waitForIndexBuilds(keyspace(), indexName)); + assertInvalidThrow(IndexNotAvailableException.class, "SELECT value FROM %s WHERE value = 1"); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1); + assertEquals(0, index.rowsInserted.size()); + + // Upon recovery, we can index data again + index.reset(); + getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName)); + assertEquals(2, index.rowsInserted.size()); + execute("SELECT value FROM %s WHERE value = 1"); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1); + assertEquals(3, index.rowsInserted.size()); + dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName)); + } + + @Test // A Bad init could leave an index only accepting writes + public void testWriteOnlyIndex() throws Throwable + { + // On successful initialization both reads and writes go through + String tableName = createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))"); + String indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + WriteOnlyOnFailureIndex.class.getName() + "'"); + assertTrue(waitForIndex(keyspace(), tableName, indexName)); + execute("SELECT value FROM %s WHERE value = 1"); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1); + WriteOnlyOnFailureIndex index = (WriteOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName); + assertEquals(1, index.rowsInserted.size()); + + // Upon rebuild, both reads and writes still go through + getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName)); + assertEquals(1, index.rowsInserted.size()); + execute("SELECT value FROM %s WHERE value = 1"); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1); + assertEquals(2, index.rowsInserted.size()); + dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName)); + + // On bad initial build writes are forwarded to the index + WriteOnlyOnFailureIndex.failInit = true; + indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + WriteOnlyOnFailureIndex.class.getName() + "'"); + index = (WriteOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName); + assertTrue(waitForIndexBuilds(keyspace(), indexName)); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1); + assertEquals(1, index.rowsInserted.size()); + assertInvalidThrow(IndexNotAvailableException.class, "SELECT value FROM %s WHERE value = 1"); + + // Upon recovery, we can query data again + index.reset(); + getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName)); + assertEquals(2, index.rowsInserted.size()); + execute("SELECT value FROM %s WHERE value = 1"); + execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1); + assertEquals(3, index.rowsInserted.size()); + dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName)); + } + @Test public void droppingIndexInvalidatesPreparedStatements() throws Throwable { @@ -1375,6 +1455,7 @@ public class SecondaryIndexTest extends CQLTester execute("INSERT INTO %s (k, v) VALUES (?, ?)", 0, udt1); String indexName = createIndex("CREATE INDEX ON %s (v)"); + execute("INSERT INTO %s (k, v) VALUES (?, ?)", 1, udt2); execute("INSERT INTO %s (k, v) VALUES (?, ?)", 1, udt1); assertTrue(waitForIndex(keyspace(), tableName, indexName)); @@ -1560,4 +1641,62 @@ public class SecondaryIndexTest extends CQLTester return super.getInvalidateTask(); } } + + /** + * {@code StubIndex} that only supports some load. Could be intentional or a result of a bad init. + */ + public static class LoadTypeConstrainedIndex extends StubIndex + { + static volatile boolean failInit = false; + final LoadType supportedLoadOnFailure; + + LoadTypeConstrainedIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef, LoadType supportedLoadOnFailure) + { + super(baseCfs, indexDef); + this.supportedLoadOnFailure = supportedLoadOnFailure; + } + + @Override + public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild) + { + return supportedLoadOnFailure; + } + + @Override + public void reset() + { + super.reset(); + failInit = false; + } + + @Override + public Callable getInitializationTask() + { + if (failInit) + return () -> {throw new IllegalStateException("Index is configured to fail.");}; + + return null; + } + + public boolean shouldBuildBlocking() + { + return true; + } + } + + public static class ReadOnlyOnFailureIndex extends LoadTypeConstrainedIndex + { + public ReadOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + super(baseCfs, indexDef, LoadType.READ); + } + } + + public static class WriteOnlyOnFailureIndex extends LoadTypeConstrainedIndex + { + public WriteOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + super(baseCfs, indexDef, LoadType.WRITE); + } + } } diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java index c09b16c094f7946e8c8160acb9026bd1d050fc76..bfe1c6d908620c25abe89c3ab4130fb7138e73e5 100644 --- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java @@ -484,7 +484,7 @@ public class SecondaryIndexTest .build(); MigrationManager.announceTableUpdate(updated, true); - // fait for the index to be built + // wait for the index to be built Index index = cfs.indexManager.getIndex(indexDef); do { diff --git a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java index 2207f4809e73d2f7f71024760aee3de45676f983..d8fb99f40fe89f9a5cf8a10ed4ac0549b59535fa 100644 --- a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java +++ b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Sets; import org.junit.After; import org.junit.Test; @@ -67,14 +68,14 @@ public class SecondaryIndexManagerTest extends CQLTester } @Test - public void rebuildingIndexMarksTheIndexAsBuilt() throws Throwable + public void rebuilOrRecoveringIndexMarksTheIndexAsBuilt() throws Throwable { String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); String indexName = createIndex("CREATE INDEX ON %s(c)"); waitForIndex(KEYSPACE, tableName, indexName); assertMarkedAsBuilt(indexName); - + assertTrue(tryRebuild(indexName, false)); assertMarkedAsBuilt(indexName); } @@ -119,56 +120,70 @@ public class SecondaryIndexManagerTest extends CQLTester } @Test - public void cannotRebuildWhileInitializationIsInProgress() throws Throwable + public void cannotRebuildRecoverWhileInitializationIsInProgress() throws Throwable { // create an index which blocks on creation TestingIndex.blockCreate(); String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); - String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); - - // try to rebuild the index before the index creation task has finished - assertFalse(tryRebuild(indexName, false)); - assertNotMarkedAsBuilt(indexName); + String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); + String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); + + // try to rebuild/recover the index before the index creation task has finished + assertFalse(tryRebuild(defaultIndexName, false)); + assertFalse(tryRebuild(readOnlyIndexName, false)); + assertFalse(tryRebuild(writeOnlyIndexName, false)); + assertNotMarkedAsBuilt(defaultIndexName); + assertNotMarkedAsBuilt(readOnlyIndexName); + assertNotMarkedAsBuilt(writeOnlyIndexName); // check that the index is marked as built when the creation finishes TestingIndex.unblockCreate(); - waitForIndex(KEYSPACE, tableName, indexName); - assertMarkedAsBuilt(indexName); - - // now verify you can rebuild - assertTrue(tryRebuild(indexName, false)); - assertMarkedAsBuilt(indexName); + waitForIndex(KEYSPACE, tableName, defaultIndexName); + waitForIndex(KEYSPACE, tableName, readOnlyIndexName); + waitForIndex(KEYSPACE, tableName, writeOnlyIndexName); + assertMarkedAsBuilt(defaultIndexName); + assertMarkedAsBuilt(readOnlyIndexName); + assertMarkedAsBuilt(writeOnlyIndexName); + + // now verify you can rebuild/recover + assertTrue(tryRebuild(defaultIndexName, false)); + assertTrue(tryRebuild(readOnlyIndexName, false)); + assertTrue(tryRebuild(readOnlyIndexName, false)); + assertMarkedAsBuilt(defaultIndexName); + assertMarkedAsBuilt(readOnlyIndexName); + assertMarkedAsBuilt(writeOnlyIndexName); } @Test - public void cannotRebuildWhileAnotherRebuildIsInProgress() throws Throwable + public void cannotRebuildOrRecoverWhileAnotherRebuildIsInProgress() throws Throwable { - final String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); - final String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); + String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); + String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); final AtomicBoolean error = new AtomicBoolean(); // wait for index initialization and verify it's built: - waitForIndex(KEYSPACE, tableName, indexName); - assertMarkedAsBuilt(indexName); + waitForIndex(KEYSPACE, tableName, defaultIndexName); + waitForIndex(KEYSPACE, tableName, readOnlyIndexName); + waitForIndex(KEYSPACE, tableName, writeOnlyIndexName); + assertMarkedAsBuilt(defaultIndexName); + assertMarkedAsBuilt(readOnlyIndexName); + assertMarkedAsBuilt(writeOnlyIndexName); // rebuild the index in another thread, but make it block: TestingIndex.blockBuild(); - Thread asyncBuild = new Thread() - { - - @Override - public void run() + Thread asyncBuild = new Thread(() -> { + try { - try - { - tryRebuild(indexName, false); - } - catch (Throwable ex) - { - error.set(true); - } + tryRebuild(defaultIndexName, false); } - }; + catch (Throwable ex) + { + error.set(true); + } + }); asyncBuild.start(); // wait for the rebuild to block, so that we can proceed unblocking all further operations: @@ -178,17 +193,23 @@ public class SecondaryIndexManagerTest extends CQLTester TestingIndex.shouldBlockBuild = false; // verify rebuilding the index before the previous index build task has finished fails - assertFalse(tryRebuild(indexName, false)); - assertNotMarkedAsBuilt(indexName); + assertFalse(tryRebuild(defaultIndexName, false)); + assertNotMarkedAsBuilt(defaultIndexName); // check that the index is marked as built when the build finishes TestingIndex.unblockBuild(); asyncBuild.join(); - assertMarkedAsBuilt(indexName); + assertMarkedAsBuilt(defaultIndexName); + assertMarkedAsBuilt(readOnlyIndexName); + assertMarkedAsBuilt(writeOnlyIndexName); // now verify you can rebuild - assertTrue(tryRebuild(indexName, false)); - assertMarkedAsBuilt(indexName); + assertTrue(tryRebuild(defaultIndexName, false)); + assertTrue(tryRebuild(readOnlyIndexName, false)); + assertTrue(tryRebuild(writeOnlyIndexName, false)); + assertMarkedAsBuilt(defaultIndexName); + assertMarkedAsBuilt(readOnlyIndexName); + assertMarkedAsBuilt(writeOnlyIndexName); } @Test @@ -204,23 +225,17 @@ public class SecondaryIndexManagerTest extends CQLTester // add sstables in another thread, but make it block: TestingIndex.blockBuild(); - Thread asyncBuild = new Thread() - { - - @Override - public void run() + Thread asyncBuild = new Thread(() -> { + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + try (Refs sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL))) { - ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); - try (Refs sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL))) - { - cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); - } - catch (Throwable ex) - { - error.set(true); - } + cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); + } + catch (Throwable ex) + { + error.set(true); } - }; + }); asyncBuild.start(); // wait for the build to block, so that we can proceed unblocking all further operations: @@ -256,22 +271,16 @@ public class SecondaryIndexManagerTest extends CQLTester // rebuild the index in another thread, but make it block: TestingIndex.blockBuild(); - Thread asyncBuild = new Thread() - { - - @Override - public void run() + Thread asyncBuild = new Thread(() -> { + try { - try - { - tryRebuild(indexName, false); - } - catch (Throwable ex) - { - error.set(true); - } + tryRebuild(indexName, false); + } + catch (Throwable ex) + { + error.set(true); } - }; + }); asyncBuild.start(); // wait for the rebuild to block, so that we can proceed unblocking all further operations: @@ -310,22 +319,16 @@ public class SecondaryIndexManagerTest extends CQLTester // rebuild the index in another thread, but make it block: TestingIndex.blockBuild(); - Thread asyncBuild = new Thread() - { - - @Override - public void run() + Thread asyncBuild = new Thread(() -> { + try { - try - { - tryRebuild(indexName, false); - } - catch (Throwable ex) - { - error.set(true); - } + tryRebuild(indexName, false); } - }; + catch (Throwable ex) + { + error.set(true); + } + }); asyncBuild.start(); // wait for the rebuild to block, so that we can proceed unblocking all further operations: @@ -382,37 +385,57 @@ public class SecondaryIndexManagerTest extends CQLTester } @Test - public void initializingIndexNotQueryable() throws Throwable + public void initializingIndexNotQueryableButMaybeWritable() throws Throwable { TestingIndex.blockCreate(); String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); - String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); + String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); // the index shouldn't be queryable while the initialization hasn't finished - ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); - Index index = cfs.indexManager.getIndexByName(indexName); - assertFalse(cfs.indexManager.isIndexQueryable(index)); + assertFalse(isQueryable(defaultIndexName)); + assertFalse(isQueryable(readOnlyIndexName)); + assertFalse(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertTrue(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); // the index should be queryable once the initialization has finished TestingIndex.unblockCreate(); - waitForIndex(KEYSPACE, tableName, indexName); - assertTrue(cfs.indexManager.isIndexQueryable(index)); + waitForIndex(KEYSPACE, tableName, defaultIndexName); + waitForIndex(KEYSPACE, tableName, readOnlyIndexName); + waitForIndex(KEYSPACE, tableName, writeOnlyIndexName); + assertTrue(isQueryable(defaultIndexName)); + assertTrue(isQueryable(readOnlyIndexName)); + assertTrue(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertTrue(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); } @Test - public void initializingIndexNotQueryableAfterPartialRebuild() throws Throwable + public void initializingIndexNotQueryableButMaybeNotWritableAfterPartialRebuild() throws Throwable { TestingIndex.blockCreate(); String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); - String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); + String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); - // the index shouldn't be queryable while the initialization hasn't finished - ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); - Index index = cfs.indexManager.getIndexByName(indexName); - assertFalse(cfs.indexManager.isIndexQueryable(index)); + // the index should never be queryable while the initialization hasn't finished + assertFalse(isQueryable(defaultIndexName)); + assertFalse(isQueryable(readOnlyIndexName)); + assertFalse(isQueryable(writeOnlyIndexName)); + + // the index should always we writable while the initialization hasn't finished + assertTrue(isWritable(defaultIndexName)); + assertTrue(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); - // a failing partial build doesn't set the index as queryable + // a failing partial build doesn't set the index as queryable, but might set it as not writable TestingIndex.shouldFailBuild = true; + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); try { cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this); @@ -422,59 +445,99 @@ public class SecondaryIndexManagerTest extends CQLTester { assertTrue(ex.getMessage().contains("configured to fail")); } - assertFalse(cfs.indexManager.isIndexQueryable(index)); - - // a successful partial build doesn't set the index as queryable + assertFalse(isQueryable(defaultIndexName)); + assertFalse(isQueryable(readOnlyIndexName)); + assertFalse(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertFalse(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); + + // a successful partial build doesn't set the index as queryable nor writable TestingIndex.shouldFailBuild = false; cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this); - assertFalse(cfs.indexManager.isIndexQueryable(index)); + assertFalse(isQueryable(defaultIndexName)); + assertFalse(isQueryable(readOnlyIndexName)); + assertFalse(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertFalse(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); // the index should be queryable once the initialization has finished TestingIndex.unblockCreate(); - waitForIndex(KEYSPACE, tableName, indexName); - assertTrue(cfs.indexManager.isIndexQueryable(index)); + waitForIndex(KEYSPACE, tableName, defaultIndexName); + assertTrue(isQueryable(defaultIndexName)); + assertTrue(isQueryable(readOnlyIndexName)); + assertTrue(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertTrue(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); } @Test - public void indexWithFailedInitializationIsQueryableAfterFullRebuild() throws Throwable + public void indexWithFailedInitializationIsQueryableAndWritableAfterFullRebuild() throws Throwable { - createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); - TestingIndex.shouldFailCreate = true; - String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); - - tryRebuild(indexName, true); + createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); + String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); + String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); + assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName)); + assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName)); + assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName)); + + tryRebuild(defaultIndexName, true); + tryRebuild(readOnlyIndexName, true); + tryRebuild(writeOnlyIndexName, true); TestingIndex.shouldFailCreate = false; - // a successfull full rebuild should set the index as queryable + // a successfull full rebuild should set the index as queryable/writable ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); - Index index = cfs.indexManager.getIndexByName(indexName); - cfs.indexManager.rebuildIndexesBlocking(Collections.singleton(indexName)); - assertTrue(cfs.indexManager.isIndexQueryable(index)); + cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(defaultIndexName, readOnlyIndexName, writeOnlyIndexName)); + assertTrue(isQueryable(defaultIndexName)); + assertTrue(isQueryable(readOnlyIndexName)); + assertTrue(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertTrue(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); } @Test - public void indexWithFailedInitializationIsNotQueryableAfterPartialRebuild() throws Throwable + public void indexWithFailedInitializationDoesNotChangeQueryabilityNorWritabilityAfterPartialRebuild() throws Throwable { TestingIndex.shouldFailCreate = true; createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"); - String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); - assertTrue(waitForIndexBuilds(KEYSPACE, indexName)); + String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName())); + String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName())); + String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName())); + assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName)); + assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName)); + assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName)); TestingIndex.shouldFailCreate = false; - // the index shouldn't be queryable after the failed initialization - ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); - Index index = cfs.indexManager.getIndexByName(indexName); - assertFalse(cfs.indexManager.isIndexQueryable(index)); + // the index should never be queryable, but it could be writable after the failed initialization + assertFalse(isQueryable(defaultIndexName)); + assertFalse(isQueryable(readOnlyIndexName)); + assertFalse(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertFalse(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); - // a successful partial build doesn't set the index as queryable + // a successful partial build doesn't set the index as queryable nor writable + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this); - assertTrue(waitForIndexBuilds(KEYSPACE, indexName)); - assertFalse(cfs.indexManager.isIndexQueryable(index)); + assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName)); + assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName)); + assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName)); + assertFalse(isQueryable(defaultIndexName)); + assertFalse(isQueryable(readOnlyIndexName)); + assertFalse(isQueryable(writeOnlyIndexName)); + assertTrue(isWritable(defaultIndexName)); + assertFalse(isWritable(readOnlyIndexName)); + assertTrue(isWritable(writeOnlyIndexName)); } @Test - public void handleJVMStablityOnFailedCreate() throws Throwable + public void handleJVMStablityOnFailedCreate() { handleJVMStablityOnFailedCreate(new SocketException("Should not fail"), false); handleJVMStablityOnFailedCreate(new FileNotFoundException("Should not fail"), false); @@ -483,7 +546,7 @@ public class SecondaryIndexManagerTest extends CQLTester handleJVMStablityOnFailedCreate(new RuntimeException("Should not fail"), false); } - private void handleJVMStablityOnFailedCreate(Throwable throwable, boolean shouldKillJVM) throws Throwable + private void handleJVMStablityOnFailedCreate(Throwable throwable, boolean shouldKillJVM) { KillerForTests killerForTests = new KillerForTests(); JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); @@ -584,71 +647,100 @@ public class SecondaryIndexManagerTest extends CQLTester return done; } + private boolean isQueryable(String indexName) + { + SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager; + Index index = manager.getIndexByName(indexName); + return manager.isIndexQueryable(index); + } + + private boolean isWritable(String indexName) + { + SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager; + Index index = manager.getIndexByName(indexName); + return manager.isIndexWritable(index); + } + public static class TestingIndex extends StubIndex { private static volatile CountDownLatch createLatch; private static volatile CountDownLatch buildLatch; private static volatile CountDownLatch createWaitLatch; private static volatile CountDownLatch buildWaitLatch; - public static volatile boolean shouldBlockCreate = false; - public static volatile boolean shouldBlockBuild = false; - public static volatile boolean shouldFailCreate = false; - public static volatile boolean shouldFailBuild = false; - public static volatile Throwable failedCreateThrowable; - public static volatile Throwable failedBuildTrowable; - + static volatile boolean shouldBlockCreate = false; + static volatile boolean shouldBlockBuild = false; + static volatile boolean shouldFailCreate = false; + static volatile boolean shouldFailBuild = false; + static volatile Throwable failedCreateThrowable; + static volatile Throwable failedBuildTrowable; + + @SuppressWarnings("WeakerAccess") public TestingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata) { super(baseCfs, metadata); } - public static void blockCreate() + static void blockCreate() { shouldBlockCreate = true; createLatch = new CountDownLatch(1); createWaitLatch = new CountDownLatch(1); } - public static void blockBuild() + static void blockBuild() { shouldBlockBuild = true; buildLatch = new CountDownLatch(1); buildWaitLatch = new CountDownLatch(1); } - public static void unblockCreate() + static void unblockCreate() { createLatch.countDown(); } - public static void unblockBuild() + static void unblockBuild() { buildLatch.countDown(); } - public static void waitBlockedOnCreate() throws InterruptedException + static void waitBlockedOnCreate() throws InterruptedException { createWaitLatch.await(); } - public static void waitBlockedOnBuild() throws InterruptedException + static void waitBlockedOnBuild() throws InterruptedException { buildWaitLatch.await(); } - public static void clear() + static void clear() { + reset(createLatch); + reset(createWaitLatch); + reset(buildLatch); + reset(buildWaitLatch); createLatch = null; createWaitLatch = null; buildLatch = null; buildWaitLatch = null; shouldBlockCreate = false; shouldBlockBuild = false; + shouldFailCreate = false; shouldFailBuild = false; failedCreateThrowable = null; failedBuildTrowable = null; } + private static void reset(CountDownLatch latch) + { + if (latch == null) + return; + + while (0L < latch.getCount()) + latch.countDown(); + } + public Callable getInitializationTask() { return () -> @@ -719,4 +811,38 @@ public class SecondaryIndexManagerTest extends CQLTester return true; } } + + /** + * TestingIndex that only supports reads when initial build or full rebuild has failed. + */ + public static class ReadOnlyOnFailureIndex extends TestingIndex + { + public ReadOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + super(baseCfs, indexDef); + } + + @Override + public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild) + { + return LoadType.READ; + } + } + + /** + * TestingIndex that only supports writes when initial build or full rebuild has failed. + */ + public static class WriteOnlyOnFailureIndex extends TestingIndex + { + public WriteOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) + { + super(baseCfs, indexDef); + } + + @Override + public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild) + { + return LoadType.WRITE; + } + } } diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java index a351cce0cdb8d1aded54baa7859eeca05fe222a7..02ccbff1349972d34a179ffb9baa79fadc6c5e3b 100644 --- a/test/unit/org/apache/cassandra/index/StubIndex.java +++ b/test/unit/org/apache/cassandra/index/StubIndex.java @@ -36,7 +36,6 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.OpOrder; /** * Basic custom index implementation for testing.