diff --git a/runner/parquet/convert_to_parquet.py b/runner/parquet/convert_to_parquet.py
new file mode 100644
index 0000000..29f0d0b
--- /dev/null
+++ b/runner/parquet/convert_to_parquet.py
@@ -0,0 +1,8 @@
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+
+sc = SparkContext(appName="Parquet Converter")
+hiveContext = HiveContext(sc)
+hiveContext.table("rankings").saveAsParquetFile("/user/spark/benchmark/rankings-parquet")
+hiveContext.table("uservisits").saveAsParquetFile("/user/spark/benchmark/uservisits-parquet")
+hiveContext.table("documents").saveAsParquetFile("/user/spark/benchmark/documents-parquet")
diff --git a/runner/prepare_benchmark.py b/runner/prepare_benchmark.py
index b71bb75..e5c29b9 100644
--- a/runner/prepare_benchmark.py
+++ b/runner/prepare_benchmark.py
@@ -44,10 +44,12 @@ def parse_args():
   parser.add_option("-m", "--impala", action="store_true", default=False,
       help="Whether to include Impala")
-  parser.add_option("-s", "--shark", action="store_true", default=False,
-      help="Whether to include Shark")
+  parser.add_option("-s", "--spark", action="store_true", default=False,
+      help="Whether to include Spark SQL")
   parser.add_option("-r", "--redshift", action="store_true", default=False,
       help="Whether to include Redshift")
+  parser.add_option("--shark", action="store_true", default=False,
+      help="Whether to include Shark")
   parser.add_option("--hive", action="store_true", default=False,
       help="Whether to include Hive")
   parser.add_option("--hive-tez", action="store_true", default=False,
@@ -57,10 +59,14 @@ def parse_args():
   parser.add_option("-a", "--impala-host",
       help="Hostname of Impala state store node")
-  parser.add_option("-b", "--shark-host",
-      help="Hostname of Shark master node")
+  parser.add_option("--impala-data-node",
+      help="Hostname of an Impala data node. Required for Impala conversion to Parquet")
+  parser.add_option("-b", "--spark-host",
+      help="Hostname of Spark master node")
   parser.add_option("-c", "--redshift-host",
       help="Hostname of Redshift ODBC endpoint")
+  parser.add_option("--shark-host",
+      help="Hostname of Shark master node")
   parser.add_option("--hive-host",
       help="Hostname of Hive master node")
   parser.add_option("--hive-slaves",
@@ -68,7 +74,9 @@ def parse_args():
   parser.add_option("-x", "--impala-identity-file",
       help="SSH private key file to use for logging into Impala node")
-  parser.add_option("-y", "--shark-identity-file",
+  parser.add_option("-y", "--spark-identity-file",
+      help="SSH private key file to use for logging into Spark node")
+  parser.add_option("--shark-identity-file",
       help="SSH private key file to use for logging into Shark node")
   parser.add_option("--hive-identity-file",
       help="SSH private key file to use for logging into Hive node")
@@ -84,6 +92,8 @@ def parse_args():
   parser.add_option("-f", "--file-format", default="sequence-snappy",
       help="File format to copy (text, text-deflate, "\
            "sequence, or sequence-snappy)")
+  parser.add_option("--executor-memory", type="string", default="24G",
+      help="How much executor memory Spark SQL nodes should use")
 
   parser.add_option("-d", "--aws-key-id",
       help="Access key ID for AWS")
@@ -92,10 +102,12 @@ def parse_args():
 
   parser.add_option("--skip-s3-import", action="store_true", default=False,
       help="Assumes s3 data is already loaded")
+  parser.add_option("--parquet", action="store_true", default=False,
+      help="Convert benchmark data to Parquet")
 
   (opts, args) = parser.parse_args()
 
-  if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
+  if not (opts.impala or opts.spark or opts.shark or opts.redshift or opts.hive or opts.hive_tez or opts.hive_cdh):
     parser.print_help()
     sys.exit(1)
@@ -112,6 +124,14 @@ def parse_args():
     print >> stderr, "Impala requires identity file, hostname, and AWS creds"
     sys.exit(1)
 
+  if opts.spark and (opts.spark_identity_file is None or
+                     opts.spark_host is None or
+                     opts.aws_key_id is None or
+                     opts.aws_key is None):
+    print >> stderr, \
+        "Spark SQL requires identity file, Spark hostname, and AWS credentials"
+    sys.exit(1)
+
   if opts.shark and (opts.shark_identity_file is None or
                      opts.shark_host is None or
                      opts.aws_key_id is None or
@@ -174,6 +194,131 @@ def add_aws_credentials(remote_host, remote_user, identity_file,
   out.close()
   scp_to(remote_host, identity_file, remote_user, local_xml, remote_xml_file)
 
+def prepare_spark_dataset(opts):
+  def ssh_spark(command):
+    command = "source /root/.bash_profile; %s" % command
+    ssh(opts.spark_host, "root", opts.spark_identity_file, command)
+
+  if not opts.skip_s3_import:
+    print "=== IMPORTING BENCHMARK DATA FROM S3 ==="
+    try:
+      ssh_spark("/root/ephemeral-hdfs/bin/hadoop fs -mkdir /user/spark/benchmark")
+    except Exception:
+      pass # Folder may already exist
+
+    add_aws_credentials(opts.spark_host, "root", opts.spark_identity_file,
+        "/root/ephemeral-hdfs/conf/core-site.xml", opts.aws_key_id, opts.aws_key)
+
+    ssh_spark("/root/ephemeral-hdfs/bin/start-mapred.sh")
+
+    ssh_spark(
+      "/root/ephemeral-hdfs/bin/hadoop distcp " \
+      "s3n://big-data-benchmark/pavlo/%s/%s/rankings/ " \
+      "/user/spark/benchmark/rankings/" % (opts.file_format, opts.data_prefix))
+
+    ssh_spark(
+      "/root/ephemeral-hdfs/bin/hadoop distcp " \
+      "s3n://big-data-benchmark/pavlo/%s/%s/uservisits/ " \
+      "/user/spark/benchmark/uservisits/" % (
+        opts.file_format, opts.data_prefix))
+
+    ssh_spark(
"/root/ephemeral-hdfs/bin/hadoop distcp " \ + "s3n://big-data-benchmark/pavlo/%s/%s/crawl/ " \ + "/user/spark/benchmark/crawl/" % (opts.file_format, opts.data_prefix)) + + print "=== CREATING HIVE TABLES FOR BENCHMARK ===" + hive_site = ''' + + + fs.default.name + hdfs://NAMENODE:9000 + + + fs.defaultFS + hdfs://NAMENODE:9000 + + + mapred.job.tracker + NONE + + + mapreduce.framework.name + NONE + + + '''.replace("NAMENODE", opts.spark_host).replace('\n', '') + + ssh_spark('echo "%s" > ~/ephemeral-hdfs/conf/hive-site.xml' % hive_site) + + scp_to(opts.spark_host, opts.spark_identity_file, "root", "udf/url_count.py", + "/root/url_count.py") + ssh_spark("/root/spark-ec2/copy-dir /root/url_count.py") + + ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s" % opts.executor_memory) + + #TODO: Should keep checking to see if the JDBC server has started yet + print "Sleeping for 30 seconds so the jdbc server can start" + time.sleep(30) + + def beeline(query): + ssh_spark("/root/spark/bin/beeline -u jdbc:hive2://localhost:10000 -n root -e \"%s\"" % query) + + beeline("DROP TABLE IF EXISTS rankings") + beeline( + "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \ + "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/rankings\\\";") + + beeline("DROP TABLE IF EXISTS uservisits;") + beeline( + "CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \ + "visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \ + "languageCode STRING,searchWord STRING,duration INT ) " \ + "ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS TEXTFILE LOCATION \\\"/user/spark/benchmark/uservisits\\\";") + + beeline("DROP TABLE IF EXISTS documents;") + beeline( + "CREATE EXTERNAL TABLE documents (line STRING) STORED AS TEXTFILE " \ + "LOCATION \\\"/user/spark/benchmark/crawl\\\";") + + if opts.parquet: + ssh_spark("/root/spark/sbin/stop-thriftserver.sh") + + print "Sleeping for 30 seconds so the jdbc server can stop" + time.sleep(30) + + scp_to(opts.spark_host, opts.spark_identity_file, "root", "parquet/convert_to_parquet.py", + "/root/convert_to_parquet.py") + ssh_spark("/root/spark/bin/spark-submit --master spark://%s:7077 /root/convert_to_parquet.py" % opts.spark_host) + + ssh_spark("/root/spark/sbin/start-thriftserver.sh --executor-memory %s" % opts.executor_memory) + + print "Sleeping for 30 seconds so the jdbc server can start" + time.sleep(30) + + beeline("DROP TABLE IF EXISTS rankings") + beeline( + "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \ + "avgDuration INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS PARQUET LOCATION \\\"/user/spark/benchmark/rankings-parquet\\\";") + + beeline("DROP TABLE IF EXISTS uservisits;") + beeline( + "CREATE EXTERNAL TABLE uservisits (sourceIP STRING,destURL STRING," \ + "visitDate STRING,adRevenue DOUBLE,userAgent STRING,countryCode STRING," \ + "languageCode STRING,searchWord STRING,duration INT ) " \ + "ROW FORMAT DELIMITED FIELDS TERMINATED BY \\\",\\\" " \ + "STORED AS PARQUET LOCATION \\\"/user/spark/benchmark/uservisits-parquet\\\";") + + beeline("DROP TABLE IF EXISTS documents;") + beeline( + "CREATE EXTERNAL TABLE documents (line STRING) STORED AS PARQUET " \ + "LOCATION \\\"/user/spark/benchmark/documents-parquet\\\";") + + print "=== FINISHED CREATING BENCHMARK DATA ===" + def prepare_shark_dataset(opts): def ssh_shark(command): command = "source /root/.bash_profile; %s" 
% command @@ -182,34 +327,34 @@ def ssh_shark(command): if not opts.skip_s3_import: print "=== IMPORTING BENCHMARK DATA FROM S3 ===" try: - ssh_shark("/root/ephemeral-hdfs/bin/hdfs dfs -mkdir /user/shark/benchmark") + ssh_shark("/root/ephemeral-hdfs/bin/hadoop fs -mkdir /user/shark/benchmark") except Exception: pass # Folder may already exist add_aws_credentials(opts.shark_host, "root", opts.shark_identity_file, - "/root/mapreduce/conf/core-site.xml", opts.aws_key_id, opts.aws_key) + "/root/ephemeral-hdfs/conf/core-site.xml", opts.aws_key_id, opts.aws_key) - ssh_shark("/root/mapreduce/bin/start-mapred.sh") + ssh_shark("/root/ephemeral-hdfs/bin/start-mapred.sh") ssh_shark( - "/root/mapreduce/bin/hadoop distcp " \ + "/root/ephemeral-hdfs/bin/hadoop distcp " \ "s3n://big-data-benchmark/pavlo/%s/%s/rankings/ " \ "/user/shark/benchmark/rankings/" % (opts.file_format, opts.data_prefix)) ssh_shark( - "/root/mapreduce/bin/hadoop distcp " \ + "/root/ephemeral-hdfs/bin/hadoop distcp " \ "s3n://big-data-benchmark/pavlo/%s/%s/uservisits/ " \ "/user/shark/benchmark/uservisits/" % ( opts.file_format, opts.data_prefix)) ssh_shark( - "/root/mapreduce/bin/hadoop distcp " \ + "/root/ephemeral-hdfs/bin/hadoop distcp " \ "s3n://big-data-benchmark/pavlo/%s/%s/crawl/ " \ "/user/shark/benchmark/crawl/" % (opts.file_format, opts.data_prefix)) # Scratch table used for JVM warmup ssh_shark( - "/root/mapreduce/bin/hadoop distcp /user/shark/benchmark/rankings " \ + "/root/ephemeral-hdfs/bin/hadoop distcp /user/shark/benchmark/rankings " \ "/user/shark/benchmark/scratch" ) @@ -241,15 +386,6 @@ def ssh_shark(command): "/root/url_count.py") ssh_shark("/root/spark-ec2/copy-dir /root/url_count.py") - ssh_shark(""" - mv shark shark-back; - git clone https://github.com/ahirreddy/shark.git -b branch-0.8; - cp shark-back/conf/shark-env.sh shark/conf/shark-env.sh; - cd shark; - sbt/sbt assembly; - /root/spark-ec2/copy-dir --delete /root/shark; - """) - ssh_shark( "/root/shark/bin/shark -e \"DROP TABLE IF EXISTS rankings; " \ "CREATE EXTERNAL TABLE rankings (pageURL STRING, pageRank INT, " \ @@ -278,7 +414,7 @@ def ssh_shark(command): def prepare_impala_dataset(opts): def ssh_impala(command): - ssh(opts.impala_host, "ubuntu", opts.impala_identity_file, command) + ssh(opts.impala_host, "root", opts.impala_identity_file, command) if not opts.skip_s3_import: print "=== IMPORTING BENCHMARK FROM S3 ===" @@ -290,9 +426,9 @@ def ssh_impala(command): ssh_impala("sudo chmod 777 /etc/hadoop/conf/hdfs-site.xml") ssh_impala("sudo chmod 777 /etc/hadoop/conf/core-site.xml") - add_aws_credentials(opts.impala_host, "ubuntu", opts.impala_identity_file, + add_aws_credentials(opts.impala_host, "root", opts.impala_identity_file, "/etc/hadoop/conf/hdfs-site.xml", opts.aws_key_id, opts.aws_key) - add_aws_credentials(opts.impala_host, "ubuntu", opts.impala_identity_file, + add_aws_credentials(opts.impala_host, "root", opts.impala_identity_file, "/etc/hadoop/conf/core-site.xml", opts.aws_key_id, opts.aws_key) ssh_impala( @@ -329,6 +465,19 @@ def ssh_impala(command): "TERMINATED BY \\\"\\001\\\" " \ "STORED AS SEQUENCEFILE LOCATION \\\"/tmp/benchmark/scratch\\\";\"") + if opts.parquet: + print "=== CONVERTING TABLES TO PARQUET ===" + + ssh(opts.impala_data_node, "root", opts.impala_identity_file, + "impala-shell -r -q \"CREATE TABLE rankings_parquet STORED AS PARQUET AS SELECT * FROM rankings; " \ + "DROP TABLE rankings; " \ + "ALTER TABLE rankings_parquet RENAME TO rankings;\"") + + ssh(opts.impala_data_node, "root", opts.impala_identity_file, + 
"impala-shell -r -q \"CREATE TABLE uservisits_parquet STORED AS PARQUET AS SELECT * FROM uservisits; " \ + "DROP TABLE uservisits; " \ + "ALTER TABLE uservisits_parquet RENAME TO uservisits;\"") + print "=== FINISHED CREATING BENCHMARK DATA ===" def prepare_hive_dataset(opts): @@ -590,10 +739,12 @@ def main(): if opts.impala: prepare_impala_dataset(opts) - if opts.shark: - prepare_shark_dataset(opts) + if opts.spark: + prepare_spark_dataset(opts) if opts.redshift: prepare_redshift_dataset(opts) + if opts.shark: + prepare_shark_dataset(opts) if opts.hive: prepare_hive_dataset(opts) if opts.hive_tez: diff --git a/runner/run_query.py b/runner/run_query.py index 8c7407b..f218407 100644 --- a/runner/run_query.py +++ b/runner/run_query.py @@ -171,10 +171,12 @@ def parse_args(): parser.add_option("-m", "--impala", action="store_true", default=False, help="Whether to include Impala") - parser.add_option("-s", "--shark", action="store_true", default=False, - help="Whether to include Shark") + parser.add_option("-s", "--spark", action="store_true", default=False, + help="Whether to include Spark SQL") parser.add_option("-r", "--redshift", action="store_true", default=False, help="Whether to include Redshift") + parser.add_option("--shark", action="store_true", default=False, + help="Whether to include Shark") parser.add_option("--hive", action="store_true", default=False, help="Whether to include Hive") parser.add_option("--tez", action="store_true", default=False, @@ -182,21 +184,25 @@ def parse_args(): parser.add_option("--hive-cdh", action="store_true", default=False, help="Hive on CDH cluster") - parser.add_option("-g", "--shark-no-cache", action="store_true", + parser.add_option("-g", "--spark-no-cache", action="store_true", + default=False, help="Disable caching in Spark SQL") + parser.add_option("--shark-no-cache", action="store_true", default=False, help="Disable caching in Shark") parser.add_option("--impala-use-hive", action="store_true", default=False, help="Use Hive for query executio on Impala nodes") - parser.add_option("-t", "--reduce-tasks", type="int", default=150, - help="Number of reduce tasks in Shark") + parser.add_option("-t", "--reduce-tasks", type="int", default=200, + help="Number of reduce tasks in Shark & Spark SQL") parser.add_option("-z", "--clear-buffer-cache", action="store_true", default=False, help="Clear disk buffer cache between query runs") parser.add_option("-a", "--impala-hosts", help="Hostnames of Impala nodes (comma seperated)") - parser.add_option("-b", "--shark-host", - help="Hostname of Shark master node") + parser.add_option("-b", "--spark-host", + help="Hostname of Spark master node") parser.add_option("-c", "--redshift-host", help="Hostname of Redshift ODBC endpoint") + parser.add_option("--shark-host", + help="Hostname of Shark master node") parser.add_option("--hive-host", help="Hostname of Hive master node") parser.add_option("--hive-slaves", @@ -204,7 +210,9 @@ def parse_args(): parser.add_option("-x", "--impala-identity-file", help="SSH private key file to use for logging into Impala node") - parser.add_option("-y", "--shark-identity-file", + parser.add_option("-y", "--spark-identity-file", + help="SSH private key file to use for logging into Spark node") + parser.add_option("--shark-identity-file", help="SSH private key file to use for logging into Shark node") parser.add_option("--hive-identity-file", help="SSH private key file to use for logging into Hive node") @@ -218,6 +226,8 @@ def parse_args(): help="Number of trials to run for this query") 
parser.add_option("--prefix", type="string", default="", help="Prefix result files with this string") + parser.add_option("--shark-mem", type="string", default="24g", + help="How much executor memory shark nodes should use") parser.add_option("-q", "--query-num", default="1a", help="Which query to run in benchmark: " \ @@ -225,7 +235,7 @@ def parse_args(): (opts, args) = parser.parse_args() - if not (opts.impala or opts.shark or opts.redshift or opts.hive or opts.hive_cdh): + if not (opts.impala or opts.spark or opts.shark or opts.redshift or opts.hive or opts.hive_cdh): parser.print_help() sys.exit(1) @@ -234,6 +244,12 @@ def parse_args(): print >> stderr, "Impala requires identity file and hostname" sys.exit(1) + if opts.spark and (opts.spark_identity_file is None or + opts.spark_host is None): + print >> stderr, \ + "Spark requires identity file and hostname" + sys.exit(1) + if opts.shark and (opts.shark_identity_file is None or opts.shark_host is None): print >> stderr, \ @@ -281,6 +297,115 @@ def scp_from(host, identity_file, username, remote_file, local_file): "scp -q -o StrictHostKeyChecking=no -i %s '%s@%s:%s' '%s'" % (identity_file, username, host, remote_file, local_file), shell=True) +def run_spark_benchmark(opts): + def ssh_spark(command): + command = "source /root/.bash_profile; %s" % command + ssh(opts.spark_host, "root", opts.spark_identity_file, command) + + local_clean_query = CLEAN_QUERY + local_query_map = QUERY_MAP + + prefix = str(time.time()).split(".")[0] + query_file_name = "%s_workload.sh" % prefix + local_query_file = os.path.join(LOCAL_TMP_DIR, query_file_name) + query_file = open(local_query_file, 'w') + remote_result_file = "/mnt/%s_results" % prefix + remote_tmp_file = "/mnt/%s_out" % prefix + remote_query_file = "/mnt/%s" % query_file_name + + runner = "/root/spark/bin/beeline -u jdbc:hive2://localhost:10000 -n root" + + # Two modes here: Spark SQL Mem and Spark SQL Disk. If using Spark SQL disk use + # uncached tables. If using Spark SQL Mem, used cached tables. + + query_list = "set spark.sql.codegen=true; set spark.sql.shuffle.partitions = %s;" % opts.reduce_tasks + + # Create cached queries for Spark SQL Mem + if not opts.spark_no_cache: + + # Set up cached tables + if '4' in opts.query_num: + # Query 4 uses entirely different tables + query_list += """ + CACHE TABLE documents; + SELECT COUNT(*) FROM documents; + """ + else: + query_list += """ + CACHE TABLE uservisits; + CACHE TABLE rankings; + SELECT COUNT(*) FROM uservisits; + SELECT COUNT(*) FROM rankings; + """ + + if '4' not in opts.query_num: + query_list += local_clean_query + query_list += local_query_map[opts.query_num][0] + + # Store the result only in mem + if not opts.spark_no_cache: + query_list = query_list.replace("CREATE TABLE", "CACHE TABLE") + + query_list = re.sub("\s\s+", " ", query_list.replace('\n', ' ')) + + print "\nQuery:" + print query_list.replace(';', ";\n") + + query_file.write( + "%s %s > %s 2>&1\n" % (runner, " ".join("-e '%s'" % q.strip() for q in query_list.split(";") if q.strip()), remote_tmp_file)) + + query_file.write( + "cat %s | grep seconds >> %s\n" % ( + remote_tmp_file, remote_result_file)) + + query_file.close() + + print "Copying files to Spark" + scp_to(opts.spark_host, opts.spark_identity_file, "root", local_query_file, + remote_query_file) + ssh_spark("chmod 775 %s" % remote_query_file) + + # Run benchmark + print "Running remote benchmark..." 
+
+  # Collect results
+  results = []
+  contents = []
+
+  for i in range(opts.num_trials):
+    print "Query %s : Trial %i" % (opts.query_num, i+1)
+    ssh_spark("%s" % remote_query_file)
+    local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix)
+    scp_from(opts.spark_host, opts.spark_identity_file, "root",
+        "/mnt/%s_results" % prefix, local_results_file)
+    content = open(local_results_file).readlines()
+    all_times = [float(x.split("(")[1].split(" ")[0]) for x in content]
+
+    if '4' in opts.query_num:
+      query_times = all_times[-4:]
+      part_a = query_times[1]
+      part_b = query_times[3]
+      print "Parts: %s, %s" % (part_a, part_b)
+      result = float(part_a) + float(part_b)
+    else:
+      result = all_times[-1] # Only want time of last query
+
+    print "Result: ", result
+    print "Raw Times: ", content
+
+    results.append(result)
+    contents.append(content)
+
+  # Clean-up
+  #ssh_shark("rm /mnt/%s*" % prefix)
+  print "Clean Up...."
+  ssh_spark("rm /mnt/%s_results" % prefix)
+  os.remove(local_results_file)
+
+  os.remove(local_query_file)
+
+  return results, contents
+
 def run_shark_benchmark(opts):
   def ssh_shark(command):
     command = "source /root/.bash_profile; %s" % command
@@ -291,28 +416,13 @@ def ssh_shark(command):
   prefix = str(time.time()).split(".")[0]
   query_file_name = "%s_workload.sh" % prefix
-  slaves_file_name = "%s_slaves" % prefix
   local_query_file = os.path.join(LOCAL_TMP_DIR, query_file_name)
-  local_slaves_file = os.path.join(LOCAL_TMP_DIR, slaves_file_name)
   query_file = open(local_query_file, 'w')
   remote_result_file = "/mnt/%s_results" % prefix
   remote_tmp_file = "/mnt/%s_out" % prefix
   remote_query_file = "/mnt/%s" % query_file_name
 
-  runner = "/root/shark/bin/shark-withinfo"
-
-  print "Getting Slave List"
-  scp_from(opts.shark_host, opts.shark_identity_file, "root",
-      "/root/spark-ec2/slaves", local_slaves_file)
-  slaves = map(str.strip, open(local_slaves_file).readlines())
-
-  print "Restarting standalone scheduler..."
-  ssh_shark("/root/spark/bin/stop-all.sh")
-  ensure_spark_stopped_on_slaves(slaves)
-  time.sleep(30)
-  ssh_shark("/root/spark/bin/stop-all.sh")
-  ssh_shark("/root/spark/bin/start-all.sh")
-  time.sleep(10)
+  runner = "export SPARK_MEM=%s; /root/shark/bin/shark" % opts.shark_mem
 
   # Two modes here: Shark Mem and Shark Disk. If using Shark disk clear buffer
   # cache in-between each query. If using Shark Mem, used cached tables.
@@ -385,8 +495,6 @@ def convert_to_cached(query):
   contents = []
 
   for i in range(opts.num_trials):
-    print "Stopping Executors on Slaves....."
-    ensure_spark_stopped_on_slaves(slaves)
     print "Query %s : Trial %i" % (opts.query_num, i+1)
     ssh_shark("%s" % remote_query_file)
     local_results_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix)
@@ -416,7 +524,6 @@ def convert_to_cached(query):
   ssh_shark("rm /mnt/%s_results" % prefix)
   os.remove(local_results_file)
 
-  os.remove(local_slaves_file)
   os.remove(local_query_file)
 
   return results, contents
@@ -424,10 +531,10 @@ def convert_to_cached(query):
 def run_impala_benchmark(opts):
   impala_host = opts.impala_hosts[0]
   def ssh_impala(command):
-    ssh(impala_host, "ubuntu", opts.impala_identity_file, command)
+    ssh(impala_host, "root", opts.impala_identity_file, command)
 
   def clear_buffer_cache_impala(host):
-    ssh(host, "ubuntu", opts.impala_identity_file,
+    ssh(host, "root", opts.impala_identity_file,
         "sudo bash -c \"sync && echo 3 > /proc/sys/vm/drop_caches\"")
 
   runner = "impala-shell -r -q"
@@ -441,7 +548,7 @@ def clear_buffer_cache_impala(host):
   remote_tmp_file = "/tmp/%s_tmp" % prefix
   remote_result_file = "/tmp/%s_results" % prefix
 
-  query_file.write("hive -e '%s'\n" % IMPALA_MAP[opts.query_num])
+  query_file.write("%s '%s'\n" % (runner, IMPALA_MAP[opts.query_num]))
 
   query = QUERY_MAP[opts.query_num][1]
   if '3c' in opts.query_num:
@@ -453,7 +560,6 @@ def clear_buffer_cache_impala(host):
 
   # Populate the full buffer cache if running Impala + cached
   if (not opts.impala_use_hive) and (not opts.clear_buffer_cache):
-    query = "set mem_limit=68g;" + query
     query = "select count(*) from rankings;" + query
     query = "select count(*) from uservisits;" + query
 
@@ -465,12 +571,12 @@ def clear_buffer_cache_impala(host):
     "%s '%s%s' > %s 2>&1;\n" % (runner, connect_stmt, query, remote_tmp_file))
   query_file.write("cat %s |egrep 'Inserted|Time' |grep -v MapReduce >> %s;\n" % (
     remote_tmp_file, remote_result_file))
-  query_file.write("hive -e '%s';\n" % CLEAN_QUERY)
+  query_file.write("%s '%s';\n" % (runner, CLEAN_QUERY))
   query_file.close()
 
   remote_query_file = "/tmp/%s" % query_file_name
   print >> stderr, "Copying files to Impala"
-  scp_to(impala_host, opts.impala_identity_file, "ubuntu",
+  scp_to(impala_host, opts.impala_identity_file, "root",
       local_query_file, remote_query_file)
   ssh_impala("chmod 775 %s" % remote_query_file)
 
@@ -486,7 +592,7 @@ def clear_buffer_cache_impala(host):
 
   # Collect results
   local_result_file = os.path.join(LOCAL_TMP_DIR, "%s_results" % prefix)
-  scp_from(impala_host, opts.impala_identity_file, "ubuntu",
+  scp_from(impala_host, opts.impala_identity_file, "root",
       remote_result_file, local_result_file)
   contents = open(local_result_file).readlines()
 
@@ -768,6 +874,20 @@ def ssh_ret_code(host, user, id_file, cmd):
     return e.returncode
 
 def ensure_spark_stopped_on_slaves(slaves):
+  stop = False
+  while not stop:
+    cmd = "jps | grep ExecutorBackend"
+    ret_vals = map(lambda s: ssh_ret_code(s, "root", opts.spark_identity_file, cmd), slaves)
+    print ret_vals
+    if 0 in ret_vals:
+      print "Spark is still running on some slaves... sleeping"
+      cmd = "jps | grep ExecutorBackend | cut -d \" \" -f 1 | xargs -rn1 kill -9"
+      map(lambda s: ssh_ret_code(s, "root", opts.spark_identity_file, cmd), slaves)
+      time.sleep(2)
+    else:
+      stop = True
+
+def ensure_shark_stopped_on_slaves(slaves):
   stop = False
   while not stop:
     cmd = "jps | grep ExecutorBackend"
@@ -789,8 +909,10 @@ def main():
 
   if opts.impala:
     results, contents = run_impala_benchmark(opts)
+  if opts.spark:
+    results, contents = run_spark_benchmark(opts)
   if opts.shark:
-    results, contents = run_shark_benchmark(opts)
+    results, contents = run_shark_benchmark(opts)
   if opts.redshift:
     results = run_redshift_benchmark(opts)
   if opts.hive:
@@ -803,10 +925,16 @@ def main():
       fname = "impala_disk"
     else:
      fname = "impala_mem"
-  elif opts.shark and opts.shark_no_cache:
-    fname = "shark_disk"
+  elif opts.spark:
+    if opts.spark_no_cache:
+      fname = "spark_disk"
+    else:
+      fname = "spark_mem"
   elif opts.shark:
-    fname = "shark_mem"
+    if opts.shark_no_cache:
+      fname = "shark_disk"
+    else:
+      fname = "shark_mem"
   elif opts.redshift:
     fname = "redshift"
   elif opts.hive: