Commit 58141308 authored by bryn's avatar bryn Committed by Brandon Williams

CASSANDRA-15677 Add the ability to run dTests on the same interface.

parent 6317fefe
......@@ -148,7 +148,15 @@ public class FBUtilities
{
if (localInetAddressAndPort == null)
{
localInetAddressAndPort = InetAddressAndPort.getByAddress(getJustLocalAddress());
if(DatabaseDescriptor.getRawConfig() == null)
{
localInetAddressAndPort = InetAddressAndPort.getByAddress(getJustLocalAddress());
}
else
{
localInetAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustLocalAddress(),
DatabaseDescriptor.getStoragePort());
}
}
return localInetAddressAndPort;
}
......@@ -175,7 +183,15 @@ public class FBUtilities
{
if (broadcastInetAddressAndPort == null)
{
broadcastInetAddressAndPort = InetAddressAndPort.getByAddress(getJustBroadcastAddress());
if(DatabaseDescriptor.getRawConfig() == null)
{
broadcastInetAddressAndPort = InetAddressAndPort.getByAddress(getJustBroadcastAddress());
}
else
{
broadcastInetAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastAddress(),
DatabaseDescriptor.getStoragePort());
}
}
return broadcastInetAddressAndPort;
}
......@@ -218,8 +234,15 @@ public class FBUtilities
public static InetAddressAndPort getBroadcastNativeAddressAndPort()
{
if (broadcastNativeAddressAndPort == null)
broadcastNativeAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastNativeAddress(),
DatabaseDescriptor.getNativeTransportPort());
if(DatabaseDescriptor.getRawConfig() == null)
{
broadcastNativeAddressAndPort = InetAddressAndPort.getByAddress(getJustBroadcastNativeAddress());
}
else
{
broadcastNativeAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastNativeAddress(),
DatabaseDescriptor.getNativeTransportPort());
}
return broadcastNativeAddressAndPort;
}
......
......@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.Sets;
......@@ -56,7 +57,6 @@ import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
import org.apache.cassandra.distributed.shared.MessageFilters;
import org.apache.cassandra.distributed.shared.NetworkTopology;
......@@ -66,6 +66,8 @@ import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
/**
* AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}.
*
......@@ -116,8 +118,30 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
// mutated by user-facing API
private final MessageFilters filters;
private final INodeProvisionStrategy.Strategy nodeProvisionStrategy;
private volatile Thread.UncaughtExceptionHandler previousHandler = null;
/**
* Common builder, add methods that are applicable to both Cluster and Upgradable cluster here.
*/
public static abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>>
extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B>
{
private INodeProvisionStrategy.Strategy nodeProvisionStrategy = INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
public AbstractBuilder(Factory<I, C, B> factory)
{
super(factory);
}
public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy nodeProvisionStrategy)
{
this.nodeProvisionStrategy = nodeProvisionStrategy;
return (B) this;
}
}
protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
{
private final int generation;
......@@ -264,6 +288,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
this.nodeIdTopology = builder.getNodeIdTopology();
this.configUpdater = builder.getConfigUpdater();
this.broadcastPort = builder.getBroadcastPort();
this.nodeProvisionStrategy = builder.nodeProvisionStrategy;
this.instances = new ArrayList<>();
this.instanceMap = new HashMap<>();
this.initialVersion = builder.getVersion();
......@@ -291,20 +316,30 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
private InstanceConfig createInstanceConfig(int nodeNum)
{
String ipPrefix = "127.0." + subnet + ".";
String seedIp = ipPrefix + "1";
String ipAddress = ipPrefix + nodeNum;
INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet);
long token = tokenSupplier.token(nodeNum);
NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology);
InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token));
if (configUpdater != null)
configUpdater.accept(config);
return config;
}
public static NetworkTopology buildNetworkTopology(INodeProvisionStrategy provisionStrategy,
Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
{
NetworkTopology topology = NetworkTopology.build("", 0, Collections.emptyMap());
IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
InetSocketAddress addressAndPort = addressAndPort(provisionStrategy.ipAddress(nodeId), provisionStrategy.storagePort(nodeId));
NetworkTopology.DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
topology.put(addressAndPort, dcAndRack);
});
return topology;
}
protected abstract I newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config);
protected I newInstanceWrapperInternal(int generation, Versions.Version version, IInstanceConfig config)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.impl;
public interface INodeProvisionStrategy
{
public enum Strategy
{
OneNetworkInterface
{
INodeProvisionStrategy create(int subnet) {
return new INodeProvisionStrategy()
{
public String seedIp()
{
return "127.0." + subnet + ".1";
}
public int seedPort()
{
return 7012;
}
public String ipAddress(int nodeNum)
{
return "127.0." + subnet + ".1";
}
public int storagePort(int nodeNum)
{
return 7011 + nodeNum;
}
public int nativeTransportPort(int nodeNum)
{
return 9041 + nodeNum;
}
};
}
},
MultipleNetworkInterfaces
{
INodeProvisionStrategy create(int subnet) {
String ipPrefix = "127.0." + subnet + ".";
return new INodeProvisionStrategy()
{
public String seedIp()
{
return ipPrefix + "1";
}
public int seedPort()
{
return 7012;
}
public String ipAddress(int nodeNum)
{
return ipPrefix + nodeNum;
}
public int storagePort(int nodeNum)
{
return 7012;
}
public int nativeTransportPort(int nodeNum)
{
return 9042;
}
};
}
};
abstract INodeProvisionStrategy create(int subnet);
}
abstract String seedIp();
abstract int seedPort();
abstract String ipAddress(int nodeNum);
abstract int storagePort(int nodeNum);
abstract int nativeTransportPort(int nodeNum);
}
......@@ -66,12 +66,15 @@ public class InstanceConfig implements IInstanceConfig
String broadcast_rpc_address,
String rpc_address,
String seedIp,
int seedPort,
String saved_caches_directory,
String[] data_file_directories,
String commitlog_directory,
String hints_directory,
String cdc_raw_directory,
String initial_token)
String initial_token,
int storage_port,
int native_transport_port)
{
this.num = num;
this.networkTopology = networkTopology;
......@@ -97,10 +100,11 @@ public class InstanceConfig implements IInstanceConfig
.set("concurrent_compactors", 1)
.set("memtable_heap_space_in_mb", 10)
.set("commitlog_sync", "batch")
.set("storage_port", 7012)
.set("storage_port", storage_port)
.set("native_transport_port", native_transport_port)
.set("endpoint_snitch", DistributedTestSnitch.class.getName())
.set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
Collections.singletonMap("seeds", seedIp + ":7012")))
Collections.singletonMap("seeds", seedIp + ":" + seedPort)))
// required settings for dtest functionality
.set("diagnostic_events_enabled", true)
.set("auto_bootstrap", false)
......@@ -263,21 +267,28 @@ public class InstanceConfig implements IInstanceConfig
return (String)params.get(name);
}
public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
public static InstanceConfig generate(int nodeNum,
INodeProvisionStrategy provisionStrategy,
NetworkTopology networkTopology,
File root,
String token)
{
return new InstanceConfig(nodeNum,
networkTopology,
ipAddress,
ipAddress,
ipAddress,
ipAddress,
seedIp,
provisionStrategy.ipAddress(nodeNum),
provisionStrategy.ipAddress(nodeNum),
provisionStrategy.ipAddress(nodeNum),
provisionStrategy.ipAddress(nodeNum),
provisionStrategy.seedIp(),
provisionStrategy.seedPort(),
String.format("%s/node%d/saved_caches", root, nodeNum),
new String[] { String.format("%s/node%d/data", root, nodeNum) },
String.format("%s/node%d/commitlog", root, nodeNum),
String.format("%s/node%d/hints", root, nodeNum),
String.format("%s/node%d/cdc", root, nodeNum),
token);
token,
provisionStrategy.storagePort(nodeNum),
provisionStrategy.nativeTransportPort(nodeNum));
}
public InstanceConfig forVersion(Versions.Major major)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment