Commit 55a74fe7 authored by Burcu Özkan's avatar Burcu Özkan

Instrumentation to communicate to server via network instead of file-based communication

parent 61290700
## Content:
This repository contains Cassandra version 2.0.0, which reproduces bug CASSANDRA-6023.
The code is cloned from [DMCK](https://github.com/ucare-uchicago/DMCK/tree/master/dmck-target-systems) model checker's target systems which instruments the code to work together with a model checker. The model checker intercepts the messages in the system and delivers them in a specific order.
This repository modifies the instrumentation so that the Cassandra nodes communicate to a model checker or a tester via network.
## Requirements:
- Java 7
- Ant 1.9.14
## Compilation:
```
ant
```
For more details, please refer to README.txt.
## Note:
Installation on a VM is suggested, since the software depends on old versions of its libraries.
...@@ -375,6 +375,7 @@ ...@@ -375,6 +375,7 @@
<dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.netty" artifactId="netty" version="3.5.9.Final" />
<dependency groupId="com.google.code.gson" artifactId="gson" version="2.8.5" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="antelder" name="Anthony Elder"/>
......
package edu.uchicago.cs.ucare.dmck.interceptor;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import org.apache.log4j.Logger;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
/**
 * Reads back the single row of the {@code test.tests} table from a local
 * Cassandra node and dumps its columns to {@code <working_dir>/temp-verify}
 * so an external checker can verify consistency.
 */
public class ConsistentVerifier {
  private static final Logger LOG = Logger.getLogger(ConsistentVerifier.class);

  // Directory where the temp-verify result file is written (first CLI arg).
  static String workingDir;
  // NOTE(review): the fields below are never used in this class; they are
  // kept because other instrumentation code may reference them — confirm.
  static String path;
  static File[] dataDir;
  static File[] dataLogDir;
  static int numNode;

  /**
   * Entry point.
   *
   * @param args exactly one argument: the working directory for the result file
   */
  public static void main(String[] args) {
    if (args.length != 1) {
      System.out.println("Usage: please specify <working_dir>");
      System.exit(1);
    }
    workingDir = args[0];
    getValues();
  }

  /**
   * Connects to 127.0.0.1, queries the {@code tests} table, formats the four
   * columns as {@code key=value} lines and hands them to {@link #writeResult}.
   * On any failure an empty (or partial) result is still written.
   */
  public static void getValues() {
    String values = "";
    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    Session session = null;
    try {
      session = cluster.connect("test");
      LOG.info("Querying row from table");
      ResultSet rs = session.execute("SELECT * FROM tests");
      LOG.info("Row acquired");
      Row row = rs.one();
      // rs.one() returns null on an empty result set; the original code would
      // NPE here and fall into the catch block with a less useful message.
      if (row == null) {
        LOG.error("ERROR in reading row: no row returned.");
      } else {
        values += "owner=" + row.getString("owner") + "\n";
        values += "value_1=" + row.getString("value_1") + "\n";
        values += "value_2=" + row.getString("value_2") + "\n";
        values += "value_3=" + row.getString("value_3") + "\n";
      }
    } catch (Exception e) {
      LOG.error("ERROR in reading row.");
      LOG.error(e);
    } finally {
      // Always release driver resources, even when the query fails.
      if (session != null) {
        session.close();
      }
      cluster.close();
    }
    writeResult(values);
  }

  /**
   * Writes {@code content} to {@code <workingDir>/temp-verify} in UTF-8.
   * Errors are logged, not propagated.
   */
  public static void writeResult(String content) {
    // try-with-resources guarantees the writer is closed on every path.
    try (PrintWriter writer = new PrintWriter(workingDir + "/temp-verify", "UTF-8")) {
      writer.println(content);
    } catch (Exception e) {
      LOG.error("ERROR in writing temp-verify result.");
      LOG.error(e);
    }
  }
}
package edu.uchicago.cs.ucare.dmck.interceptor;
import java.io.*;
import java.util.Hashtable;
import org.apache.log4j.Logger;
/**
 * File-based IPC layer between an instrumented Cassandra node and the DMCK
 * model checker. Events are written into {@code /tmp/ipc/new}, atomically
 * moved into {@code /tmp/ipc/send} for the server to pick up, and
 * acknowledgements are polled from {@code /tmp/ipc/ack}.
 */
public class InterceptionLayer {
  private static final Logger LOG = Logger.getLogger(InterceptionLayer.class);

  // Root of the directory tree shared with the DMCK server.
  private static String ipcDir = "/tmp/ipc";
  // Cumulative count of positive responses seen per message type.
  private static Hashtable<String, Integer> respTable = new Hashtable<String, Integer>();
  // volatile: incremented by incrementSenderSequencer()/incrementReceiverSequencer()
  // (potentially on other threads) and polled in the busy-wait loops below.
  private static volatile int latestSenderStep = 0;
  private static volatile int latestReceiverStep = 0;
  private static String senderSequencer = "";
  private static String receiverSequencer = "";

  /**
   * Intercepts an outgoing Paxos message: writes the event file, commits it
   * to the server, and blocks until the server acknowledges the event.
   */
  public static void interceptPaxosEvent(
      long sender, long recv, String verb, String payload, String usrval) {
    long eventId = getHashId(sender, recv, verb, payload);
    String filename = "cassPaxos-" + eventId + "-" + getTimestamp();
    String content = "";
    content += "sender=" + sender + "\n";
    content += "recv=" + recv + "\n";
    content += "verb=" + verb + "\n";
    content += "payload=" + payload + "\n";
    content += "usrval=" + usrval + "\n";
    content += "eventId=" + eventId + "\n";
    writePacketToFile(filename, content);
    LOG.info(
        "[DMCK] Intercept a Paxos message event. sender: "
            + sender
            + " recv: "
            + recv
            + " verb: "
            + verb
            + " payload: "
            + payload
            + "usrval:"
            + usrval
            + " filename: "
            + ipcDir
            + "/"
            + filename);
    commitFile(filename);
    waitForAck(filename);
  }

  /** Reports a (type, key, ballot) state update of a node to the server. */
  public static void updateState(long sender, String type, int key, String ballot) {
    String filename = "cassUpdate-" + sender + "-" + getTimestamp() + "-" + type;
    String content = "";
    content += "sender=" + sender + "\n";
    content += "type=" + type + "\n";
    content += "key=" + key + "\n";
    content += "ballot=" + ballot + "\n";
    writePacketToFile(filename, content);
    LOG.info(
        "[DMCK] Intercept a state update in node: "
            + sender
            + " type: "
            + type
            + " key: "
            + key
            + " ballot: "
            + ballot
            + " filename: "
            + ipcDir
            + "/"
            + filename);
    commitFile(filename);
  }

  /** Reports a (type, ballot, key, value) state update of a node to the server. */
  public static void updateState2(long sender, String type, String ballot, int key, int value) {
    String filename = "cassUpdate-" + sender + "-" + getTimestamp() + "-" + type;
    String content = "";
    content += "sender=" + sender + "\n";
    content += "type=" + type + "\n";
    content += "ballot=" + ballot + "\n";
    content += "key=" + key + "\n";
    content += "value=" + value + "\n";
    writePacketToFile(filename, content);
    LOG.info(
        "[DMCK] Intercept a state update in node: "
            + sender
            + " type: "
            + type
            + " ballot: "
            + ballot
            + " key: "
            + key
            + " value: "
            + value
            + " filename: "
            + ipcDir
            + "/"
            + filename);
    commitFile(filename);
  }

  /**
   * Reports a response event. The file carries the cumulative number of
   * positive responses of this type seen so far (not the boolean itself).
   */
  public static void updateResponseState(long receiver, String type, boolean response) {
    String filename = "cassResponseUpdate-" + receiver + "-" + getTimestamp() + "-" + type;
    if (!respTable.containsKey(type)) {
      respTable.put(type, 0);
    }
    // Count only positive responses.
    respTable.put(type, response ? respTable.get(type) + 1 : respTable.get(type));
    String content = "";
    content += "recv=" + receiver + "\n";
    content += "type=" + type + "\n";
    content += "response=" + respTable.get(type) + "\n";
    writePacketToFile(filename, content);
    LOG.info(
        "[DMCK] Intercept a state update in node: "
            + receiver
            + " type: "
            + type
            + " response: "
            + response
            + " filename: "
            + ipcDir
            + "/"
            + filename);
    commitFile(filename);
  }

  /** Notifies the server that workload {@code id} completed with the given result. */
  public static void updateWorkloadAccomplishement(int id, boolean isApplied) {
    String filename = "cassWorkloadUpdate-" + id;
    String content = "";
    content += "id=" + id + "\n";
    content += "isApplied=" + isApplied + "\n";
    writePacketToFile(filename, content);
    commitFile(filename);
  }

  /** Writes {@code content} into {@code ipcDir/new/filename}. */
  private static void writePacketToFile(String filename, String content) {
    File file = new File(ipcDir + "/new/" + filename);
    // try-with-resources: the original leaked the FileWriter on exception.
    try (FileWriter writer = new FileWriter(file)) {
      file.createNewFile();
      writer.write(content);
      writer.flush();
    } catch (Exception e) {
      LOG.error("[DMCK] Error in writing state content to file.", e);
    }
  }

  /**
   * Atomically publishes a packet by moving it from new/ to send/.
   * File.renameTo is synchronous, unlike the previous "mv" subprocess which
   * returned before the move completed and could race with waitForAck.
   */
  private static void commitFile(String filename) {
    File src = new File(ipcDir + "/new/" + filename);
    File dst = new File(ipcDir + "/send/" + filename);
    if (!src.renameTo(dst)) {
      LOG.error("[DMCK] Error in committing file " + filename);
    }
  }

  /**
   * Busy-waits for the server's ack file, reads the execute flag and, when a
   * dmckStep is present, spins until this node's sender sequencer is exactly
   * one step behind it. Returns whether the server allowed execution.
   */
  private static boolean waitForAck(String filename) {
    File f = new File(ipcDir + "/ack/" + filename);
    boolean dmckResponse = false;
    while (!f.exists()) {
      try {
        Thread.sleep(0, 100);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
      }
    }
    if (f.exists()) {
      // try-with-resources: the original leaked this reader.
      try (BufferedReader br = new BufferedReader(new FileReader(f))) {
        for (String line; (line = br.readLine()) != null; ) {
          if (line.contains("execute=true")) {
            dmckResponse = true;
          }
          if (line.contains("dmckStep")) {
            int step = Integer.parseInt(line.split("=")[1]);
            while (step - latestSenderStep != 1) {
              try {
                // Sleep for 5ms if the senderSequencer does not match
                Thread.sleep(5);
              } catch (InterruptedException ie) {
                ie.printStackTrace();
              }
            }
          }
        }
      } catch (Exception e) {
        LOG.error("[DMCK] Error in reading ack file.", e);
      }
    }
    // Remove DMCK enabling message. File.delete replaces the asynchronous
    // "rm" subprocess of the original code.
    if (!f.delete()) {
      LOG.error("[DMCK] Error in deleting ack file.");
    }
    return dmckResponse;
  }

  /**
   * Interception Layer to enable receiving message event: blocks until the
   * server drops a {@code recv-<node>-<step>} file whose sender and verb
   * match this message, then consumes (deletes) it.
   */
  public static void enableReceiving(int sender, int recv, String verb) {
    LOG.info("[DMCK] Receive message " + verb + " from node-" + sender + " at node-" + recv);
    boolean isExpectedFile = false;
    File path = new File(ipcDir + "/ack");
    String expectedFilename = "";
    while (!isExpectedFile) {
      // listFiles() returns null if the directory vanishes; treat as empty.
      File[] ackFiles = path.listFiles();
      if (ackFiles != null && ackFiles.length > 0) {
        expectedFilename = "recv-" + recv + "-" + (latestReceiverStep + 1);
        for (File file : ackFiles) {
          if (file.getName().equals(expectedFilename)) {
            LOG.info("[DMCK] See expected filename=" + expectedFilename);
            try (BufferedReader br = new BufferedReader(new FileReader(file))) {
              boolean correctSender = false;
              boolean correctVerb = false;
              for (String line; (line = br.readLine()) != null; ) {
                if (line.contains("sendNode")) {
                  int s = Integer.parseInt(line.split("=")[1]);
                  correctSender = sender == s;
                  LOG.info("s=" + s + " vs sender=" + sender + " --> " + correctSender);
                } else if (line.contains("verb")) {
                  String v = line.split("=")[1];
                  correctVerb = verb.equals(v);
                  LOG.info("v=" + v + " vs verb=" + verb + " --> " + correctVerb);
                }
              }
              if (correctSender && correctVerb) {
                LOG.info("[DMCK] Confirmed expected file!");
                isExpectedFile = true;
                break;
              }
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        }
      } else {
        try {
          // Sleep for 5ms if the receiverSequencer does not match
          Thread.sleep(5);
        } catch (InterruptedException ie) {
          ie.printStackTrace();
        }
      }
    }
    // Remove DMCK enabling message synchronously instead of via "rm".
    File consumed = new File(path, expectedFilename);
    LOG.info("[DMCK] Remove file=" + path.getAbsolutePath() + "/" + expectedFilename);
    if (!consumed.delete()) {
      LOG.error("[DMCK] Error in deleting ack file.");
    }
    LOG.info(
        "[DMCK] Enable receiving message " + verb + " from node-" + sender + " at node-" + recv);
  }

  // Sender Sequencer Counter
  public static void incrementSenderSequencer() {
    latestSenderStep++;
    senderSequencer += latestSenderStep + "-";
    LOG.info("[DMCK] Sender Sequencer=" + senderSequencer);
  }

  // Receiver Sequencer Counter
  public static void incrementReceiverSequencer() {
    latestReceiverStep++;
    receiverSequencer += latestReceiverStep + "-";
    LOG.info("[DMCK] Receiver Sequencer=" + receiverSequencer);
  }

  /**
   * Deterministic event id from (sender, recv, verb[, payload]). For Paxos
   * response verbs the payload is deliberately excluded so that the id does
   * not depend on the response content.
   */
  private static long getHashId(long sender, long recv, String verb, String payload) {
    long prime = 31;
    long hash = 1;
    hash = prime * hash + recv;
    hash = prime * hash + sender;
    hash = prime * hash + verb.hashCode();
    if (verb.equals("PAXOS_PROPOSE_RESPONSE") || verb.equals("PAXOS_PREPARE_RESPONSE")) {
      // int response = payload.toLowerCase().contains("response=true") ? 1 : 0;
      // hash = prime * hash + response;
    } else {
      hash = prime * hash + payload.hashCode();
    }
    return hash;
  }

  /** Millisecond timestamp truncated to 5 digits, used to disambiguate filenames. */
  private static long getTimestamp() {
    return System.currentTimeMillis() % 100000;
  }
}
package edu.uchicago.cs.ucare.dmck.interceptor;
import org.apache.log4j.Logger;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
/**
 * One CQL statement executed asynchronously on its own thread against a given
 * cluster. When the statement completes, the DMCK server is notified once via
 * {@link InterceptionLayer#updateWorkloadAccomplishement}.
 */
public class Workload implements Runnable {
  private static final Logger LOG = Logger.getLogger(Workload.class);

  private int id;
  private Cluster cluster;
  private String cql;
  // volatile: these three fields are written by the worker thread started in
  // run() and read by the driver thread through reset()/isFinished()/isApplied();
  // without volatile the driver thread may never observe the updates.
  private volatile Session session;
  private volatile boolean isFinished;
  private Thread t;
  private volatile boolean isApplied = false;

  /**
   * @param id workload id reported to DMCK
   * @param cluster cluster to connect to (keyspace "test" is used)
   * @param cql the statement to execute
   */
  public Workload(int id, Cluster cluster, String cql) {
    this.id = id;
    this.cluster = cluster;
    this.cql = cql;
    this.session = null;
    this.isFinished = false;
  }

  /** Closes any open session and marks the workload as not finished. */
  public void reset() {
    if (session != null) {
      try {
        session.close();
      } catch (Exception e) {
        LOG.error("Failed to close Cassandra connection.");
      }
    }
    session = null;
    isFinished = false;
  }

  /** Starts a worker thread that connects, executes the CQL and notifies DMCK. */
  @Override
  public void run() {
    t = new Thread(new Runnable() {
      @Override
      public void run() {
        session = cluster.connect("test");
        LOG.info("Executing: " + cql);
        ResultSet rs = session.execute(cql);
        isApplied = rs.wasApplied();
        LOG.info("Finished executing: " + cql);
        finish();
      }
    });
    t.start();
  }

  /** Notifies DMCK exactly once, then marks the workload finished. */
  private void finish() {
    if (!isFinished) {
      notifyDMCK();
      isFinished = true;
    }
  }

  private void notifyDMCK() {
    LOG.info("Notify DMCK that it has finished a workload with isApplied=" + isApplied);
    InterceptionLayer.updateWorkloadAccomplishement(id, isApplied);
  }

  /** @return true once the worker thread has completed and DMCK was notified */
  public boolean isFinished() {
    return isFinished;
  }

  /** @return whether the conditional write was applied (valid after finish) */
  public boolean isApplied() {
    return isApplied;
  }
}
package edu.uchicago.cs.ucare.dmck.interceptor;
import java.util.Arrays;
import java.net.InetSocketAddress;
import org.apache.log4j.Logger;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.WhiteListPolicy;
public class WorkloadExecutor {
private static final Logger LOG = Logger.getLogger(WorkloadExecutor.class);
/**
 * Entry point: dispatches to the workload named by the single CLI argument.
 *
 * @param args exactly one argument: "cass-6023" or "cass-6013"
 */
public static void main(String[] args) {
  if (args.length != 1) {
    LOG.error("Parameters are incorrect: <workload_type>");
    System.exit(1);
  }
  String workload = args[0];
  if (workload.equals("cass-6023")) {
    cass6023();
  } else if (workload.equals("cass-6013")) {
    cass6013();
  } else {
    // Previously an unrecognized workload name silently did nothing and
    // exited 0, which hid configuration typos from the test harness.
    LOG.error("Unknown workload type: " + workload);
    System.exit(1);
  }
}
/**
 * CASSANDRA-6013 workload: two clients, each pinned to a different node via a
 * whitelist policy, race conditional UPDATEs on the same row. The second
 * write is delayed by one second relative to the first.
 */
private static void cass6013() {
  Cluster cluster1 = Cluster.builder().addContactPoint("127.0.0.1")
      .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.1", 9042)))).build();
  cluster1.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
  Cluster cluster2 = Cluster.builder().addContactPoint("127.0.0.2")
      .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.2", 9042)))).build();
  cluster2.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
  // initiate each workload
  Workload w1 = new Workload(1, cluster1, "UPDATE tests SET owner = 'user_2' WHERE name = 'testing' IF owner = 'user_1'");
  Workload w2 = new Workload(2, cluster2, "UPDATE tests SET value_1 = 'Y' WHERE name = 'testing' IF owner = 'user_1'");
  // run both workloads (each on its own thread) and block until both report back
  try {
    w1.run();
    Thread.sleep(1000);
    w2.run();
    waitForWorkload(w1);
    waitForWorkload(w2);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can observe the interruption.
    Thread.currentThread().interrupt();
    e.printStackTrace();
  } finally {
    // Close clusters even if the wait was interrupted; the original leaked
    // both connections on the exception path.
    cluster1.close();
    cluster2.close();
  }
}
private static void cass6023() {
Cluster cluster1 = Cluster.builder().addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.1", 9042)))).build();
cluster1.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
Cluster cluster2 = Cluster.builder().addContactPoint("127.0.0.2")
.withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.2", 9042)))).build();
cluster2.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
Cluster cluster3 = Cluster.builder().addContactPoint("127.0.0.3")
.withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.3", 9042)))).build();
cluster3.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);