Commit 7bbc97e5 authored by Alex Petrov's avatar Alex Petrov

Merge branch 'cassandra-3.11' into trunk

parents 69ea5ffd 04b00498
......@@ -551,7 +551,7 @@
<dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
<dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
<dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
<dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.1" />
<dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.2" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
......
......@@ -18,18 +18,13 @@
package org.apache.cassandra.distributed;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.Builder;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.Versions;
/**
......@@ -39,9 +34,9 @@ import org.apache.cassandra.distributed.shared.Versions;
public class Cluster extends AbstractCluster<IInvokableInstance>
{
private Cluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader)
private Cluster(Builder builder)
{
super(root, version, configs, sharedClassLoader);
super(builder);
}
protected IInvokableInstance newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config)
......@@ -49,22 +44,12 @@ public class Cluster extends AbstractCluster<IInvokableInstance>
return new Wrapper(generation, version, config);
}
public static Builder<IInvokableInstance, Cluster> build()
public static Builder build()
{
return new Builder<IInvokableInstance, Cluster>(Cluster::new)
{
{
withVersion(CURRENT_VERSION);
}
protected IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
{
return InstanceConfig.generate(nodeNum, ipAddress, networkTopology, root, token, seedIp);
}
};
return new Builder();
}
public static Builder<IInvokableInstance, Cluster> build(int nodeCount)
public static Builder build(int nodeCount)
{
return build().withNodes(nodeCount);
}
......@@ -78,5 +63,14 @@ public class Cluster extends AbstractCluster<IInvokableInstance>
{
return build(nodeCount).start();
}
public static final class Builder extends AbstractBuilder<IInvokableInstance, Cluster, Builder>
{
public Builder()
{
super(Cluster::new);
withVersion(CURRENT_VERSION);
}
}
}
......@@ -18,18 +18,13 @@
package org.apache.cassandra.distributed;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.Builder;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.Versions;
/**
......@@ -41,9 +36,9 @@ import org.apache.cassandra.distributed.shared.Versions;
*/
public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> implements AutoCloseable
{
private UpgradeableCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader)
private UpgradeableCluster(Builder builder)
{
super(root, version, configs, sharedClassLoader);
super(builder);
}
protected IUpgradeableInstance newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config)
......@@ -51,18 +46,12 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
return new Wrapper(generation, version, config);
}
public static Builder<IUpgradeableInstance, UpgradeableCluster> build()
public static Builder build()
{
return new Builder<IUpgradeableInstance, UpgradeableCluster>(UpgradeableCluster::new)
{
protected IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
{
return InstanceConfig.generate(nodeNum, ipAddress, networkTopology, root, token, seedIp);
}
};
return new Builder();
}
public static Builder<IUpgradeableInstance, UpgradeableCluster> build(int nodeCount)
public static Builder build(int nodeCount)
{
return build().withNodes(nodeCount);
}
......@@ -81,5 +70,14 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
{
return build(nodeCount).withVersion(version).start();
}
public static final class Builder extends AbstractBuilder<IUpgradeableInstance, UpgradeableCluster, Builder>
{
public Builder()
{
super(UpgradeableCluster::new);
}
}
}
......@@ -55,6 +55,8 @@ import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
import org.apache.cassandra.distributed.shared.MessageFilters;
import org.apache.cassandra.distributed.shared.NetworkTopology;
......@@ -96,10 +98,15 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
// to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener)
// before we instantiate any for a new instance
private static final Logger logger = LoggerFactory.getLogger(AbstractCluster.class);
private static final AtomicInteger generation = new AtomicInteger();
private static final AtomicInteger GENERATION = new AtomicInteger();
private final File root;
private final ClassLoader sharedClassLoader;
private final int subnet;
private final TokenSupplier tokenSupplier;
private final Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
private final Consumer<IInstanceConfig> configUpdater;
private final int broadcastPort;
// mutated by starting/stopping a node
private final List<I> instances;
......@@ -248,18 +255,26 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
}
}
protected AbstractCluster(File root, Versions.Version initialVersion, List<IInstanceConfig> configs,
ClassLoader sharedClassLoader)
protected AbstractCluster(AbstractBuilder<I, ? extends ICluster<I>, ?> builder)
{
this.root = root;
this.sharedClassLoader = sharedClassLoader;
this.root = builder.getRoot();
this.sharedClassLoader = builder.getSharedClassLoader();
this.subnet = builder.getSubnet();
this.tokenSupplier = builder.getTokenSupplier();
this.nodeIdTopology = builder.getNodeIdTopology();
this.configUpdater = builder.getConfigUpdater();
this.broadcastPort = builder.getBroadcastPort();
this.instances = new ArrayList<>();
this.instanceMap = new HashMap<>();
this.initialVersion = initialVersion;
int generation = AbstractCluster.generation.incrementAndGet();
this.initialVersion = builder.getVersion();
this.filters = new MessageFilters();
for (IInstanceConfig config : configs)
int generation = GENERATION.incrementAndGet();
for (int i = 0; i < builder.getNodeCount(); ++i)
{
int nodeNum = i + 1;
InstanceConfig config = createInstanceConfig(nodeNum);
I instance = newInstanceWrapperInternal(generation, initialVersion, config);
instances.add(instance);
// we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
......@@ -267,7 +282,27 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
if (null != prev)
throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddress() + " vs " + prev.broadcastAddress());
}
this.filters = new MessageFilters();
}
public InstanceConfig newInstanceConfig()
{
return createInstanceConfig(size() + 1);
}
private InstanceConfig createInstanceConfig(int nodeNum)
{
String ipPrefix = "127.0." + subnet + ".";
String seedIp = ipPrefix + "1";
String ipAddress = ipPrefix + nodeNum;
long token = tokenSupplier.token(nodeNum);
NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
if (configUpdater != null)
configUpdater.accept(config);
return config;
}
protected abstract I newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config);
......
......@@ -36,6 +36,8 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.QueryResult;
import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.QueryPager;
......@@ -54,18 +56,18 @@ public class Coordinator implements ICoordinator
}
@Override
public QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
public SimpleQueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
{
return instance().sync(() -> executeInternal(query, consistencyLevel, boundValues)).call();
}
public Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
public Future<SimpleQueryResult> asyncExecuteWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
{
return instance.async(() -> {
try
{
Tracing.instance.newSession(sessionId, Collections.emptyMap());
return executeInternal(query, consistencyLevelOrigin, boundValues).toObjectArrays();
return executeInternal(query, consistencyLevelOrigin, boundValues);
}
finally
{
......@@ -79,7 +81,7 @@ public class Coordinator implements ICoordinator
return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
}
private QueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues)
private SimpleQueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues)
{
ClientState clientState = makeFakeClientState();
CQLStatement prepared = QueryProcessor.getStatement(query, clientState);
......@@ -100,17 +102,7 @@ public class Coordinator implements ICoordinator
null),
System.nanoTime());
if (res != null && res.kind == ResultMessage.Kind.ROWS)
{
ResultMessage.Rows rows = (ResultMessage.Rows) res;
String[] names = rows.result.metadata.names.stream().map(c -> c.name.toString()).toArray(String[]::new);
Object[][] results = RowUtil.toObjects(rows);
return new QueryResult(names, results);
}
else
{
return QueryResult.EMPTY;
}
return RowUtil.toQueryResult(res);
}
public Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
......@@ -124,7 +116,7 @@ public class Coordinator implements ICoordinator
}
@Override
public Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevelOrigin, int pageSize, Object... boundValues)
public QueryResult executeWithPagingWithResult(String query, ConsistencyLevel consistencyLevelOrigin, int pageSize, Object... boundValues)
{
if (pageSize <= 0)
throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize);
......@@ -157,8 +149,9 @@ public class Coordinator implements ICoordinator
// Usually pager fetches a single page (see SelectStatement#execute). We need to iterate over all
// of the results lazily.
return new Iterator<Object[]>() {
Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, toCassandraCL(consistencyLevel), clientState, pager, pageSize));
UntypedResultSet rs = UntypedResultSet.create(selectStatement, toCassandraCL(consistencyLevel), clientState, pager, pageSize);
Iterator<Object[]> it = new Iterator<Object[]>() {
Iterator<Object[]> iter = RowUtil.toObjects(rs);
public boolean hasNext()
{
......@@ -171,6 +164,7 @@ public class Coordinator implements ICoordinator
return instance.sync(() -> iter.next()).call();
}
};
return QueryResults.fromObjectArrayIterator(RowUtil.getColumnNames(rs.metadata()), it);
}).call();
}
......
......@@ -33,6 +33,7 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.shared.NetworkTopology;
public abstract class DelegatingInvokableInstance implements IInvokableInstance
......@@ -58,6 +59,11 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
return delegate().executeInternal(query, args);
}
public SimpleQueryResult executeInternalWithResult(String query, Object... args)
{
return delegate().executeInternalWithResult(query, args);
}
@Override
public UUID schemaVersion()
{
......
......@@ -32,7 +32,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.management.ListenerNotFoundException;
......@@ -69,9 +68,9 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
......@@ -169,17 +168,13 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public InetSocketAddress broadcastAddress() { return config.broadcastAddress(); }
@Override
public Object[][] executeInternal(String query, Object... args)
public SimpleQueryResult executeInternalWithResult(String query, Object... args)
{
return sync(() -> {
QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
QueryProcessor.makeInternalOptions(prepared.statement, args));
if (result instanceof ResultMessage.Rows)
return RowUtil.toObjects((ResultMessage.Rows)result);
else
return null;
return RowUtil.toQueryResult(result);
}).call();
}
......
......@@ -28,10 +28,32 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.transport.messages.ResultMessage;
public class RowUtil
{
public static SimpleQueryResult toQueryResult(ResultMessage res)
{
if (res != null && res.kind == ResultMessage.Kind.ROWS)
{
ResultMessage.Rows rows = (ResultMessage.Rows) res;
String[] names = getColumnNames(rows.result.metadata.names);
Object[][] results = RowUtil.toObjects(rows);
return new SimpleQueryResult(names, results);
}
else
{
return QueryResults.empty();
}
}
public static String[] getColumnNames(List<ColumnSpecification> names)
{
return names.stream().map(c -> c.name.toString()).toArray(String[]::new);
}
public static Object[][] toObjects(ResultMessage.Rows rows)
{
Object[][] result = new Object[rows.result.rows.size()][];
......
......@@ -25,18 +25,15 @@ import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.Builder;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE;
// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
public class BootstrapTest extends TestBaseImpl
......@@ -47,19 +44,18 @@ public class BootstrapTest extends TestBaseImpl
{
int originalNodeCount = 2;
int expandedNodeCount = originalNodeCount + 1;
Builder<IInstance, ICluster> builder = builder().withNodes(originalNodeCount)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
.withConfig(config -> config.with(NETWORK, GOSSIP));
Cluster.Builder builder = builder().withNodes(originalNodeCount)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
.withConfig(config -> config.with(NETWORK, GOSSIP));
Map<Integer, Long> withBootstrap = null;
Map<Integer, Long> naturally = null;
try (ICluster<IInvokableInstance> cluster = builder.withNodes(originalNodeCount).start())
try (Cluster cluster = builder.withNodes(originalNodeCount).start())
{
populate(cluster);
IInstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
.newInstanceConfig(cluster);
IInstanceConfig config = cluster.newInstanceConfig();
config.set("auto_bootstrap", true);
cluster.bootstrap(config).startup();
......
......@@ -23,8 +23,6 @@ import org.junit.BeforeClass;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.shared.Builder;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
public class TestBaseImpl extends DistributedTestBase
......@@ -40,9 +38,9 @@ public class TestBaseImpl extends DistributedTestBase
}
@Override
public <I extends IInstance, C extends ICluster> Builder<I, C> builder() {
public Cluster.Builder builder() {
// This is definitely not the smartest solution, but given the complexity of the alternatives and low risk, we can just rely on the
// fact that this code is going to work accross _all_ versions.
return (Builder<I, C>) Cluster.build();
return Cluster.build();
}
}
......@@ -30,15 +30,13 @@ import org.junit.BeforeClass;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.Builder;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.distributed.shared.Versions;
import static org.apache.cassandra.distributed.shared.Versions.Version;
import static org.apache.cassandra.distributed.shared.Versions.Major;
import static org.apache.cassandra.distributed.shared.Versions.Version;
import static org.apache.cassandra.distributed.shared.Versions.find;
public class UpgradeTestBase extends DistributedTestBase
......@@ -57,9 +55,9 @@ public class UpgradeTestBase extends DistributedTestBase
}
public <I extends IInstance, C extends ICluster> Builder<I, C> builder()
public UpgradeableCluster.Builder builder()
{
return (Builder<I, C>) UpgradeableCluster.build();
return UpgradeableCluster.build();
}
public static interface RunOnCluster
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment