Commit a4fcaa7c authored by Jonathan Mace

Initial commit with dsdgen util, some queries, and some files from databricks spark-sql-perf

.gitignore:
target
.DS_Store
Usage:
To compile:
mvn clean package
To generate data with Spark:
bin/spark-submit --master yarn --class edu.brown.cs.systems.tpcds.spark.SparkTPCDSDataGenerator ~/tpcds/tpcds-workload-gen/target/spark-workloadgen-4.0-jar-with-dependencies.jar
To run the workload generator:
bin/spark-submit --master yarn --class edu.brown.cs.systems.tpcds.spark.SparkTPCDSWorkloadGenerator ~/tpcds/tpcds-workload-gen/target/spark-workloadgen-4.0-jar-with-dependencies.jar
Benchmark config:
tpcds {
  scaleFactor = 1
  dataLocation = "hdfs://127.0.0.1:9000/tpcds"
  dataFormat = "parquet"
  overwrite = true
  partitionTables = false
  useDoubleForDecimal = false
  clusterByPartitionColumns = false
  filterOutNullPartitionValues = false
}
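Assuming these settings are loaded through Typesafe Config's ConfigFactory.load() (the pom declares com.typesafe:config), JVM system properties are layered over the file contents, so individual values can be overridden per run without editing the config. For example, a hypothetical run at scale factor 10:
bin/spark-submit --master yarn --driver-java-options "-Dtpcds.scaleFactor=10" --class edu.brown.cs.systems.tpcds.spark.SparkTPCDSDataGenerator ~/tpcds/tpcds-workload-gen/target/spark-workloadgen-4.0-jar-with-dependencies.jar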
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>edu.brown.cs.systems.tpcds</groupId>
  <artifactId>spark-workloadgen</artifactId>
  <packaging>jar</packaging>
  <version>4.0</version>
  <name>TPC-DS - Spark Workload Generator</name>

  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.10</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.10</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.10.5</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.5</version>
    </dependency>
    <!-- <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-sql-perf</artifactId>
      <version>0.4.5-SNAPSHOT</version>
    </dependency> -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.6.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.typesafe</groupId>
      <artifactId>config</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>19.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.4</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <additionalProjectnatures>
            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <buildcommands>
            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>appassembler-maven-plugin</artifactId>
        <version>1.10</version>
        <executions>
          <execution>
            <id>package</id>
            <goals>
              <goal>assemble</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <extraJvmArguments>-Xmx16m</extraJvmArguments>
          <programs>
            <program>
              <mainClass>edu.brown.cs.systems.tpcds.Queries</mainClass>
              <id>list-queries</id>
            </program>
            <program>
              <mainClass>edu.brown.cs.systems.tpcds.Dsdgen</mainClass>
              <id>dsdgen</id>
            </program>
          </programs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
package edu.brown.cs.systems.tpcds;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;

import com.google.common.io.Files;

/**
 * Invokes the platform-specific dsdgen executable.
 */
public class Dsdgen {

    /**
     * Attempts to extract the dsdgen executable for the current platform to a
     * temporary directory. Returns the new temp directory containing dsdgen,
     * or null if no precompiled binary exists for this platform.
     */
    public static File get() {
        // Determine platform
        String platform = null;
        if (SystemUtils.IS_OS_LINUX) platform = "linux";
        else if (SystemUtils.IS_OS_MAC_OSX) platform = "osx";
        if (platform == null) {
            System.err.println("We don't have a precompiled dsdgen binary for platform " + System.getProperty("os.name"));
            return null;
        }

        // Extract the resource to a temp directory. Resource paths always use
        // '/', regardless of the platform's file separator.
        File tempDir = Files.createTempDir();
        try {
            String sourceDirectory = "dsdgen/" + platform;
            JarUtils.copyDirectory(sourceDirectory, tempDir);
            new File(tempDir, "dsdgen").setExecutable(true);
            return tempDir;
        } catch (IOException e) {
            System.err.println("Failed to extract dsdgen");
            e.printStackTrace();
            return null;
        }
    }

    /** Runs dsdgen with the given arguments and returns a reader over its stdout,
     *  or null if the binary could not be extracted or started. Note that the
     *  command is split on spaces, so individual arguments must not contain them. */
    public static BufferedReader invoke(String... args) {
        File dsdgen = get();
        if (dsdgen == null) {
            System.err.println("Unable to invoke dsdgen");
            return null;
        }
        String command = String.format("./dsdgen %s", StringUtils.join(args, " "));
        try {
            ProcessBuilder p = new ProcessBuilder(command.split(" "));
            p.directory(dsdgen);
            return new BufferedReader(new InputStreamReader(p.start().getInputStream()));
        } catch (IOException e) {
            System.err.println("IOException attempting to invoke dsdgen:\n" + command);
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        BufferedReader r = invoke(args);
        if (r == null) {
            return; // invoke() already reported the failure
        }
        String line = null;
        while ((line = r.readLine()) != null) {
            System.out.println(line);
        }
    }
}
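A minimal usage sketch for the class above; the -scale and -dir flags follow standard TPC-DS toolkit conventions, and the output path here is illustrative:

    // Hypothetical example: generate scale-factor-1 data into /tmp/tpcds-raw.
    // invoke() returns null if no dsdgen binary exists for this platform.
    BufferedReader output = Dsdgen.invoke("-scale", "1", "-dir", "/tmp/tpcds-raw");
    if (output != null) {
        String line;
        while ((line = output.readLine()) != null) {
            System.out.println(line); // progress output from dsdgen
        }
    }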
package edu.brown.cs.systems.tpcds;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Enumeration;
import java.util.List;
import java.util.Scanner;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

import org.apache.commons.io.IOUtils;

import com.google.common.collect.Lists;

public class JarUtils {

    /** Lists a directory that may exist within a jar file or on the classpath.
     *  Only valid for resources within this project. */
    public static List<String> listDir(String dir) throws IOException {
        // Figure out whether we're loading from a jar or from the file system
        File jarFile = new File(JarUtils.class.getProtectionDomain().getCodeSource().getLocation().getPath());
        List<String> contents = Lists.newArrayList();
        if (jarFile.isFile()) {
            JarFile jar = new JarFile(jarFile);
            Enumeration<JarEntry> entries = jar.entries(); // gives ALL entries in the jar
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                // Jar entry names always use '/', never the platform file separator
                if (name.startsWith(dir + "/") && !entry.isDirectory()) {
                    contents.add(name.substring(dir.length() + 1));
                }
            }
            jar.close();
        } else {
            URL benchmarkFolder = Thread.currentThread().getContextClassLoader().getResource(dir);
            if (benchmarkFolder != null) {
                try {
                    final File f = new File(benchmarkFolder.toURI());
                    for (File resource : f.listFiles()) {
                        contents.add(resource.getName());
                    }
                } catch (URISyntaxException ex) {
                    // Should not happen for a class-loader URL; return what we have
                }
            }
        }
        return contents;
    }

    /** Reads a file that may exist within a jar file or on the classpath.
     *  Only valid for resources within this project. */
    public static String readFile(String fileName) {
        Scanner scanner = new Scanner(JarUtils.class.getClassLoader().getResourceAsStream(fileName));
        try {
            // \Z delimits end-of-input, so next() returns the entire file
            return scanner.useDelimiter("\\Z").next();
        } finally {
            scanner.close();
        }
    }

    public static void copyDirectory(String resourceDirName, File destDir) throws IOException {
        for (String file : JarUtils.listDir(resourceDirName)) {
            // Absolute resource names use '/', never the platform file separator
            String resourceFileName = "/" + resourceDirName + "/" + file;
            File destFile = new File(destDir, file);
            destFile.createNewFile();
            destFile.deleteOnExit();
            copyResource(resourceFileName, destFile);
        }
    }

    public static void copyResource(String resourceFileName, File destFile) throws IOException {
        InputStream in = JarUtils.class.getResourceAsStream(resourceFileName);
        if (in == null) {
            throw new IOException("Resource not found: " + resourceFileName);
        }
        OutputStream out = new FileOutputStream(destFile);
        try {
            IOUtils.copy(in, out);
        } finally {
            in.close();
            out.close();
        }
    }
}
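For reference, a short sketch of how these helpers compose. The benchmark directory name is taken from the workload generator below; since these are jar resource paths, they always use '/':

    // List the bundled query files for one benchmark, then read its README
    List<String> queryFiles = JarUtils.listDir("queries/impala-tpcds-modified-queries");
    String readme = JarUtils.readFile("queries/impala-tpcds-modified-queries/README");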
package edu.brown.cs.systems.tpcds;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;

import org.apache.commons.lang3.StringUtils;

import com.google.common.collect.Lists;

public class Queries {

    /**
     * Several different people have implemented TPC-DS queries. This method
     * returns the names of the variants available in this package. Most queries
     * don't actually work; see each benchmark's README for which queries do
     * work and which ones to use.
     */
    public static List<String> availableBenchmarks() {
        try {
            return JarUtils.listDir("queries");
        } catch (IOException e) {
            return Lists.<String>newArrayList();
        }
    }

    /** Lists the names of the queries in a named benchmark.
     * @throws IOException */
    public static List<String> queriesInBenchmark(String benchmark) throws IOException {
        // Resource paths use '/', never the platform file separator
        List<String> files = JarUtils.listDir("queries/" + benchmark);
        files.remove("README");
        return files;
    }

    /** Returns the benchmark's README text. */
    public static String description(String benchmark) {
        return JarUtils.readFile("queries/" + benchmark + "/README");
    }

    /** Strips comment lines and trailing semicolons from a query. */
    private static String formatQuery(String query) {
        String[] lines = query.split("\n");
        List<String> newLines = Lists.newArrayListWithExpectedSize(lines.length);
        for (String line : lines) {
            // Ignore comment lines and the "exit;" directive
            line = line.trim();
            if (line.startsWith("--")) {
                continue;
            }
            if (line.equals("exit;")) {
                continue;
            }
            while (line.endsWith(";")) {
                line = line.substring(0, line.length() - 1);
            }
            if (!line.isEmpty()) {
                newLines.add(line);
            }
        }
        return StringUtils.join(newLines, "\n");
    }

    /**
     * Finds a named query on the classpath, loads it, and strips out comments
     * and trailing semicolons.
     */
    public static String loadQuery(String benchmark, String queryName) throws FileNotFoundException {
        return formatQuery(JarUtils.readFile("queries/" + benchmark + "/" + queryName));
    }

    public static void main(String[] args) throws IOException {
        for (String benchmark : availableBenchmarks()) {
            System.out.println(benchmark);
            System.out.println(description(benchmark));
            for (String query : queriesInBenchmark(benchmark)) {
                System.out.println(query);
            }
        }
    }
}
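A minimal usage sketch, mirroring what SparkTPCDSWorkloadGenerator below does by default:

    // Load q19 from the impala-tpcds-modified-queries variant; comments and
    // trailing semicolons are stripped so the text can be passed to sql.sql(...)
    String q = Queries.loadQuery("impala-tpcds-modified-queries", "q19.sql");
    System.out.println(q);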
package edu.brown.cs.systems.tpcds.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;

import com.databricks.spark.sql.perf.tpcds.Tables;

public class SparkTPCDSDataGenerator {

    public static void generateData() {
        generateData(TPCDSSettings.dataLocation(), TPCDSSettings.dataFormat(),
                TPCDSSettings.scaleFactor());
    }

    public static void generateData(String location, String format, int scaleFactor) {
        generateData(location, format, scaleFactor, TPCDSSettings.overwrite(),
                TPCDSSettings.partitionTables(), TPCDSSettings.useDoubleForDecimal(),
                TPCDSSettings.clusterByPartitionColumns(), TPCDSSettings.filterOutNullPartitionValues());
    }

    public static void generateData(String location, String format, int scaleFactor,
            boolean overwrite, boolean partitionTables, boolean useDoubleForDecimal,
            boolean clusterByPartitionColumns, boolean filterOutNullPartitionValues) {
        SparkConf conf = new SparkConf().setAppName("TPC-DS generateData");
        SparkContext sc = new SparkContext(conf);
        SQLContext sql = new SQLContext(sc);
        Tables tables = new Tables(sql, scaleFactor);
        tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal,
                clusterByPartitionColumns, filterOutNullPartitionValues, "");
        sc.stop();
    }

    public static void main(String[] args) {
        generateData();
    }
}
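TPCDSSettings is referenced throughout this commit but its source is not included here. Below is a minimal sketch of what it plausibly looks like, assuming the settings are read with Typesafe Config (declared in the pom) using keys matching the tpcds block in the README; all names and defaults are reconstructed for illustration, not the author's actual code:

    package edu.brown.cs.systems.tpcds.spark;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    /** Hypothetical reconstruction of the settings accessors used above. */
    public class TPCDSSettings {
        // ConfigFactory.load() merges JVM system properties over application.conf,
        // which is what makes the -Dtpcds.* overrides in the README usage possible
        private static final Config CONFIG = ConfigFactory.load();

        public static int scaleFactor() { return CONFIG.getInt("tpcds.scaleFactor"); }
        public static String dataLocation() { return CONFIG.getString("tpcds.dataLocation"); }
        public static String dataFormat() { return CONFIG.getString("tpcds.dataFormat"); }
        public static boolean overwrite() { return CONFIG.getBoolean("tpcds.overwrite"); }
        public static boolean partitionTables() { return CONFIG.getBoolean("tpcds.partitionTables"); }
        public static boolean useDoubleForDecimal() { return CONFIG.getBoolean("tpcds.useDoubleForDecimal"); }
        public static boolean clusterByPartitionColumns() { return CONFIG.getBoolean("tpcds.clusterByPartitionColumns"); }
        public static boolean filterOutNullPartitionValues() { return CONFIG.getBoolean("tpcds.filterOutNullPartitionValues"); }
    }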
package edu.brown.cs.systems.tpcds.spark;

import java.io.FileNotFoundException;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.databricks.spark.sql.perf.tpcds.Tables;

import edu.brown.cs.systems.tpcds.Queries;

public class SparkTPCDSWorkloadGenerator {

    public static final Logger log = LoggerFactory.getLogger(SparkTPCDSWorkloadGenerator.class);

    /** Loads the TPC-DS tables into memory using the default configuration.
     * Creates the Spark context and SQL context.
     * @return SQL context with tables loaded
     */
    public static SQLContext spinUpWithDefaults() {
        SparkConf c = new SparkConf().setAppName("SparkTPCDSWorkloadGenerator");
        SparkContext sc = new SparkContext(c);
        SQLContext sql = new SQLContext(sc);
        loadExistingTablesIntoMemory(sql, TPCDSSettings.dataLocation(), TPCDSSettings.dataFormat());
        return sql;
    }

    /** Loads TPC-DS tables into memory on Spark from a source location, e.g. from HDFS.
     *
     * @param sql a SQLContext in which to register the tables
     * @param dataLocation the location of the TPC-DS data, e.g. "/Users/jon/tpcds/data", "hdfs://127.0.0.1:9000/tpcds/data", etc.
     * @param dataFormat the format of the generated data, e.g. "text", "parquet", etc.
     */
    public static void loadExistingTablesIntoMemory(SQLContext sql, String dataLocation, String dataFormat) {
        /* The Tables constructor takes a scale factor, but it is not used when loading
         * existing data, so we just use the default value instead of adding it as a
         * confusing and unused parameter. */
        Tables tables = new Tables(sql, TPCDSSettings.scaleFactor());
        tables.createTemporaryTables(dataLocation, dataFormat, "");
    }

    public static void main(String[] args) throws FileNotFoundException {
        System.out.println("Starting SparkTPCDSWorkloadGenerator");
        SQLContext sql = spinUpWithDefaults();

        String benchmark = "impala-tpcds-modified-queries";
        String query = "q19.sql";
        String q = Queries.loadQuery(benchmark, query);
        System.out.printf("Running query %s/%s\n", benchmark, query);
        System.out.println(q);

        Row[] rows = sql.sql(q).collect();
        for (Row r : rows) {
            System.out.println(r);
        }
        // for (String queryName : Queries.all()) {
        //     try {
        //         String queryText = Queries.loadQuery(queryName);
        //         queries.put(queryName, queryText);
        //         System.out.println("Executing " + queryName);
        //         sql.sql(queryText);
        //         successful.add(queryName);
        //         System.out.println(queryName + " succeeded.");
        //     } catch (Throwable t) {
        //         failures.add(queryName);
        //         reasons.put(queryName, t);
        //         System.out.println(queryName + " failed.");
        //     }
        // }
        //
        // System.out.println("Failure reasons:");
        // for (String queryName : reasons.keySet()) {
        //     System.out.println(queryName);
        //     reasons.get(queryName).printStackTrace();
        //     System.out.println();
        //     System.out.println();
        // }
        // System.out.println();
        // System.out.println();
        // System.out.println();
        // System.out.println("Successful:");
        // System.out.println(StringUtils.join(successful, "\n"));
        // System.out.println();
        // System.out.println();
        // System.out.println();
        // System.out.println("Failed:");
        // System.out.println(StringUtils.join(failures, "\n"));
        // System.out.println();
        //
        // System.out.printf("%d successful, %d failures\n", successful.size(), failures.size());
        //
        //// loadExistingTablesIntoMemory(sql, dataLocation, dataFormat);
        //
        // TPCDS tpcds = new TPCDS(sql);
        // tpcds.tpcds1_4QueriesMap().get("q7");
        //
        // Seq<Benchmarkable> allQueries = tpcds.allQueries();
        // Iterator<Benchmarkable> it = allQueries.iterator();