Commit 04533e6c authored by Sam Tunnicliffe's avatar Sam Tunnicliffe

Avoid blocking AntiEntropyStage when submitting validation requests

Patch by Sam Tunnicliffe; reviewed by Benjamin Lerer for CASSANDRA-15812

Switches ValidationExecutor's work queue to LinkedBlockingQueue to
avoid blocking AntiEntropyStage when the executor is saturated. This
requires VE.corePoolSize to be set to concurrent_validations as now
it will always prefer to queue requests rather than start new threads.

This commit also adds a hard limit on concurrent_validations, as allowing
an unbounded number of validations to run concurrently is never safe.
This was always true, but setting a high value here is now more
dangerous as it controls the number of core, not max, threads.
This hard limit is linked to concurrent_compactors, so operators may
set concurrent_validations between 1 and concurrent_compactors.
The meaning of setting it < 1 has changed from "unbounded" to
"whatever concurrent_compactors is set to".

This safety valve can be overridden with a system property at startup
and/or a JMX property.

CASSANDRA-9292 removed the 1hr timeout on prepare messages, but this
was inadvertently undone when CASSANDRA-13397 was committed. As nothing
long running is done in the repair phase anymore, this timeout can
safely be reduced.

If using RepairCommandPoolFullStrategy.queue, the core pool size
for repairCommandExecutor must be increased from the default
value of 1 or else all concurrent tasks will be queued and no
more threads created.
parent bc600f1b
4.0-alpha5
* Prevent validation request submission from blocking ANTI_ENTROPY stage (CASSANDRA-15812)
* Add fqltool and auditlogviewer to rpm and deb packages (CASSANDRA-14712)
* Include DROPPED_COLUMNS in schema digest computation (CASSANDRA-15843)
* Fix Cassandra restart from rpm install (CASSANDRA-15830)
......
......@@ -814,8 +814,13 @@ column_index_cache_size_in_kb: 2
# to the number of cores.
#concurrent_compactors: 1
# Number of simultaneous repair validations to allow. Default is unbounded
# Values less than one are interpreted as unbounded (the default)
# Number of simultaneous repair validations to allow. If not set or set to
# a value less than 1, it defaults to the value of concurrent_compactors.
# To set a value greeater than concurrent_compactors at startup, the system
# property cassandra.allow_unlimited_concurrent_validations must be set to
# true. To dynamically resize to a value > concurrent_compactors on a running
# node, first call the bypassConcurrentValidatorsLimit method on the
# org.apache.cassandra.db:type=StorageService mbean
# concurrent_validations: 0
# Number of simultaneous materialized view builder tasks to allow.
......
......@@ -216,7 +216,6 @@ public class Config
public volatile int compaction_large_partition_warning_threshold_mb = 100;
public int min_free_space_per_drive_in_mb = 50;
public volatile int concurrent_validations = Integer.MAX_VALUE;
public volatile int concurrent_materialized_view_builders = 1;
/**
......@@ -416,6 +415,7 @@ public class Config
public volatile boolean back_pressure_enabled = false;
public volatile ParameterizedClass back_pressure_strategy;
public volatile int concurrent_validations;
public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
public int repair_command_pool_size = concurrent_validations;
......
......@@ -152,6 +152,8 @@ public class DatabaseDescriptor
// turns some warnings into exceptions for testing
private static final boolean strictRuntimeChecks = Boolean.getBoolean("cassandra.strict.runtime.checks");
public static volatile boolean allowUnlimitedConcurrentValidations = Boolean.getBoolean("cassandra.allow_unlimited_concurrent_validations");
private static Function<CommitLog, AbstractCommitLogSegmentManager> commitLogSegmentMgrProvider = c -> DatabaseDescriptor.isCDCEnabled()
? new CommitLogSegmentManagerCDC(c, DatabaseDescriptor.getCommitLogLocation())
: new CommitLogSegmentManagerStandard(c, DatabaseDescriptor.getCommitLogLocation());
......@@ -668,12 +670,12 @@ public class DatabaseDescriptor
if (conf.concurrent_compactors == null)
conf.concurrent_compactors = Math.min(8, Math.max(2, Math.min(FBUtilities.getAvailableProcessors(), conf.data_file_directories.length)));
if (conf.concurrent_validations < 1)
conf.concurrent_validations = Integer.MAX_VALUE;
if (conf.concurrent_compactors <= 0)
throw new ConfigurationException("concurrent_compactors should be strictly greater than 0, but was " + conf.concurrent_compactors, false);
applyConcurrentValidations(conf);
applyRepairCommandPoolSize(conf);
if (conf.concurrent_materialized_view_builders <= 0)
throw new ConfigurationException("concurrent_materialized_view_builders should be strictly greater than 0, but was " + conf.concurrent_materialized_view_builders, false);
......@@ -841,6 +843,27 @@ public class DatabaseDescriptor
validateMaxConcurrentAutoUpgradeTasksConf(conf.max_concurrent_automatic_sstable_upgrades);
}
@VisibleForTesting
static void applyConcurrentValidations(Config config)
{
if (config.concurrent_validations < 1)
{
config.concurrent_validations = config.concurrent_compactors;
}
else if (config.concurrent_validations > config.concurrent_compactors && !allowUnlimitedConcurrentValidations)
{
throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
"set the system property cassandra.allow_unlimited_concurrent_validations=true");
}
}
@VisibleForTesting
static void applyRepairCommandPoolSize(Config config)
{
if (config.repair_command_pool_size < 1)
config.repair_command_pool_size = config.concurrent_validations;
}
private static String storagedirFor(String type)
{
return storagedir(type + "_directory") + File.separator + type;
......
......@@ -124,7 +124,7 @@ public class CompactionManager implements CompactionManagerMBean
}
private final CompactionExecutor executor = new CompactionExecutor();
private final CompactionExecutor validationExecutor = new ValidationExecutor();
private final ValidationExecutor validationExecutor = new ValidationExecutor();
private final CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
private final CompactionExecutor viewBuildExecutor = new ViewBuildExecutor();
......@@ -1897,9 +1897,32 @@ public class CompactionManager implements CompactionManagerMBean
// TODO: pull out relevant parts of CompactionExecutor and move to ValidationManager
public static class ValidationExecutor extends CompactionExecutor
{
// CompactionExecutor, and by extension ValidationExecutor, use DebuggableThreadPoolExecutor's
// default RejectedExecutionHandler which blocks the submitting thread when the work queue is
// full. The calling thread in this case is AntiEntropyStage, so in most cases we don't actually
// want to block when the ValidationExecutor is saturated as this prevents progress on all
// repair tasks and may cause repair sessions to time out. Also, it can lead to references to
// heavyweight validation responses containing merkle trees being held for extended periods which
// increases GC pressure. Using LinkedBlockingQueue instead of the default SynchronousQueue allows
// tasks to be submitted without blocking the caller, but will always prefer queueing to creating
// new threads if the pool already has at least `corePoolSize` threads already running. For this
// reason we set corePoolSize to the maximum desired concurrency, but allow idle core threads to
// be terminated.
public ValidationExecutor()
{
super(1, DatabaseDescriptor.getConcurrentValidations(), "ValidationExecutor", new SynchronousQueue<Runnable>());
super(DatabaseDescriptor.getConcurrentValidations(),
DatabaseDescriptor.getConcurrentValidations(),
"ValidationExecutor",
new LinkedBlockingQueue());
allowCoreThreadTimeOut(true);
}
public void adjustPoolSize()
{
setMaximumPoolSize(DatabaseDescriptor.getConcurrentValidations());
setCorePoolSize(DatabaseDescriptor.getConcurrentValidations());
}
}
......@@ -2021,10 +2044,9 @@ public class CompactionManager implements CompactionManagerMBean
}
}
public void setConcurrentValidations(int value)
public void setConcurrentValidations()
{
value = value > 0 ? value : Integer.MAX_VALUE;
validationExecutor.setMaximumPoolSize(value);
validationExecutor.adjustPoolSize();
}
public void setConcurrentViewBuilders(int value)
......
......@@ -23,6 +23,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
......@@ -126,24 +127,55 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions = new ConcurrentHashMap<>();
public final static ExecutorService repairCommandExecutor;
static
{
Config.RepairCommandPoolFullStrategy strategy = DatabaseDescriptor.getRepairCommandPoolFullStrategy();
RepairMetrics.init();
}
public static class RepairCommandExecutorHandle
{
private static final ThreadPoolExecutor repairCommandExecutor =
initializeExecutor(DatabaseDescriptor.getRepairCommandPoolSize(),
DatabaseDescriptor.getRepairCommandPoolFullStrategy());
}
@VisibleForTesting
static ThreadPoolExecutor initializeExecutor(int maxPoolSize, Config.RepairCommandPoolFullStrategy strategy)
{
int corePoolSize = 1;
BlockingQueue<Runnable> queue;
if (strategy == Config.RepairCommandPoolFullStrategy.reject)
{
// new threads will be created on demand up to max pool
// size so we can leave corePoolSize at 1 to start with
queue = new SynchronousQueue<>();
}
else
{
// new threads are only created if > corePoolSize threads are running
// and the queue is full, so set corePoolSize to the desired max as the
// queue will _never_ be full. Idle core threads will eventually time
// out and may be re-created if/when subsequent tasks are submitted.
corePoolSize = maxPoolSize;
queue = new LinkedBlockingQueue<>();
}
repairCommandExecutor = new JMXEnabledThreadPoolExecutor(1,
DatabaseDescriptor.getRepairCommandPoolSize(),
1, TimeUnit.HOURS,
queue,
new NamedThreadFactory("Repair-Task"),
"internal",
new ThreadPoolExecutor.AbortPolicy());
RepairMetrics.init();
ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(corePoolSize,
maxPoolSize,
1,
TimeUnit.HOURS,
queue,
new NamedThreadFactory("Repair-Task"),
"internal",
new ThreadPoolExecutor.AbortPolicy());
// allow idle core threads to be terminated
executor.allowCoreThreadTimeOut(true);
return executor;
}
public static ThreadPoolExecutor repairCommandExecutor()
{
return RepairCommandExecutorHandle.repairCommandExecutor;
}
private final IFailureDetector failureDetector;
......@@ -461,10 +493,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
}
try
{
// Failed repair is expensive so we wait for longer time.
if (!prepareLatch.await(1, TimeUnit.HOURS)) {
if (!prepareLatch.await(DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))
failRepair(parentRepairSession, "Did not get replies from all endpoints.");
}
}
catch (InterruptedException e)
{
......
......@@ -1440,6 +1440,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
CompactionManager.instance.setConcurrentCompactors(value);
}
public void bypassConcurrentValidatorsLimit()
{
logger.info("Enabling the ability to set concurrent validations to an unlimited value");
DatabaseDescriptor.allowUnlimitedConcurrentValidations = true ;
}
public void enforceConcurrentValidatorsLimit()
{
logger.info("Disabling the ability to set concurrent validations to an unlimited value");
DatabaseDescriptor.allowUnlimitedConcurrentValidations = false ;
}
public boolean isConcurrentValidatorsLimitEnforced()
{
return DatabaseDescriptor.allowUnlimitedConcurrentValidations;
}
public int getConcurrentValidators()
{
return DatabaseDescriptor.getConcurrentValidations();
......@@ -1447,8 +1464,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void setConcurrentValidators(int value)
{
int concurrentCompactors = DatabaseDescriptor.getConcurrentCompactors();
if (value > concurrentCompactors && !DatabaseDescriptor.allowUnlimitedConcurrentValidations)
throw new IllegalArgumentException(
String.format("Cannot set concurrent_validations greater than concurrent_compactors (%d)",
concurrentCompactors));
if (value <= 0)
{
logger.info("Using default value of concurrent_compactors ({}) for concurrent_validations", concurrentCompactors);
value = concurrentCompactors;
}
else
{
logger.info("Setting concurrent_validations to {}", value);
}
DatabaseDescriptor.setConcurrentValidations(value);
CompactionManager.instance.setConcurrentValidations(DatabaseDescriptor.getConcurrentValidations());
CompactionManager.instance.setConcurrentValidations();
}
public int getConcurrentViewBuilders()
......@@ -3733,7 +3766,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Pair.create(0, Futures.immediateFuture(null));
int cmd = nextRepairCommand.incrementAndGet();
return Pair.create(cmd, ActiveRepairService.repairCommandExecutor.submit(createRepairTask(cmd, keyspace, option, listeners)));
return Pair.create(cmd, ActiveRepairService.repairCommandExecutor().submit(createRepairTask(cmd, keyspace, option, listeners)));
}
/**
......
......@@ -581,6 +581,10 @@ public interface StorageServiceMBean extends NotificationEmitter
public int getConcurrentCompactors();
public void setConcurrentCompactors(int value);
public void bypassConcurrentValidatorsLimit();
public void enforceConcurrentValidatorsLimit();
public boolean isConcurrentValidatorsLimitEnforced();
public int getConcurrentValidators();
public void setConcurrentValidators(int value);
......
......@@ -553,7 +553,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
() -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
() -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
() -> SSTableReader.shutdownBlocking(1L, MINUTES),
() -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor)),
() -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
() -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
);
......
......@@ -42,19 +42,19 @@ public abstract class RepairCoordinatorTimeout extends RepairCoordinatorBase
NodeToolResult result = repair(1, KEYSPACE, table);
result.asserts()
.failure()
.errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
.errorContains("Did not get replies from all endpoints.");
if (withNotifications)
{
result.asserts()
.notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
.notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
.notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
.notificationContains(NodeToolResult.ProgressEventType.ERROR, "Did not get replies from all endpoints.")
.notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
}
if (repairType != RepairType.PREVIEW)
{
assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Did not get replies from all endpoints.");
}
else
{
......
......@@ -26,9 +26,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import com.google.common.base.Throwables;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -37,6 +37,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -455,4 +456,50 @@ public class DatabaseDescriptorTest
assertEquals(100, // total size is more than preferred so keep the configured limit
DatabaseDescriptor.calculateDefaultSpaceInMB("type", "/path", "setting_name", preferredInMB, spaceInBytes, numerator, denominator));
}
@Test
public void testConcurrentValidations()
{
Config conf = new Config();
conf.concurrent_compactors = 8;
// if concurrent_validations is < 1 (including being unset) it should default to concurrent_compactors
assertThat(conf.concurrent_validations).isLessThan(1);
DatabaseDescriptor.applyConcurrentValidations(conf);
assertThat(conf.concurrent_validations).isEqualTo(conf.concurrent_compactors);
// otherwise, it must be <= concurrent_compactors
conf.concurrent_validations = conf.concurrent_compactors + 1;
try
{
DatabaseDescriptor.applyConcurrentValidations(conf);
fail("Expected exception");
}
catch (ConfigurationException e)
{
assertThat(e.getMessage()).isEqualTo("To set concurrent_validations > concurrent_compactors, " +
"set the system property cassandra.allow_unlimited_concurrent_validations=true");
}
// unless we disable that check (done with a system property at startup or via JMX)
DatabaseDescriptor.allowUnlimitedConcurrentValidations = true;
conf.concurrent_validations = conf.concurrent_compactors + 1;
DatabaseDescriptor.applyConcurrentValidations(conf);
assertThat(conf.concurrent_validations).isEqualTo(conf.concurrent_compactors + 1);
}
@Test
public void testRepairCommandPoolSize()
{
Config conf = new Config();
conf.concurrent_validations = 3;
// if repair_command_pool_size is < 1 (including being unset) it should default to concurrent_validations
assertThat(conf.repair_command_pool_size).isLessThan(1);
DatabaseDescriptor.applyRepairCommandPoolSize(conf);
assertThat(conf.repair_command_pool_size).isEqualTo(conf.concurrent_validations);
// but it can be overridden
conf.repair_command_pool_size = conf.concurrent_validations + 1;
DatabaseDescriptor.applyRepairCommandPoolSize(conf);
assertThat(conf.repair_command_pool_size).isEqualTo(conf.concurrent_validations + 1);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
public class ValidationExecutorTest
{
CompactionManager.ValidationExecutor validationExecutor;
@Before
public void setup()
{
DatabaseDescriptor.clientInitialization();
// required for static initialization of CompactionManager
DatabaseDescriptor.setConcurrentCompactors(2);
DatabaseDescriptor.setConcurrentValidations(2);
// shutdown the singleton CompactionManager to ensure MBeans are unregistered
CompactionManager.instance.forceShutdown();
}
@After
public void tearDown()
{
if (null != validationExecutor)
validationExecutor.shutdownNow();
}
@Test
public void testQueueOnValidationSubmission() throws InterruptedException
{
Condition taskBlocked = new SimpleCondition();
AtomicInteger threadsAvailable = new AtomicInteger(DatabaseDescriptor.getConcurrentValidations());
CountDownLatch taskComplete = new CountDownLatch(5);
validationExecutor = new CompactionManager.ValidationExecutor();
ExecutorService testExecutor = Executors.newSingleThreadExecutor();
for (int i=0; i< 5; i++)
testExecutor.submit(() -> {
threadsAvailable.decrementAndGet();
validationExecutor.submit(new Task(taskBlocked, taskComplete));
});
// wait for all tasks to be submitted & check that the excess ones were queued
while (threadsAvailable.get() > 0)
TimeUnit.MILLISECONDS.sleep(10);
assertEquals(2, validationExecutor.getActiveTaskCount());
assertEquals(3, validationExecutor.getPendingTaskCount());
taskBlocked.signalAll();
taskComplete.await(10, TimeUnit.SECONDS);
validationExecutor.shutdownNow();
}
@Test
public void testAdjustPoolSize()
{
// adjusting the pool size should dynamically set core and max pool
// size to DatabaseDescriptor::getConcurrentValidations
validationExecutor = new CompactionManager.ValidationExecutor();
int corePoolSize = validationExecutor.getCorePoolSize();
int maxPoolSize = validationExecutor.getMaximumPoolSize();
DatabaseDescriptor.setConcurrentValidations(corePoolSize * 2);
validationExecutor.adjustPoolSize();
assertThat(validationExecutor.getCorePoolSize()).isEqualTo(corePoolSize * 2);
assertThat(validationExecutor.getMaximumPoolSize()).isEqualTo(maxPoolSize * 2);
validationExecutor.shutdownNow();
}
private static class Task implements Runnable
{
private final Condition blocked;
private final CountDownLatch complete;
Task(Condition blocked, CountDownLatch complete)
{
this.blocked = blocked;
this.complete = complete;
}
public void run()
{
Uninterruptibles.awaitUninterruptibly(blocked, 10, TimeUnit.SECONDS);
complete.countDown();
}
}
}
......@@ -19,15 +19,26 @@
package org.apache.cassandra.service;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
......@@ -47,6 +58,7 @@ import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.apache.cassandra.repair.messages.RepairOption.DATACENTERS_KEY;
import static org.apache.cassandra.repair.messages.RepairOption.FORCE_REPAIR_KEY;
......@@ -352,4 +364,98 @@ public class ActiveRepairServiceTest
// full repair
Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false)), false));
}
@Test
public void testRejectWhenPoolFullStrategy() throws InterruptedException
{
// Using RepairCommandPoolFullStrategy.reject, new threads are spawned up to
// repair_command_pool_size, at which point futher submissions are rejected
ExecutorService validationExecutor = ActiveRepairService.initializeExecutor(2, Config.RepairCommandPoolFullStrategy.reject);
try
{
Condition blocked = new SimpleCondition();
CountDownLatch completed = new CountDownLatch(2);