Commit e7e27e40 authored by Jonathan Mace's avatar Jonathan Mace
Browse files

Remove dependencies to tracing framework

parent d3d8e268
......@@ -11,23 +11,15 @@ import org.apache.spark.SparkContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.databricks.spark.sql.perf.tpcds.Tables;
import edu.brown.cs.systems.baggage.Baggage;
import edu.brown.cs.systems.retro.Netro;
import edu.brown.cs.systems.retro.Retro;
import edu.brown.cs.systems.tpcds.QueryUtils;
import edu.brown.cs.systems.tpcds.QueryUtils.Benchmark;
import edu.brown.cs.systems.tpcds.QueryUtils.Benchmark.Query;
import edu.brown.cs.systems.xtrace.XTrace;
import edu.brown.cs.systems.xtrace.XTraceBaggageInterface;
import edu.brown.cs.systems.xtrace.logging.XTraceLogger;
public class SparkTPCDSBatchGenerator {
public static final XTraceLogger xtrace = XTrace.getLogger(SparkTPCDSBatchGenerator.class);
public static final Logger log = LoggerFactory.getLogger(SparkTPCDSBatchGenerator.class);
public final String name;
......@@ -120,22 +112,7 @@ public class SparkTPCDSBatchGenerator {
int iteration = 1;
Long taskId = null;
for (Query query : allQueries) {
Baggage.discard();
if (taskId == null) {
XTrace.startTask(true);
taskId = XTraceBaggageInterface.getTaskID();
} else {
// Set a higher task ID so that previous task gets all events if there's a conflict -- this is useful for checking where instrumentation is wrong
taskId += 10;
XTrace.setTask(taskId, 0L);
}
Retro.setTenant(3);
Retro.enableInBaggageCounting(true);
Netro.set("query", query.queryName);
xtrace.tag("Running TPCDS query", query.queryName, "TPCDS");
for (Query query : allQueries) {
// Run the query
long begin = System.currentTimeMillis();
long end;
......@@ -147,7 +124,6 @@ public class SparkTPCDSBatchGenerator {
end = System.currentTimeMillis();
successful = true;
System.out.printf("%s completed successfully in %.1f seconds\n", query, (end-begin) / 1000.0);
xtrace.log(String.format("Completed in %.1f seconds", (end-begin)/1000.0), "Baggage", edu.brown.cs.systems.tracingplane.transit_layer.Baggage.take());
} catch (Exception e) {
end = System.currentTimeMillis();
System.out.println("Query " + query + " failed due to " + e.getClass().getSimpleName() + ": " + e.getMessage());
......
......@@ -8,24 +8,16 @@ import org.apache.spark.SparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.HiveContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.databricks.spark.sql.perf.tpcds.Tables;
import edu.brown.cs.systems.baggage.Baggage;
import edu.brown.cs.systems.retro.Netro;
import edu.brown.cs.systems.retro.Retro;
import edu.brown.cs.systems.tpcds.QueryUtils;
import edu.brown.cs.systems.tpcds.QueryUtils.Benchmark;
import edu.brown.cs.systems.tpcds.QueryUtils.Benchmark.Query;
import edu.brown.cs.systems.xtrace.XTrace;
import edu.brown.cs.systems.xtrace.XTraceBaggageInterface;
import edu.brown.cs.systems.xtrace.logging.XTraceLogger;
public class SparkTPCDSWorkloadGenerator {
public static final XTraceLogger xtrace = XTrace.getLogger(SparkTPCDSWorkloadGenerator.class);
public static final Logger log = LoggerFactory.getLogger(SparkTPCDSWorkloadGenerator.class);
public final String name;
......@@ -101,23 +93,6 @@ public class SparkTPCDSWorkloadGenerator {
Long taskId = null;
for (int i = 0; i < numQueries; i++) {
long postLoad = System.currentTimeMillis();
Baggage.discard();
if (taskId == null) {
XTrace.startTask(true);
taskId = XTraceBaggageInterface.getTaskID();
} else {
// Set a higher task ID so that previous task gets all events if there's a conflict -- this is useful for checking where instrumentation is wrong
taskId += 10;
XTrace.setTask(taskId, 0L);
}
Retro.setTenant(i);
Retro.enableInBaggageCounting(true);
String qNumStr = splits[1].replace(".sql", "").replace("q","");
System.out.printf("Setting baggage to use query %s", qNumStr);
Netro.set("query", qNumStr);
xtrace.tag("Running TPCDS query", q.queryName, "TPCDS");
// Run the query
Row[] rows = gen.sqlContext.sql(q.queryText).collect();
......@@ -129,7 +104,6 @@ public class SparkTPCDSWorkloadGenerator {
long postQ = System.currentTimeMillis();
System.out.printf("Load time: %d, Query time: %d\n", postLoad-preLoad, postQ-postLoad);
xtrace.log(String.format("Load time: %d, Query time: %d", postLoad-preLoad, postQ-postLoad), "Baggage", edu.brown.cs.systems.tracingplane.transit_layer.Baggage.take());
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment