diff --git a/README.md b/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..c48c99e0641be9fb6947292d13d4ddaf1ca63430
--- /dev/null
+++ b/README.md
@@ -0,0 +1,28 @@
+## Content:
+
+This repository contains Cassandra version 2.0.0 which reproduces the bug Cas-6023.
+
+The code is cloned from [DMCK](https://github.com/ucare-uchicago/DMCK/tree/master/dmck-target-systems) model checker's target systems which instruments the code to work together with a model checker. The model checker intercepts the messages in the system and delivers them in a specific order.
+
+This repository modifies the instrumentation so that the Cassandra nodes communicate to a model checker or a tester via network.
+
+## Requirements:
+
+
+- Java 7
+
+- Ant 1.9.14
+
+
+## Compilation:
+
+```
+ant
+```
+
+For more details, please refer to README.txt.
+
+
+
+## Note:
+Installation on a VM is suggested since the software depends on the old versions of the libraries.
diff --git a/build.xml b/build.xml
index bae1c576a6943fc392475a417c4d1b09b2eae67f..bd3c7acdbc6511ad0c9833253ccd021fda4bc98f 100644
--- a/build.xml
+++ b/build.xml
@@ -375,6 +375,7 @@
+
diff --git a/lib/gson-2.8.2.jar b/lib/gson-2.8.2.jar
new file mode 100755
index 0000000000000000000000000000000000000000..d0d030c6371cb44e7caee238f7bf148c7732ce28
Binary files /dev/null and b/lib/gson-2.8.2.jar differ
diff --git a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/ConsistentVerifier.java b/src/java/edu/uchicago/cs/ucare/dmck/interceptor/ConsistentVerifier.java
deleted file mode 100644
index e7b6425b4f174edbbe39e5287297570115dbc63e..0000000000000000000000000000000000000000
--- a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/ConsistentVerifier.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package edu.uchicago.cs.ucare.dmck.interceptor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.HashMap;
-
-import org.apache.log4j.Logger;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
-
-public class ConsistentVerifier {
-
- private static final Logger LOG = Logger.getLogger(ConsistentVerifier.class);
-
- static String workingDir;
- static String path;
- static File[] dataDir;
- static File[] dataLogDir;
- static int numNode;
-
- public static void main(String[] args) {
- if(args.length != 1){
- System.out.println("Usage: please specify ");
- System.exit(1);
- }
- workingDir = args[0];
- getValues();
- }
-
- public static void getValues(){
- String values = "";
-
- Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
- Session session = cluster.connect("test");
- try {
- LOG.info("Querying row from table");
- ResultSet rs = session.execute("SELECT * FROM tests");
- LOG.info("Row acquired");
- Row row = rs.one();
- values += "owner=" + row.getString("owner") + "\n";
- values += "value_1=" + row.getString("value_1") + "\n";
- values += "value_2=" + row.getString("value_2") + "\n";
- values += "value_3=" + row.getString("value_3") + "\n";
- } catch (Exception e) {
- LOG.error("ERROR in reading row.");
- LOG.error(e);
- }
-
- cluster.close();
-
- writeResult(values);
- }
-
- public static void writeResult(String content){
- try{
- PrintWriter writer = new PrintWriter(workingDir + "/temp-verify", "UTF-8");
- writer.println(content);
- writer.close();
- } catch (Exception e){
- LOG.error("ERROR in writing temp-verify result.");
- LOG.error(e);
- }
- }
-
-}
diff --git a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/InterceptionLayer.java b/src/java/edu/uchicago/cs/ucare/dmck/interceptor/InterceptionLayer.java
deleted file mode 100644
index 563c7729acf3bf1e940703bf24d9353dff3f53d3..0000000000000000000000000000000000000000
--- a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/InterceptionLayer.java
+++ /dev/null
@@ -1,315 +0,0 @@
-package edu.uchicago.cs.ucare.dmck.interceptor;
-
-import java.io.*;
-import java.util.Hashtable;
-import org.apache.log4j.Logger;
-
-public class InterceptionLayer {
- private static final Logger LOG = Logger.getLogger(InterceptionLayer.class);
- private static String ipcDir = "/tmp/ipc";
-
- private static Hashtable respTable = new Hashtable();
-
- private static int latestSenderStep = 0;
- private static int latestReceiverStep = 0;
- private static String senderSequencer = "";
- private static String receiverSequencer = "";
-
- public static void interceptPaxosEvent(
- long sender, long recv, String verb, String payload, String usrval) {
- long eventId = getHashId(sender, recv, verb, payload);
-
- String filename = "cassPaxos-" + eventId + "-" + getTimestamp();
-
- String content = "";
- content += "sender=" + sender + "\n";
- content += "recv=" + recv + "\n";
- content += "verb=" + verb + "\n";
- content += "payload=" + payload + "\n";
- content += "usrval=" + usrval + "\n";
- content += "eventId=" + eventId + "\n";
-
- writePacketToFile(filename, content);
-
- LOG.info(
- "[DMCK] Intercept a Paxos message event. sender: "
- + sender
- + " recv: "
- + recv
- + " verb: "
- + verb
- + " payload: "
- + payload
- + "usrval:"
- + usrval
- + " filename: "
- + ipcDir
- + "/"
- + filename);
-
- commitFile(filename);
- waitForAck(filename);
- }
-
- public static void updateState(long sender, String type, int key, String ballot) {
- String filename = "cassUpdate-" + sender + "-" + getTimestamp() + "-" + type;
-
- String content = "";
- content += "sender=" + sender + "\n";
- content += "type=" + type + "\n";
- content += "key=" + key + "\n";
- content += "ballot=" + ballot + "\n";
-
- writePacketToFile(filename, content);
-
- LOG.info(
- "[DMCK] Intercept a state update in node: "
- + sender
- + " type: "
- + type
- + " key: "
- + key
- + " ballot: "
- + ballot
- + " filename: "
- + ipcDir
- + "/"
- + filename);
-
- commitFile(filename);
- }
-
- public static void updateState2(long sender, String type, String ballot, int key, int value) {
- String filename = "cassUpdate-" + sender + "-" + getTimestamp() + "-" + type;
-
- String content = "";
- content += "sender=" + sender + "\n";
- content += "type=" + type + "\n";
- content += "ballot=" + ballot + "\n";
- content += "key=" + key + "\n";
- content += "value=" + value + "\n";
-
- writePacketToFile(filename, content);
-
- LOG.info(
- "[DMCK] Intercept a state update in node: "
- + sender
- + " type: "
- + type
- + " ballot: "
- + ballot
- + " key: "
- + key
- + " value: "
- + value
- + " filename: "
- + ipcDir
- + "/"
- + filename);
-
- commitFile(filename);
- }
-
- public static void updateResponseState(long receiver, String type, boolean response) {
- String filename = "cassResponseUpdate-" + receiver + "-" + getTimestamp() + "-" + type;
-
- if (!respTable.containsKey(type)) {
- respTable.put(type, 0);
- }
- respTable.put(type, response ? respTable.get(type) + 1 : respTable.get(type));
-
- String content = "";
- content += "recv=" + receiver + "\n";
- content += "type=" + type + "\n";
- content += "response=" + respTable.get(type) + "\n";
-
- writePacketToFile(filename, content);
-
- LOG.info(
- "[DMCK] Intercept a state update in node: "
- + receiver
- + " type: "
- + type
- + " response: "
- + response
- + " filename: "
- + ipcDir
- + "/"
- + filename);
-
- commitFile(filename);
- }
-
- public static void updateWorkloadAccomplishement(int id, boolean isApplied) {
- String filename = "cassWorkloadUpdate-" + id;
-
- String content = "";
- content += "id=" + id + "\n";
- content += "isApplied=" + isApplied + "\n";
-
- writePacketToFile(filename, content);
- commitFile(filename);
- }
-
- private static void writePacketToFile(String filename, String content) {
- File file = new File(ipcDir + "/new/" + filename);
-
- try {
- file.createNewFile();
- FileWriter writer = new FileWriter(file);
- writer.write(content);
- writer.flush();
- writer.close();
- } catch (Exception e) {
- LOG.error("[DMCK] Error in writing state content to file.");
- }
- }
-
- private static void commitFile(String filename) {
- try {
- Runtime.getRuntime()
- .exec("mv " + ipcDir + "/new/" + filename + " " + ipcDir + "/send/" + filename);
- } catch (Exception e) {
- LOG.error("[DMCK] Error in committing file.");
- }
- }
-
- private static boolean waitForAck(String filename) {
- File f = new File(ipcDir + "/ack/" + filename);
- boolean dmckResponse = false;
-
- while (!f.exists()) {
- try {
- Thread.sleep(0, 100);
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
- }
-
- if (f.exists()) {
- try {
- BufferedReader br = new BufferedReader(new FileReader(f));
- for (String line; (line = br.readLine()) != null; ) {
- if (line.contains("execute=true")) {
- dmckResponse = true;
- }
- if (line.contains("dmckStep")) {
- int step = Integer.parseInt(line.split("=")[1]);
- while (step - latestSenderStep != 1) {
- try {
- // Sleep for 5ms if the senderSequencer does not match
- Thread.sleep(5);
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
- }
- }
- }
- } catch (Exception e) {
- LOG.error("[DMCK] Error in reading ack file.");
- }
- }
-
- // Remove DMCK enabling message
- try {
- Runtime.getRuntime().exec("rm " + ipcDir + "/ack/" + filename);
- } catch (Exception e) {
- LOG.error("[DMCK] Error in deleting ack file.");
- }
-
- return dmckResponse;
- }
-
- // Interception Layer to enable receiving message event
- public static void enableReceiving(int sender, int recv, String verb) {
- LOG.info("[DMCK] Receive message " + verb + " from node-" + sender + " at node-" + recv);
- boolean isExpectedFile = false;
- File path = new File(ipcDir + "/ack");
- String expectedFilename = "";
-
- while (!isExpectedFile) {
- if (path.listFiles().length > 0) {
- expectedFilename = "recv-" + recv + "-" + (latestReceiverStep + 1);
- for (File file : path.listFiles()) {
- if (file.getName().equals(expectedFilename)) {
- LOG.info("[DMCK] See expected filename=" + expectedFilename);
- try {
- BufferedReader br = new BufferedReader(new FileReader(file));
- boolean correctSender = false;
- boolean correctVerb = false;
- for (String line; (line = br.readLine()) != null; ) {
- if (line.contains("sendNode")) {
- int s = Integer.parseInt(line.split("=")[1]);
- correctSender = sender == s;
- LOG.info("s=" + s + " vs sender=" + sender + " --> " + correctSender);
- } else if (line.contains("verb")) {
- String v = line.split("=")[1];
- correctVerb = verb.equals(v);
- LOG.info("v=" + v + " vs verb=" + verb + " --> " + correctVerb);
- }
- }
- if (correctSender && correctVerb) {
- LOG.info("[DMCK] Confirmed expected file!");
- isExpectedFile = true;
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- } else {
- try {
- // Sleep for 5ms if the receiverSequencer does not match
- Thread.sleep(5);
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
- }
- }
- // Remove DMCK enabling message
- try {
- LOG.info("[DMCK] Remove file=" + path.getAbsolutePath() + "/" + expectedFilename);
- Runtime.getRuntime().exec("rm " + path.getAbsolutePath() + "/" + expectedFilename);
- } catch (Exception e) {
- LOG.error("[DMCK] Error in deleting ack file.");
- }
-
- LOG.info(
- "[DMCK] Enable receiving message " + verb + " from node-" + sender + " at node-" + recv);
- }
-
- // Sender Sequencer Counter
- public static void incrementSenderSequencer() {
- latestSenderStep++;
- senderSequencer += latestSenderStep + "-";
- LOG.info("[DMCK] Sender Sequencer=" + senderSequencer);
- }
-
- // Receiver Sequencer Counter
- public static void incrementReceiverSequencer() {
- latestReceiverStep++;
- receiverSequencer += latestReceiverStep + "-";
- LOG.info("[DMCK] Receiver Sequencer=" + receiverSequencer);
- }
-
- private static long getHashId(long sender, long recv, String verb, String payload) {
- long prime = 31;
- long hash = 1;
- hash = prime * hash + recv;
- hash = prime * hash + sender;
- hash = prime * hash + verb.hashCode();
- if (verb.equals("PAXOS_PROPOSE_RESPONSE") || verb.equals("PAXOS_PREPARE_RESPONSE")) {
- // int response = payload.toLowerCase().contains("response=true") ? 1 : 0;
- // hash = prime * hash + response;
- } else {
- hash = prime * hash + payload.hashCode();
- }
-
- return hash;
- }
-
- private static long getTimestamp() {
- return System.currentTimeMillis() % 100000;
- }
-}
diff --git a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/Workload.java b/src/java/edu/uchicago/cs/ucare/dmck/interceptor/Workload.java
deleted file mode 100644
index 8dfa4efa5ff805622ad73f47a93cff380184bc61..0000000000000000000000000000000000000000
--- a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/Workload.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package edu.uchicago.cs.ucare.dmck.interceptor;
-
-import org.apache.log4j.Logger;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.ResultSet;
-
-public class Workload implements Runnable {
-
- private static final Logger LOG = Logger.getLogger(Workload.class);
-
- private int id;
- private Cluster cluster;
- private String cql;
-
- private Session session;
- private boolean isFinished;
- private Thread t;
-
- private boolean isApplied = false;
-
-
- public Workload(int id, Cluster cluster, String cql) {
- this.id = id;
- this.cluster = cluster;
- this.cql = cql;
-
- this.session = null;
- this.isFinished = false;
- }
-
- public void reset() {
- if (session != null) {
- try {
- session.close();
- } catch (Exception e) {
- LOG.error("Failed to close Cassandra connection.");
- }
- }
- session = null;
- isFinished = false;
- }
-
- @Override
- public void run() {
- t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- session = cluster.connect("test");
- LOG.info("Executing: " + cql);
- ResultSet rs = session.execute(cql);
- isApplied = rs.wasApplied();
- LOG.info("Finished executing: " + cql);
- finish();
- }
-
- });
-
- t.start();
- }
-
- private void finish() {
- if (!isFinished) {
- notifyDMCK();
- isFinished = true;
- }
- }
-
- private void notifyDMCK() {
- LOG.info("Notify DMCK that it has finished a workload with isApplied=" + isApplied);
- InterceptionLayer.updateWorkloadAccomplishement(id, isApplied);
- }
-
- public boolean isFinished() {
- return isFinished;
- }
-
- public boolean isApplied() {
- return isApplied;
- }
-
-}
diff --git a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/WorkloadExecutor.java b/src/java/edu/uchicago/cs/ucare/dmck/interceptor/WorkloadExecutor.java
deleted file mode 100644
index 1d5b6646041676dc39f5ee71fdaf51100dd17585..0000000000000000000000000000000000000000
--- a/src/java/edu/uchicago/cs/ucare/dmck/interceptor/WorkloadExecutor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package edu.uchicago.cs.ucare.dmck.interceptor;
-
-import java.util.Arrays;
-import java.net.InetSocketAddress;
-
-import org.apache.log4j.Logger;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.policies.RoundRobinPolicy;
-import com.datastax.driver.core.policies.WhiteListPolicy;
-
-public class WorkloadExecutor {
-
- private static final Logger LOG = Logger.getLogger(WorkloadExecutor.class);
-
- public static void main(String[] args) {
- if (args.length != 1) {
- LOG.error("Parameters are incorrect: ");
- System.exit(1);
- }
-
- String workload = args[0];
-
- if (workload.equals("cass-6023")) {
- cass6023();
- } else if (workload.equals("cass-6013")) {
- cass6013();
- }
- }
-
- private static void cass6013() {
- Cluster cluster1 = Cluster.builder().addContactPoint("127.0.0.1")
- .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.1", 9042)))).build();
- cluster1.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
-
- Cluster cluster2 = Cluster.builder().addContactPoint("127.0.0.2")
- .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.2", 9042)))).build();
- cluster2.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
-
- // initiate each workload
-
- Workload w1 = new Workload(1, cluster1, "UPDATE tests SET owner = 'user_2' WHERE name = 'testing' IF owner = 'user_1'");
- Workload w2 = new Workload(2, cluster2, "UPDATE tests SET value_1 = 'Y' WHERE name = 'testing' IF owner = 'user_1'");
-
- // run each workload; keep re-executing if the write does not succeed
- try {
- w1.run();
- Thread.sleep(1000);
- w2.run();
- waitForWorkload(w1);
- waitForWorkload(w2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- cluster1.close();
- cluster2.close();
- }
-
- private static void cass6023() {
- Cluster cluster1 = Cluster.builder().addContactPoint("127.0.0.1")
- .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.1", 9042)))).build();
- cluster1.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
-
- Cluster cluster2 = Cluster.builder().addContactPoint("127.0.0.2")
- .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.2", 9042)))).build();
- cluster2.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
-
- Cluster cluster3 = Cluster.builder().addContactPoint("127.0.0.3")
- .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(), Arrays.asList(new InetSocketAddress("127.0.0.3", 9042)))).build();
- cluster3.getConfiguration().getPoolingOptions().setPoolTimeoutMillis(120000);
-
-
- // initiate each workload
-
- Workload w1 = new Workload(1, cluster1, "UPDATE tests SET value_1 = 'A' WHERE name = 'testing' IF owner = 'user_1'");
- Workload w2 = new Workload(2, cluster2, "UPDATE tests SET value_1 = 'B', value_2 = 'B' WHERE name = 'testing' IF value_1 = 'A'");
- Workload w3 = new Workload(3, cluster3, "UPDATE tests SET value_3 = 'C' WHERE name = 'testing' IF owner = 'user_1'");
-
- // run each workload; keep re-executing if the write does not succeed
- try {
- w1.run();
- Thread.sleep(2000);
- w2.run();
- Thread.sleep(2000);
- w3.run();
- waitForWorkload(w1);
- waitForWorkload(w2);
- waitForWorkload(w3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- cluster1.close();
- cluster2.close();
- cluster3.close();
- }
-
- private static void waitForWorkload(Workload w) {
- while (!w.isFinished()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
-}
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 110552d424b1cc39461e817f4677fb9f685d8836..e9dd8e2d8125a2ba59c2c16a21c227eca6a77599 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -55,8 +55,6 @@ import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.thrift.cassandraConstants;
import org.apache.cassandra.utils.*;
-import edu.uchicago.cs.ucare.dmck.interceptor.InterceptionLayer;
-
import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
public class SystemKeyspace
@@ -805,11 +803,6 @@ public class SystemKeyspace
ByteBufferUtil.bytesToHex(promise.key),
promise.update.id()));
- // DMCK
- String sourceAddr = FBUtilities.getBroadcastAddress().getHostAddress();
- long sourceId = Long.parseLong(sourceAddr.substring(sourceAddr.length() - 1))-1;
- InterceptionLayer.updateState(sourceId, "inProgress", promise.key.hashCode(), promise.ballot.toString().substring(0, 24));
-// InterceptionLayer.updateState2(sourceId, "inProgress", promise.ballot.toString().substring(0, 24), promise.key.hashCode(), promise.update.asMap().hashCode());
}
public static void savePaxosProposal(Commit commit)
@@ -823,11 +816,6 @@ public class SystemKeyspace
ByteBufferUtil.bytesToHex(commit.key),
commit.update.id()));
- // DMCK
- String sourceAddr = FBUtilities.getBroadcastAddress().getHostAddress();
- long sourceId = Long.parseLong(sourceAddr.substring(sourceAddr.length() - 1))-1;
- InterceptionLayer.updateState(sourceId, "inProgress", commit.key.hashCode(), commit.ballot.toString().substring(0, 24));
-// InterceptionLayer.updateState2(sourceId, "inProgress", commit.ballot.toString().substring(0, 24), commit.key.hashCode(), commit.update.asMap().hashCode());
}
private static int paxosTtl(CFMetaData metadata)
@@ -851,12 +839,5 @@ public class SystemKeyspace
ByteBufferUtil.bytesToHex(commit.update.toBytes()),
ByteBufferUtil.bytesToHex(commit.key),
commit.update.id()));
-
- // DMCK
- String sourceAddr = FBUtilities.getBroadcastAddress().getHostAddress();
- long sourceId = Long.parseLong(sourceAddr.substring(sourceAddr.length() - 1))-1;
- InterceptionLayer.updateState2(sourceId, "inProgress", proposalAfterCommit ? inProgressBallot.toString().substring(0, 24) : commit.ballot.toString().substring(0, 24),
- commit.key.hashCode(), commit.update.asMap().hashCode());
- InterceptionLayer.updateState2(sourceId, "mostRecent", commit.ballot.toString().substring(0, 24), commit.key.hashCode(), commit.update.asMap().hashCode());
}
}
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
old mode 100644
new mode 100755
index d7645b6a89051b56e95ad7db4cf070fea94327dc..4f10760c0a3ca96746a7fdd187d066940ff45325
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.net;
+import org.apache.cassandra.testing.TestingClient;
+import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,5 +56,16 @@ public class MessageDeliveryTask implements Runnable
}
verbHandler.doVerb(message, id);
+ if (message.verb == MessagingService.Verb.PAXOS_PREPARE
+ || message.verb == MessagingService.Verb.PAXOS_PROPOSE
+ || message.verb == MessagingService.Verb.PAXOS_COMMIT
+ || message.verb == MessagingService.Verb.PAXOS_PREPARE_RESPONSE
+ || message.verb == MessagingService.Verb.PAXOS_PROPOSE_RESPONSE
+ || message.verb == MessagingService.Verb.PAXOS_COMMIT_RESPONSE) {
+ // BRC - Send ACK to server that the message is processed
+ String destAddr = FBUtilities.getBroadcastAddress().getHostAddress();
+ int recv = Integer.parseInt(destAddr.substring(destAddr.length() - 1)) - 1;
+ TestingClient.getInstance().writeToSocket(message, recv, FBUtilities.getBroadcastAddress());
+ }
}
}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
old mode 100644
new mode 100755
index 7963ac30377cc9b7187c0e37fad0d202553cb52e..9641f2fbe9949a167e374307a683d55d463990c6
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.net;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
-import edu.uchicago.cs.ucare.dmck.interceptor.InterceptionLayer;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.net.*;
@@ -55,6 +54,7 @@ import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PrepareResponse;
+import org.apache.cassandra.testing.TestingClient;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
@@ -168,6 +168,7 @@ public final class MessagingService implements MessagingServiceMBean {
put(Verb.REPLICATION_FINISHED, Stage.MISC);
put(Verb.SNAPSHOT, Stage.MISC);
+ put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
@@ -194,6 +195,25 @@ public final class MessagingService implements MessagingServiceMBean {
}
};
+
+ private Thread testingThread;
+ private TestingClient tc; //later use: tc.writeToSocket(....);
+
+ //BURCU
+ public void connectToTestServer() {
+ //tc = new TestingClient("127.0.0.1", 4444);
+
+ tc = TestingClient.getInstance();
+
+ testingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ tc.connect();
+ }
+ });
+ testingThread.start();
+ }
+
/**
* Messages we receive in IncomingTcpConnection have a Verb that tells us what kind of message it
* is. Most of the time, this is enough to determine how to deserialize the message payload. The
@@ -357,10 +377,14 @@ public final class MessagingService implements MessagingServiceMBean {
}
public static MessagingService instance() {
+ //logger.info("---Returning instance");
return MSHandle.instance;
}
private MessagingService() {
+
+ connectToTestServer();
+
for (Verb verb : DROPPABLE_VERBS) {
droppedMessages.put(verb, new DroppedMessageMetrics(verb));
lastDroppedInternal.put(verb, 0);
@@ -611,182 +635,42 @@ public final class MessagingService implements MessagingServiceMBean {
sendOneWay(message, id, to);
}
- /**
- * Send a message to a given endpoint. This method adheres to the fire and forget style messaging.
- *
- * @param message messages to be sent.
- * @param to endpoint to which the message needs to be sent
- */
- /*
- public void sendOneWay(MessageOut message, int id, InetAddress to)
- {
- if (logger.isTraceEnabled())
- logger.trace(FBUtilities.getBroadcastAddress() + " sending " + message.verb + " to " + id + "@" + to);
-
- if (to.equals(FBUtilities.getBroadcastAddress()))
- logger.trace("Message-to-self {} going over MessagingService", message);
-
- // message sinks are a testing hook
- MessageOut processedMessage = SinkManager.processOutboundMessage(message, id, to);
- if (processedMessage == null)
- {
- return;
- }
-
- // get pooled connection (really, connection queue)
- OutboundTcpConnection connection = getConnection(to, processedMessage);
-
- // write it
- connection.enqueue(processedMessage, id);
- }
- */
-
// DMCK
public void sendOneWay(MessageOut message, int id, InetAddress to) {
+ //logger.info("SendOneWay: {}", message.verb);
if (message.verb == Verb.PAXOS_PREPARE
|| message.verb == Verb.PAXOS_PROPOSE
|| message.verb == Verb.PAXOS_COMMIT
|| message.verb == Verb.PAXOS_PREPARE_RESPONSE
|| message.verb == Verb.PAXOS_PROPOSE_RESPONSE
|| message.verb == Verb.PAXOS_COMMIT_RESPONSE) {
- logger.info(
- "[DMCK] interceptMessage: "
- + FBUtilities.getBroadcastAddress()
- + " sending "
- + message.verb
- + " to "
- + id
- + "@"
- + to);
- Thread interceptorThread = new Thread(new InterceptorThread(message, id, to));
- interceptorThread.start();
- } else {
- sendMessage(message, id, to);
- }
- }
- private static String transformVerbToString(Verb msgVerb) {
- String result = "";
- switch (msgVerb) {
- case PAXOS_PREPARE:
- result = "PAXOS_PREPARE";
- break;
- case PAXOS_PROPOSE:
- result = "PAXOS_PROPOSE";
- break;
- case PAXOS_COMMIT:
- result = "PAXOS_COMMIT";
- break;
- case PAXOS_PREPARE_RESPONSE:
- result = "PAXOS_PREPARE_RESPONSE";
- break;
- case PAXOS_PROPOSE_RESPONSE:
- result = "PAXOS_PROPOSE_RESPONSE";
- break;
- case PAXOS_COMMIT_RESPONSE:
- result = "PAXOS_COMMIT_RESPONSE";
- break;
- case READ:
- result = "READ";
- break;
- case READ_REQUEST_RESPONSE:
- result = "READ_REQUEST_RESPONSE";
- break;
- default:
- break;
- }
- return result;
- }
-
- private static class InterceptorThread implements Runnable {
-
- private MessageOut message;
- private int id;
- private InetAddress to;
+ // BRC - Removed DMCK code
- public InterceptorThread(MessageOut message, int id, InetAddress to) {
- this.message = message;
- this.id = id;
- this.to = to;
- }
-
- public void run() {
- String sourceAddr = FBUtilities.getBroadcastAddress().getHostAddress();
- String destinationAddr = to.getHostAddress();
- int sourceId = Integer.parseInt(sourceAddr.substring(sourceAddr.length() - 1)) - 1;
- int destinationId =
- Integer.parseInt(destinationAddr.substring(destinationAddr.length() - 1)) - 1;
-
- String verb = transformVerbToString(message.verb);
-
- HashMap payload = new HashMap();
- HashMap usrval = new HashMap();
- if (message.payload instanceof PrepareResponse) {
- PrepareResponse response = (PrepareResponse) message.payload;
- payload.put("response", response.promised);
- payload.put("inProgressCommitKey", response.inProgressCommit.key.hashCode());
- payload.put(
- "inProgressCommitBallot", response.inProgressCommit.ballot.toString().substring(0, 24));
- // payload.put("inProgressCommitUpdate",
- // response.inProgressCommit.update.asMap().hashCode());
- // usrval.put("inProgressCommitUpdate", response.inProgressCommit.update.asMap().hashCode());
- payload.put("mostRecentCommitKey", response.mostRecentCommit.key.hashCode());
- payload.put(
- "mostRecentCommitBallot", response.mostRecentCommit.ballot.toString().substring(0, 24));
- // payload.put("mostRecentCommitUpdate",
- // response.mostRecentCommit.update.asMap().hashCode());
- // usrval.put("mostRecentCommitUpdate", response.mostRecentCommit.update.asMap().hashCode());
- } else if (message.payload instanceof Commit) {
- Commit commit = (Commit) message.payload;
- payload.put("key", commit.key.hashCode());
- payload.put("ballot", commit.ballot.toString().substring(0, 24));
- // payload.put("update", commit.update.asMap().hashCode());
- // usrval.put("update", commit.update.asMap().hashCode());
- } else if (message.payload instanceof Boolean) {
- Boolean response = (Boolean) message.payload;
- payload.put("response", response);
+ // BRC - Write the message content to server, do not send it to destination yet
+ synchronized (tc) {
+ tc.writeToSocket(message, id, to);
}
-
- /*
- messageMap.put(eventId, message);
- messageIdMap.put(eventId, id);
- destinationMap.put(eventId, to);
- */
-
- InterceptionLayer.interceptPaxosEvent(
- sourceId, destinationId, verb, payload.toString(), usrval.toString());
- MessagingService.instance().sendMessage(message, id, to);
-
- // DMCK: LOG on when the real message is executed.
- logger.info("[DMCK] Message " + verb + " to node-" + destinationId + " is sent.");
- InterceptionLayer.incrementSenderSequencer();
- }
- }
-
- /*
- public void enableMessage(long eventId)
- {
- MessageOut message = messageMap.remove(eventId);
- int id = messageIdMap.remove(eventId);
- InetAddress to = destinationMap.remove(eventId);
-
- logger.info("[DMCK] enableMessage: " + FBUtilities.getBroadcastAddress() + " sending " + message.verb + " to " + id + "@" + to);
-
+ } else {
sendMessage(message, id, to);
+ }
}
- */
public void sendMessage(MessageOut message, int id, InetAddress to) {
+ logger.info("IN 1: " + message.verb);
+
if (logger.isTraceEnabled())
- logger.trace(
- FBUtilities.getBroadcastAddress() + " sending " + message.verb + " to " + id + "@" + to);
+ logger.trace(FBUtilities.getBroadcastAddress() + " sending " + message.verb + " to " + id + "@" + to);
if (to.equals(FBUtilities.getBroadcastAddress()))
logger.trace("Message-to-self {} going over MessagingService", message);
+ logger.info("IN 2: " + message.verb);
+
// message sinks are a testing hook
MessageOut processedMessage = SinkManager.processOutboundMessage(message, id, to);
if (processedMessage == null) {
+ logger.info("NULL");
return;
}
@@ -843,6 +727,8 @@ public final class MessagingService implements MessagingServiceMBean {
} catch (IOException e) {
throw new IOError(e);
}
+
+ tc.writeToSocket("END");
}
public void receive(MessageIn message, int id, long timestamp) {
@@ -850,21 +736,7 @@ public final class MessagingService implements MessagingServiceMBean {
TraceState state = Tracing.instance.initializeFromMessage(message);
if (state != null) state.trace("Message received from {}", message.from);
- // DMCK
- String sendAddr = message.from.getHostAddress();
- int sender = Integer.parseInt(sendAddr.substring(sendAddr.length() - 1)) - 1;
- String destAddr = FBUtilities.getBroadcastAddress().getHostAddress();
- int recv = Integer.parseInt(destAddr.substring(destAddr.length() - 1)) - 1;
-
- if (message.verb == Verb.PAXOS_PREPARE
- || message.verb == Verb.PAXOS_PROPOSE
- || message.verb == Verb.PAXOS_COMMIT
- || message.verb == Verb.PAXOS_PREPARE_RESPONSE
- || message.verb == Verb.PAXOS_PROPOSE_RESPONSE
- || message.verb == Verb.PAXOS_COMMIT_RESPONSE) {
- logger.info("[DMCK] Message {} received from {}", message.verb, message.from);
- InterceptionLayer.enableReceiving(sender, recv, transformVerbToString(message.verb));
- }
+ // BRC - Removed DMCK code
message = SinkManager.processInboundMessage(message, id);
if (message == null) return;
@@ -874,33 +746,6 @@ public final class MessagingService implements MessagingServiceMBean {
assert stage != null : "No stage for message type " + message.verb;
stage.execute(runnable, state);
-
- // DMCK
- if (message.verb == Verb.PAXOS_PREPARE
- || message.verb == Verb.PAXOS_PROPOSE
- || message.verb == Verb.PAXOS_COMMIT
- || message.verb == Verb.PAXOS_PREPARE_RESPONSE
- || message.verb == Verb.PAXOS_PROPOSE_RESPONSE
- || message.verb == Verb.PAXOS_COMMIT_RESPONSE) {
- InterceptionLayer.incrementReceiverSequencer();
- }
-
- if (message.verb == Verb.PAXOS_PREPARE_RESPONSE
- || message.verb == Verb.PAXOS_PROPOSE_RESPONSE) {
- Boolean resp = true;
- String verb = "";
- if (message.payload instanceof PrepareResponse) {
- PrepareResponse response = (PrepareResponse) message.payload;
- resp = response.promised;
- verb += "PAXOS_PREPARE_RESPONSE";
- } else if (message.payload instanceof Boolean) {
- Boolean response = (Boolean) message.payload;
- resp = response;
- verb += "PAXOS_PROPOSE_RESPONSE";
- }
- logger.info("[DMCK] Message {} received at {} response={}", verb, recv, resp);
- InterceptionLayer.updateResponseState(recv, verb, resp);
- }
}
public void setCallbackForTests(int messageId, CallbackInfo callback) {
@@ -960,6 +805,7 @@ public final class MessagingService implements MessagingServiceMBean {
return versions.get(endpoint) != null;
}
+
public void incrementDroppedMessages(Verb verb) {
assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
droppedMessages.get(verb).dropped.mark();
@@ -1122,4 +968,5 @@ public final class MessagingService implements MessagingServiceMBean {
}
return result;
}
+
}
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
old mode 100644
new mode 100755
index 20c70e719345ac8b17994b39a766406eb9077676..09df2bcdb9f45ad687e9a2fb1a26acc9012fcfcf
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.Mx4jTool;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.testing.TestingClient;
/**
* The CassandraDaemon
is an abstraction for a Cassandra daemon
* service, which defines not only a way to activate and deactivate it, but also
@@ -377,6 +378,21 @@ public class CassandraDaemon
setup();
}
+ Thread testingThread;
+ TestingClient tc;
+
+ //BRC - Instrumentation to connect to the explorer server
+ public void connectToTestServer() {
+ tc = TestingClient.getInstance();
+
+ testingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ tc.connect();
+ }
+ });
+ testingThread.start();
+ }
/**
* Start the Cassandra Daemon, assuming that it has already been
* initialized via {@link #init(String[])}
@@ -410,6 +426,7 @@ public class CassandraDaemon
logger.info("Cassandra shutting down...");
thriftServer.stop();
nativeServer.stop();
+ //testingThread.stop();
}
@@ -465,6 +482,7 @@ public class CassandraDaemon
System.exit(3);
}
+
}
/**
diff --git a/src/java/org/apache/cassandra/testing/PaxosEvent.java b/src/java/org/apache/cassandra/testing/PaxosEvent.java
new file mode 100755
index 0000000000000000000000000000000000000000..511ec0a94e0cab9243f83cb06d51c11333c47158
--- /dev/null
+++ b/src/java/org/apache/cassandra/testing/PaxosEvent.java
@@ -0,0 +1,153 @@
+package org.apache.cassandra.testing;
+
+import com.google.gson.Gson;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PrepareResponse;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+
+public class PaxosEvent {
+
+ long sender;
+ long recv;
+ String verb;
+ String payload;
+ String usrval;
+
+ public static final String ACK_PAYLOAD = "ACK";
+
+ public PaxosEvent(long sender, long recv, String verb, String payload, String usrval) {
+ this.sender = sender;
+ this.recv = recv;
+ this.verb = verb;
+ this.payload = payload;
+ this.usrval = usrval;
+ }
+
+ public long getSender() {
+ return sender;
+ }
+
+ public long getRecv() {
+ return recv;
+ }
+
+ public String getVerb() {
+ return verb;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ public String getUsrval() {
+ return usrval;
+ }
+
+ public static PaxosEvent createFromMessage(MessageOut message, InetAddress to) {
+ String sourceAddr = FBUtilities.getBroadcastAddress().getHostAddress();
+ String destinationAddr = to.getHostAddress();
+ int sourceId = Integer.parseInt(sourceAddr.substring(sourceAddr.length() - 1)) - 1;
+ int destinationId = Integer.parseInt(destinationAddr.substring(destinationAddr.length() - 1)) - 1;
+
+ String verb = transformVerbToString(message.verb);
+
+ HashMap payload = new HashMap();
+ HashMap usrval = new HashMap();
+ if (message.payload instanceof PrepareResponse) {
+ PrepareResponse response = (PrepareResponse) message.payload;
+ payload.put("response", response.promised);
+ payload.put("inProgressCommitKey", response.inProgressCommit.key.hashCode());
+ payload.put("inProgressCommitBallot", response.inProgressCommit.ballot.toString().substring(0, 24));
+ payload.put("mostRecentCommitKey", response.mostRecentCommit.key.hashCode());
+ payload.put("mostRecentCommitBallot", response.mostRecentCommit.ballot.toString().substring(0, 24));
+ } else if (message.payload instanceof Commit) {
+ Commit commit = (Commit) message.payload;
+ payload.put("key", commit.key.hashCode());
+ payload.put("ballot", commit.ballot.toString().substring(0, 24));
+ } else if (message.payload instanceof Boolean) {
+ Boolean response = (Boolean) message.payload;
+ payload.put("response", response);
+ }
+
+ return new PaxosEvent(sourceId, destinationId, verb, payload.toString(), usrval.toString());
+ }
+
+ private static String transformVerbToString(MessagingService.Verb msgVerb) {
+ String result = "";
+ switch (msgVerb) {
+ case PAXOS_PREPARE:
+ result = "PAXOS_PREPARE";
+ break;
+ case PAXOS_PROPOSE:
+ result = "PAXOS_PROPOSE";
+ break;
+ case PAXOS_COMMIT:
+ result = "PAXOS_COMMIT";
+ break;
+ case PAXOS_PREPARE_RESPONSE:
+ result = "PAXOS_PREPARE_RESPONSE";
+ break;
+ case PAXOS_PROPOSE_RESPONSE:
+ result = "PAXOS_PROPOSE_RESPONSE";
+ break;
+ case PAXOS_COMMIT_RESPONSE:
+ result = "PAXOS_COMMIT_RESPONSE";
+ break;
+ case READ:
+ result = "READ";
+ break;
+ case READ_REQUEST_RESPONSE:
+ result = "READ_REQUEST_RESPONSE";
+ break;
+ default:
+ break;
+ }
+ return result;
+ }
+
+ public static String toJsonStr(PaxosEvent obj) {
+ Gson gson = new Gson();
+ //System.out.println(gson.toJson(obj));
+ return gson.toJson(obj);
+ }
+
+ public static PaxosEvent toObject(String json) {
+ Gson gson = new Gson();
+ System.out.println(json);
+ return gson.fromJson(json, PaxosEvent.class);
+ }
+
+ public boolean isAckEvent() {
+ return payload.equals(ACK_PAYLOAD);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(obj == this) return true;
+
+ if(!(obj instanceof PaxosEvent)) return false;
+
+ PaxosEvent e = (PaxosEvent) obj;
+ return sender == e.sender &&
+ recv == e.recv &&
+ verb.equals(e.verb) &&
+ payload.equals(e.payload) &&
+ usrval.equals(e.usrval);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + (int)sender;
+ result = 31 * result + verb.hashCode();
+ result = 31 * result + (int)recv;
+ result = 31 * result + payload.hashCode();
+ result = 31 * result + usrval.hashCode();
+ return result;
+ }
+}
diff --git a/src/java/org/apache/cassandra/testing/Serializer.java b/src/java/org/apache/cassandra/testing/Serializer.java
new file mode 100755
index 0000000000000000000000000000000000000000..a479237e5f88e1ae860ce34e4cef15d2bd194560
--- /dev/null
+++ b/src/java/org/apache/cassandra/testing/Serializer.java
@@ -0,0 +1,18 @@
+package org.apache.cassandra.testing;
+
+import com.google.gson.Gson;
+import org.apache.cassandra.net.MessageOut;
+
+public class Serializer {
+ public String toJsonStr(MessageOut obj) {
+ Gson gson = new Gson();
+ System.out.println(gson.toJson(obj));
+ return gson.toJson(obj);
+ }
+
+ public MessageOut toObject(String json) {
+ Gson gson = new Gson();
+ System.out.println(json);
+ return gson.fromJson(json, MessageOut.class);
+ }
+}
diff --git a/src/java/org/apache/cassandra/testing/TestingClient.java b/src/java/org/apache/cassandra/testing/TestingClient.java
new file mode 100755
index 0000000000000000000000000000000000000000..2b96bd5130783347e273b463342cc86b800731fd
--- /dev/null
+++ b/src/java/org/apache/cassandra/testing/TestingClient.java
@@ -0,0 +1,128 @@
+package org.apache.cassandra.testing;
+
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestingClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestingClient.class);
+
+ private final String hostName;
+ private final int portNumber;
+
+ private Socket socket;
+ private PrintWriter out;
+ private BufferedReader in;
+
+ // onFlight matches IO communicated PaxosEvent to internal event MessageIn
+ private Map onFlight;
+
+ public static TestingClient getInstance() {
+ return instance;
+ }
+
+ private static TestingClient instance = new TestingClient("127.0.0.1", 4444);
+
+ private TestingClient(String hostName, int portNumber) {
+ this.hostName = hostName;
+ this.portNumber = portNumber;
+
+ this.onFlight = new HashMap();
+ }
+
+ // Cassandra node sending a message
+ public void writeToSocket(MessageOut message, int id, InetAddress to) {
+ PaxosEvent pe = PaxosEvent.createFromMessage(message, to);
+ SendingInfo si = new SendingInfo(message, id, to);
+ onFlight.put(pe, si);
+ out.println(PaxosEvent.toJsonStr(pe));
+ }
+
+ // Cassandra node received a message
+ public void writeToSocket(MessageIn message, int senderOfAck, InetAddress to) {
+ PaxosEvent pe = new PaxosEvent(senderOfAck, senderOfAck, message.verb.toString(), PaxosEvent.ACK_PAYLOAD, PaxosEvent.ACK_PAYLOAD);
+ out.println(PaxosEvent.toJsonStr(pe));
+ }
+
+ public void writeToSocket(String str) {
+ out.println(str);
+ }
+
+ public void connect() {
+ logger.info("Inside testing client");
+ try {
+ socket = new Socket(hostName, portNumber);
+ out = new PrintWriter(socket.getOutputStream(), true);
+ in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ runInLoop();
+ } catch (UnknownHostException e) {
+ logger.error("Don't know about host " + hostName);
+ System.exit(1);
+ } catch (IOException e) {
+ logger.error("Couldn't get I/O for the connection to " + hostName);
+ System.exit(1);
+ }
+ }
+
+ public void disconnect() {
+
+ }
+
+ public void runInLoop() {
+ String fromServer;
+
+ try {
+ while ((fromServer = in.readLine()) != null) {
+ if (fromServer.equals("END"))
+ break;
+
+ logger.info("---Received from server: " + fromServer);
+ PaxosEvent received = PaxosEvent.toObject(fromServer);
+
+ //if(!onFlight.containsKey(received)) {
+ // logger.info("Message not found");
+ //}
+ SendingInfo si = onFlight.remove(received);
+ //logger.info("---Removed: " + si.message.toString());
+ MessagingService.instance().sendMessage(si.message, si.id, si.to);
+ }
+ } catch (Exception e) {
+ logger.error("Error in receive loop", e);
+ System.exit(1);
+ }
+ }
+
+ public class SendingInfo {
+ MessageOut message;
+ int id;
+ InetAddress to;
+
+ public SendingInfo(MessageOut message, int id, InetAddress to) {
+ this.message = message;
+ this.id = id;
+ this.to = to;
+ }
+
+ public MessageOut getMessage() {
+ return message;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public InetAddress getTo() {
+ return to;
+ }
+ }
+}
\ No newline at end of file