Commit a4d09780 authored by Jonathan Mace
Browse files

Minor changes to workload generator so I know what's going on

parent c44fd11f
tpcds {
scaleFactor = 1
dataLocation = "hdfs://127.0.0.1:9000/tpcds"
scaleFactor = 100
dataLocation = "hdfs://namenode:9000/tpcds100"
dataFormat = "parquet"
overwrite = false
partitionTables = false
partitionTables = true
useDoubleForDecimal = false
clusterByPartitionColumns = false
filterOutNullPartitionValues = false
......
......@@ -32,7 +32,7 @@ class Tables(sqlContext: SQLContext, scaleFactor: Int) extends Serializable {
case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) {
val schema = StructType(fields)
val partitions = if (partitionColumns.isEmpty) 1 else 100
val partitions = if (partitionColumns.isEmpty) 1 else 20
def nonPartitioned: Table = {
Table(name, Nil, fields : _*)
......@@ -113,13 +113,17 @@ class Tables(sqlContext: SQLContext, scaleFactor: Int) extends Serializable {
overwrite: Boolean,
clusterByPartitionColumns: Boolean,
filterOutNullPartitionValues: Boolean): Unit = {
println(s"Begin genData for table $name in database to $location")
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
val data = df(format != "text")
println(s"Got dataframe for format $format")
val tempTableName = s"${name}_text"
data.registerTempTable(tempTableName)
println(s"Registered temp table $tempTableName")
val writer = if (partitionColumns.nonEmpty) {
println(s"Getting writer for nonEmpty partitionColumns")
if (clusterByPartitionColumns) {
val columnString = data.schema.fields.map { field =>
field.name
......@@ -150,16 +154,21 @@ class Tables(sqlContext: SQLContext, scaleFactor: Int) extends Serializable {
}
} else {
// If the table is not partitioned, coalesce the data to a single file.
println("Coalesce to a single file")
data.coalesce(1).write
}
println("A")
writer.format(format).mode(mode)
if (partitionColumns.nonEmpty) {
println("B")
writer.partitionBy(partitionColumns : _*)
}
println(s"Generating table $name in database to $location with save mode $mode.")
log.info(s"Generating table $name in database to $location with save mode $mode.")
writer.save(location)
println("Saved")
sqlContext.dropTempTable(tempTableName)
println("Dropped temp table")
}
def createExternalTable(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment