Commit c44fd11f authored by Jonathan Mace's avatar Jonathan Mace
Browse files

Add some stuff to workload gen client

parent 8d18e40f
......@@ -12,13 +12,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.databricks.spark.sql.perf.tpcds.Tables;
import edu.brown.cs.systems.baggage.Baggage;
import edu.brown.cs.systems.retro.Netro;
import edu.brown.cs.systems.retro.Retro;
import edu.brown.cs.systems.tpcds.QueryUtils;
import edu.brown.cs.systems.tpcds.QueryUtils.Benchmark;
import edu.brown.cs.systems.tpcds.QueryUtils.Benchmark.Query;
import edu.brown.cs.systems.xtrace.XTrace;
import edu.brown.cs.systems.xtrace.XTraceBaggageInterface;
import edu.brown.cs.systems.xtrace.logging.XTraceLogger;
public class SparkTPCDSWorkloadGenerator {
public static final XTraceLogger xtrace = XTrace.getLogger(SparkTPCDSWorkloadGenerator.class);
public static final Logger log = LoggerFactory.getLogger(SparkTPCDSWorkloadGenerator.class);
public final String name;
......@@ -89,18 +95,42 @@ public class SparkTPCDSWorkloadGenerator {
System.out.printf("Running query %s on %s dataset %s\n", q, settings.dataFormat, settings.dataLocation);
SparkTPCDSWorkloadGenerator gen = spinUp("SparkTPCDSWorkloadGenerator", settings);
long postLoad = System.currentTimeMillis();
// Run the query
Row[] rows = gen.sqlContext.sql(q.queryText).collect();
// Print the output rows
for (Row r : rows) {
System.out.println(r);
int numQueries = 3;
Long taskId = null;
for (int i = 0; i < numQueries; i++) {
long postLoad = System.currentTimeMillis();
Baggage.discard();
if (taskId == null) {
XTrace.startTask(true);
taskId = XTraceBaggageInterface.getTaskID();
} else {
// Set a higher task ID so that previous task gets all events if there's a conflict -- this is useful for checking where instrumentation is wrong
taskId += 10;
XTrace.setTask(taskId, 0L);
}
Retro.setTenant(i);
Retro.enableInBaggageCounting(true);
String qNumStr = splits[1].replace(".sql", "").replace("q","");
int qNum = Integer.parseInt(qNumStr);
System.out.printf("Setting baggage to use query %d", qNum);
Netro.set("query", String.valueOf(qNum));
xtrace.tag("Running TPCDS query", q.queryName, "TPCDS");
// Run the query
Row[] rows = gen.sqlContext.sql(q.queryText).collect();
// Print the output rows
for (Row r : rows) {
System.out.println(r);
}
long postQ = System.currentTimeMillis();
System.out.printf("Load time: %d, Query time: %d\n", postLoad-preLoad, postQ-postLoad);
xtrace.log(String.format("Load time: %d, Query time: %d", postLoad-preLoad, postQ-postLoad));
}
long postQ = System.currentTimeMillis();
System.out.printf("Load time: %d, Query time: %d\n", postLoad-preLoad, postQ-postLoad);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment