From 837149cf4a554258efdc511910724774ce73d7e2 Mon Sep 17 00:00:00 2001 From: Tmonster Date: Wed, 4 Sep 2024 14:11:29 +0200 Subject: [PATCH 01/31] add 500GB runs --- _report/index.Rmd | 20 ++++++++++++++++++-- _report/report.R | 3 +++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/_report/index.Rmd b/_report/index.Rmd index b92250ca..ac7f6f48 100644 --- a/_report/index.Rmd +++ b/_report/index.Rmd @@ -131,6 +131,16 @@ loop_benchplot(dt_join, report_name="join", syntax.dict=join.syntax.dict, except ![](./groupby/G1_1e9_1e2_0_0_advanced.png) +#### 500 GB {.active} + +##### **basic questions** + +![](./groupby/G1_1e10_1e4_10_0_basic.png) + +##### **advanced questions** + +![](./groupby/G1_1e10_1e4_10_0_advanced.png) + ### join {.tabset .tabset-fade .tabset-pills} #### 0.5 GB @@ -145,7 +155,7 @@ loop_benchplot(dt_join, report_name="join", syntax.dict=join.syntax.dict, except ![](./join/J1_1e7_NA_0_0_advanced.png) --> -#### 5 GB {.active} +#### 5 GB ##### **basic questions** @@ -158,12 +168,18 @@ loop_benchplot(dt_join, report_name="join", syntax.dict=join.syntax.dict, except ![](./join/J1_1e8_NA_0_0_advanced.png) --> -#### 50 GB +#### 50 GB {.active} ##### **basic questions** ![](./join/J1_1e9_NA_0_0_basic.png) +#### 500 GB + +##### **basic questions** + +![](./join/J1_1e10_NA_10_0_basic.png) + +###### 5 GB {.active} -#### 5 GB +####### **basic questions** -##### **basic questions** +![](./join/J1_1e8_NA_0_0_basic_small.png) -![](./join/J1_1e8_NA_0_0_basic.png) +###### 50 GB +####### **basic questions** - +### X-Large (128 cores / 256GB memory) -#### 50 GB {.active} +#### Task {.tabset .tabset-fade .tabset-pills} -##### **basic questions** +##### groupby {.tabset .tabset-fade .tabset-pills} + +###### 0.5 GB + +####### **basic questions** + +![](./groupby/G1_1e7_1e2_0_0_basic_xlarge.png) + +####### **advanced questions** -![](./join/J1_1e9_NA_0_0_basic.png) +![](./groupby/G1_1e7_1e2_0_0_advanced_xlarge.png) + +###### 5 GB + +####### **basic questions** + +![](./groupby/G1_1e8_1e2_0_0_basic_xlarge.png) + +####### **advanced questions** + +![](./groupby/G1_1e8_1e2_0_0_advanced_xlarge.png) + +###### 50 GB {.active} + +####### **basic questions** + +![](./groupby/G1_1e9_1e2_0_0_basic_xlarge.png) + +####### **advanced questions** + +![](./groupby/G1_1e9_1e2_0_0_advanced_xlarge.png) + +##### join {.tabset .tabset-fade .tabset-pills} + +###### 0.5 GB + +####### **basic questions** + +![](./join/J1_1e7_NA_0_0_basic_xlarge.png) + +###### 5 GB {.active} + +####### **basic questions** + +![](./join/J1_1e8_NA_0_0_basic_xlarge.png) + +###### 50 GB + +####### **basic questions** + +![](./join/J1_1e9_NA_0_0_basic_xlarge.png) #### 500 GB @@ -183,8 +229,6 @@ loop_benchplot(dt_join, report_name="join", syntax.dict=join.syntax.dict, except --- From a6dbf35a491a4859d36abb5af943a2c196688a01 Mon Sep 17 00:00:00 2001 From: Tmonster Date: Thu, 12 Sep 2024 10:55:33 +0200 Subject: [PATCH 27/31] add new line to path.env --- path.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/path.env b/path.env index 86335799..d4368ee1 100644 --- a/path.env +++ b/path.env @@ -1,4 +1,4 @@ export JULIA_HOME=/opt/julia-1.9.2 export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export PATH=$PATH:$JULIA_HOME/bin -export MOUNT_POINT=$HOME/db-benchmark-metal \ No newline at end of file +export MOUNT_POINT=$HOME/db-benchmark-metal From 7563d4a2ad6a0cb1609049429f69acdb1d428e14 Mon Sep 17 00:00:00 2001 From: Tmonster Date: Thu, 12 Sep 2024 14:55:16 +0200 Subject: [PATCH 28/31] add datafusion ability to go off disk --- datafusion/groupby-datafusion.py | 8 ++++++++ datafusion/join-datafusion.py | 10 ++++++++++ 2 files changed, 18 insertions(+) diff --git a/datafusion/groupby-datafusion.py b/datafusion/groupby-datafusion.py index 9d736b93..ee399376 100755 --- a/datafusion/groupby-datafusion.py +++ b/datafusion/groupby-datafusion.py @@ -32,6 +32,7 @@ def ans_shape(batches): on_disk = "FALSE" data_name = os.environ["SRC_DATANAME"] +mount_point = os.environ["MOUNT_POINT"] src_grp = os.path.join("data", data_name + ".csv") print("loading dataset %s" % data_name, flush=True) @@ -40,7 +41,14 @@ def ans_shape(batches): data = pacsv.read_csv(src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) + ctx = df.SessionContext() +if on_disk: + runtime = df.RuntimeConfig().with_temp_file_path(f"{mount_point}/datafusion/") + config = (df.SessionConfig()) + ctx = df.SessionContext(config, runtime) + + ctx.register_record_batches("x", [data.to_batches()]) in_rows = data.num_rows diff --git a/datafusion/join-datafusion.py b/datafusion/join-datafusion.py index 2f4c83d3..d81dfad8 100755 --- a/datafusion/join-datafusion.py +++ b/datafusion/join-datafusion.py @@ -32,6 +32,9 @@ def ans_shape(batches): on_disk = "FALSE" data_name = os.environ["SRC_DATANAME"] +machine_type = os.environ["MACHINE_TYPE"] +mount_point = os.environ["MOUNT_POINT"] + src_jn_x = os.path.join("data", data_name + ".csv") y_data_name = join_to_tbls(data_name) src_jn_y = [os.path.join("data", y_data_name[0] + ".csv"), os.path.join("data", y_data_name[1] + ".csv"), os.path.join("data", y_data_name[2] + ".csv")] @@ -41,10 +44,17 @@ def ans_shape(batches): print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) scale_factor = data_name.replace("J1_","")[:4].replace("_", "") on_disk = 'TRUE' if float(scale_factor) >= 1e10 else 'FALSE' +on_disk = 'TRUE' if machine_type == 'small' else 'FALSE' ctx = df.SessionContext() +if on_disk: + runtime = df.RuntimeConfig().with_temp_file_path(f"{mount_point}/datafusion/") + config = (df.SessionConfig()) + ctx = df.SessionContext(config, runtime) + + x_data = pacsv.read_csv(src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) ctx.register_record_batches("x", [x_data.to_batches()]) small_data = pacsv.read_csv(src_jn_y[0], convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) From 0b0e43f07d8b75740382770f1fb84703bc1d3e9a Mon Sep 17 00:00:00 2001 From: Tmonster Date: Fri, 13 Sep 2024 10:40:42 +0200 Subject: [PATCH 29/31] some updates --- collapse/groupby-collapse.R | 2 ++ collapse/join-collapse.R | 4 ++++ dask/setup-dask.sh | 2 +- duckdb/join-duckdb.R | 28 ++++++++++++++++------------ polars/groupby-polars.py | 2 ++ polars/join-polars.py | 2 ++ 6 files changed, 27 insertions(+), 13 deletions(-) diff --git a/collapse/groupby-collapse.R b/collapse/groupby-collapse.R index 9cdc09b4..4e0e2b5d 100755 --- a/collapse/groupby-collapse.R +++ b/collapse/groupby-collapse.R @@ -15,10 +15,12 @@ fun = "group_by" cache = TRUE data_name = Sys.getenv("SRC_DATANAME") +machine_type = Sys.getenv("MACHINE_TYPE") src_grp = file.path("data", paste(data_name, "csv", sep=".")) cat(sprintf("loading dataset %s\n", data_name)) on_disk = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e10 +on_disk = on_disk || (machine_type == "small" && as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e9) x = data.table::fread(src_grp, showProgress=FALSE, stringsAsFactors=TRUE, na.strings="", data.table=FALSE) print(nrow(x)) diff --git a/collapse/join-collapse.R b/collapse/join-collapse.R index 534cbe83..c0b4feb8 100755 --- a/collapse/join-collapse.R +++ b/collapse/join-collapse.R @@ -15,12 +15,16 @@ cache = TRUE on_disk = FALSE data_name = Sys.getenv("SRC_DATANAME") +machine_type = Sys.getenv("MACHINE_TYPE") src_jn_x = file.path("data", paste(data_name, "csv", sep=".")) y_data_name = join_to_tbls(data_name) src_jn_y = setNames(file.path("data", paste(y_data_name, "csv", sep=".")), names(y_data_name)) stopifnot(length(src_jn_y)==3L) cat(sprintf("loading datasets %s\n", paste(c(data_name, y_data_name), collapse=", "))) +on_disk = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e10 +on_disk = on_disk || (machine_type == "small" && as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e9) + x = data.table::fread(src_jn_x, showProgress=FALSE, stringsAsFactors=TRUE, data.table=FALSE, na.strings="") data.table::setDF(x) JN = lapply(sapply(simplify=FALSE, src_jn_y, data.table::fread, showProgress=FALSE, stringsAsFactors=TRUE, data.table=FALSE, na.strings=""), as.data.frame) diff --git a/dask/setup-dask.sh b/dask/setup-dask.sh index c6fac985..3c24ab07 100755 --- a/dask/setup-dask.sh +++ b/dask/setup-dask.sh @@ -1,7 +1,7 @@ #!/bin/bash set -e -virtualenv dask/py-dask --python=python3.10 +virtualenv dask/py-dask --python=python3.12 source dask/py-dask/bin/activate # install binaries diff --git a/duckdb/join-duckdb.R b/duckdb/join-duckdb.R index afa4af1e..644c0554 100755 --- a/duckdb/join-duckdb.R +++ b/duckdb/join-duckdb.R @@ -33,7 +33,7 @@ if (on_disk) { print("using disk memory-mapped data storage") con = dbConnect(duckdb::duckdb(), dbdir=duckdb_join_db) if (machine_type == "small") { - invisible(dbExecute(con, "pragma memory_limit='50GB'")) + invisible(dbExecute(con, "pragma memory_limit='40GB'")) } } else { print("using in-memory data storage") @@ -46,7 +46,6 @@ if (less_cores) { } invisible(dbExecute(con, sprintf("PRAGMA THREADS=%d", ncores))) -invisible(dbExecute(con, "SET memory_limit='220GB'")) git = dbGetQuery(con, "SELECT source_id FROM pragma_version()")[[1L]] invisible({ @@ -103,8 +102,13 @@ question = "small inner on int" # q1 fun = "inner_join" +table_type = "" +if (on_disk) { + table_type = "TEMP" +} + t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, small.id4 AS small_id4, v2 FROM x JOIN small USING (id1)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, small.id4 AS small_id4, v2 FROM x JOIN small USING (id1)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -112,7 +116,7 @@ chkt = system.time(chk<-dbGetQuery(con, "SELECT SUM(v1) AS v1, SUM(v2) AS v2 FRO write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, small.id4 AS small_id4, v2 FROM x JOIN small USING (id1)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, small.id4 AS small_id4, v2 FROM x JOIN small USING (id1)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -127,7 +131,7 @@ fun = "inner_join" t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x JOIN medium USING (id2)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x JOIN medium USING (id2)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -135,7 +139,7 @@ chkt = system.time(chk<-dbGetQuery(con, "SELECT SUM(v1) AS v1, SUM(v2) AS v2 FRO write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x JOIN medium USING (id2)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x JOIN medium USING (id2)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -149,7 +153,7 @@ question = "medium outer on int" # q3 fun = "left_join" t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x LEFT JOIN medium USING (id2)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x LEFT JOIN medium USING (id2)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -157,7 +161,7 @@ chkt = system.time(chk<-dbGetQuery(con, "SELECT SUM(v1) AS v1, SUM(v2) AS v2 FRO write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x LEFT JOIN medium USING (id2)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id4 AS medium_id4, medium.id5 AS medium_id5, v2 FROM x LEFT JOIN medium USING (id2)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -171,7 +175,7 @@ question = "medium inner on factor" # q4 fun = "inner_join" t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id2 AS medium_id2, medium.id4 AS medium_id4, v2 FROM x JOIN medium USING (id5)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id2 AS medium_id2, medium.id4 AS medium_id4, v2 FROM x JOIN medium USING (id5)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -179,7 +183,7 @@ chkt = system.time(chk<-dbGetQuery(con, "SELECT SUM(v1) AS v1, SUM(v2) AS v2 FRO write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id2 AS medium_id2, medium.id4 AS medium_id4, v2 FROM x JOIN medium USING (id5)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, medium.id1 AS medium_id1, medium.id2 AS medium_id2, medium.id4 AS medium_id4, v2 FROM x JOIN medium USING (id5)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -193,7 +197,7 @@ question = "big inner on int" # q5 fun = "inner_join" t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, big.id1 AS big_id1, big.id2 AS big_id2, big.id4 AS big_id4, big.id5 AS big_id5, big.id6 AS big_id6, v2 FROM x JOIN big USING (id3)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, big.id1 AS big_id1, big.id2 AS big_id2, big.id4 AS big_id4, big.id5 AS big_id5, big.id6 AS big_id6, v2 FROM x JOIN big USING (id3)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() @@ -201,7 +205,7 @@ chkt = system.time(chk<-dbGetQuery(con, "SELECT SUM(v1) AS v1, SUM(v2) AS v2 FRO write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) t = system.time({ - dbExecute(con, "CREATE TEMP TABLE ans AS SELECT x.*, big.id1 AS big_id1, big.id2 AS big_id2, big.id4 AS big_id4, big.id5 AS big_id5, big.id6 AS big_id6, v2 FROM x JOIN big USING (id3)") + dbExecute(con, sprintf("CREATE %s TABLE ans AS SELECT x.*, big.id1 AS big_id1, big.id2 AS big_id2, big.id4 AS big_id4, big.id5 AS big_id5, big.id6 AS big_id6, v2 FROM x JOIN big USING (id3)", table_type)) print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) })[["elapsed"]] m = memory_usage() diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index df60d489..e5ee24d7 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -20,6 +20,7 @@ mount_point = os.environ["MOUNT_POINT"] data_name = os.environ["SRC_DATANAME"] +machine_type = os.environ["MACHINE_TYPE"] src_grp = os.path.join("data", data_name + ".csv") print("loading dataset %s" % data_name, flush=True) @@ -29,6 +30,7 @@ scale_factor = data_name.replace("G1_","")[:4].replace("_", "") on_disk = 'TRUE' if float(scale_factor) >= 1e10 else 'FALSE' +on_disk = 'TRUE' if on_disk or (machine_type == "small" and float(scale_factor) >= 1e9) else 'FALSE' in_rows = x.shape[0] x.write_ipc(f"{mount_point}/polars/tmp.ipc") diff --git a/polars/join-polars.py b/polars/join-polars.py index dd50fcc1..cf3e7d16 100755 --- a/polars/join-polars.py +++ b/polars/join-polars.py @@ -18,6 +18,7 @@ on_disk = "FALSE" data_name = os.environ["SRC_DATANAME"] +machine_type = os.environ["MACHINE_TYPE"] src_jn_x = os.path.join("data", data_name + ".csv") y_data_name = join_to_tbls(data_name) src_jn_y = [os.path.join("data", y_data_name[0] + ".csv"), os.path.join("data", y_data_name[1] + ".csv"), os.path.join("data", y_data_name[2] + ".csv")] @@ -27,6 +28,7 @@ scale_factor = data_name.replace("J1_","")[:4].replace("_", "") on_disk = 'TRUE' if float(scale_factor) >= 1e10 else 'FALSE' +on_disk = 'TRUE' if on_disk or (machine_type == "small" and float(scale_factor) >= 1e9) else 'FALSE' print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) From 31c1f298aa61c74cd6b9bf2c55ffd715fa1f2309 Mon Sep 17 00:00:00 2001 From: Tmonster Date: Wed, 18 Sep 2024 10:24:05 +0200 Subject: [PATCH 30/31] remove code to run 500GB join. --- _control/data.csv | 5 ++--- _run/partitioned_run.sh | 17 ----------------- _run/run_join_xl.sh | 23 ----------------------- _utils/download_data.sh | 31 ------------------------------- collapse/groupby-collapse.R | 2 -- collapse/join-collapse.R | 2 -- datafusion/join-datafusion.py | 2 -- duckdb/groupby-duckdb.R | 5 ----- duckdb/join-duckdb.R | 4 ---- polars/groupby-polars.py | 2 -- polars/join-polars.py | 4 ---- run.conf | 6 ++---- run.sh | 5 ----- 13 files changed, 4 insertions(+), 104 deletions(-) delete mode 100755 _run/run_join_xl.sh delete mode 100644 _utils/download_data.sh diff --git a/_control/data.csv b/_control/data.csv index 4a874f63..864104d9 100644 --- a/_control/data.csv +++ b/_control/data.csv @@ -14,12 +14,11 @@ groupby,G1_1e9_1e1_0_0,1e9,1e1,0,0,1 groupby,G1_1e9_2e0_0_0,1e9,2e0,0,0,1 groupby,G1_1e9_1e2_0_1,1e9,1e2,0,1,1 groupby,G1_1e9_1e2_5_0,1e9,1e2,5,0,1 -groupby,G1_1e10_1e4_10_0,1e10,1e4,10,0,1 +groupby,G1_1e10_1e4_10_0,1e10,1e4,10,0,1 join,J1_1e7_NA_0_0,1e7,NA,0,0,1 join,J1_1e7_NA_5_0,1e7,NA,5,0,1 join,J1_1e7_NA_0_1,1e7,NA,0,1,1 join,J1_1e8_NA_0_0,1e8,NA,0,0,1 join,J1_1e8_NA_5_0,1e8,NA,5,0,1 join,J1_1e8_NA_0_1,1e8,NA,0,1,1 -join,J1_1e9_NA_0_0,1e9,NA,0,0,1 -join,J1_1e10_NA_0_0,1e10,NA,0,0,1 \ No newline at end of file +join,J1_1e9_NA_0_0,1e9,NA,0,0,1 \ No newline at end of file diff --git a/_run/partitioned_run.sh b/_run/partitioned_run.sh index 9675d644..f29ba9b5 100644 --- a/_run/partitioned_run.sh +++ b/_run/partitioned_run.sh @@ -5,22 +5,5 @@ ./_run/run_groupby_xl.sh -./_run/run_join_xl.sh - - -# rename the png files to reflect the machine type that -# benchmark was run on. -for f in public/groupby/*; do - ext=".png"; - [[ $f =~ \. ]]; - mv "$f" "${f%%.*}"_$MACHINE_TYPE$ext; -done -for f in public/join/*; do - ext=".png"; - [[ $f =~ \. ]]; - mv "$f" "${f%%.*}"_$MACHINE_TYPE$ext; -done - - # call code to rename images \ No newline at end of file diff --git a/_run/run_join_xl.sh b/_run/run_join_xl.sh deleted file mode 100755 index 56e772a6..00000000 --- a/_run/run_join_xl.sh +++ /dev/null @@ -1,23 +0,0 @@ -# get join large (500GB dataset) -aws s3 cp s3://duckdb-blobs/data/db-benchmark-data/join-500gb.duckdb data/join-500gb.duckdb - - -# expand groupby-small datasets to csv -duckdb data/join-500gb.duckdb -c "copy x to 'data/J1_NA_0_0.csv' (FORMAT CSV)" -duckdb data/join-500gb.duckdb -c "copy big to 'data/J1_1e10_0_0.csv' (FORMAT CSV)" -duckdb data/join-500gb.duckdb -c "copy medium to 'data/J1_1e7_0_0.csv' (FORMAT CSV)" -duckdb data/join-500gb.duckdb -c "copy small to 'data/J1_1e4_0_0.csv' (FORMAT CSV)" - - -cp _control/data_join_xlarge.csv _control/data.csv - -echo "Running join x-large (500GB)" -./run.sh - -### -echo "done..." -echo "removing data files" -rm data/*.csv -rm data/*.duckdb - - diff --git a/_utils/download_data.sh b/_utils/download_data.sh deleted file mode 100644 index f011ad9a..00000000 --- a/_utils/download_data.sh +++ /dev/null @@ -1,31 +0,0 @@ - -# get small data -wget https://duckdb-blobs.s3.amazonaws.com/data/db-benchmark-data/groupby_small.duckdb -~/duckdb groupby_small.duckdb -c "copy G1_1e7_1e2_0_0 to 'G1_1e7_1e2_0_0.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e7_1e1_0_0 to 'G1_1e7_1e1_0_0.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e7_2e0_0_0 to 'G1_1e7_2e0_0_0.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e7_1e2_0_1 to 'G1_1e7_1e2_0_1.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e7_1e2_5_0 to 'G1_1e7_1e2_5_0.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e8_1e2_0_0 to 'G1_1e8_1e2_0_0.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e8_1e1_0_0 to 'G1_1e8_1e1_0_0.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e8_2e0_0_0 to 'G1_1e8_2e0_0_0.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e8_1e2_0_1 to 'G1_1e8_1e2_0_1.csv' (FORMAT CSV)" -~/duckdb groupby_small.duckdb -c "copy G1_1e8_1e2_5_0 to 'G1_1e8_1e2_5_0.csv' (FORMAT CSV)" - -wget https://duckdb-blobs.s3.amazonaws.com/data/db-benchmark-data/join_small.duckdb - -# get large data -wget https://duckdb-blobs.s3.amazonaws.com/data/db-benchmark-data/join_large.duckdb - -wget https://duckdb-blobs.s3.amazonaws.com/data/db-benchmark-data/groupby_large.duckdb -~/duckdb -c "copy G1_1e9_1e2_0_0 to 'G1_1e9_1e2_0_0.csv' (FORMAT CSV)" -~/duckdb -c "copy G1_1e9_1e1_0_0 to 'G1_1e9_1e1_0_0.csv' (FORMAT CSV)" -~/duckdb -c "copy G1_1e9_2e0_0_0 to 'G1_1e9_2e0_0_0.csv' (FORMAT CSV)" -~/duckdb -c "copy G1_1e9_1e2_0_1 to 'G1_1e9_1e2_0_1.csv' (FORMAT CSV)" -~/duckdb -c "copy G1_1e9_1e2_5_0 to 'G1_1e9_1e2_5_0.csv' (FORMAT CSV)" - -# get 500GB data -wget https://duckdb-blobs.s3.amazonaws.com/data/db-benchmark-data/join-500gb.duckdb - -# ??? -wget https://duckdb-blobs.s3.amazonaws.com/data/db-benchmark-data/groupby-500gb.duckdb \ No newline at end of file diff --git a/collapse/groupby-collapse.R b/collapse/groupby-collapse.R index 4e0e2b5d..9cdc09b4 100755 --- a/collapse/groupby-collapse.R +++ b/collapse/groupby-collapse.R @@ -15,12 +15,10 @@ fun = "group_by" cache = TRUE data_name = Sys.getenv("SRC_DATANAME") -machine_type = Sys.getenv("MACHINE_TYPE") src_grp = file.path("data", paste(data_name, "csv", sep=".")) cat(sprintf("loading dataset %s\n", data_name)) on_disk = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e10 -on_disk = on_disk || (machine_type == "small" && as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e9) x = data.table::fread(src_grp, showProgress=FALSE, stringsAsFactors=TRUE, na.strings="", data.table=FALSE) print(nrow(x)) diff --git a/collapse/join-collapse.R b/collapse/join-collapse.R index c0b4feb8..4dc18cc7 100755 --- a/collapse/join-collapse.R +++ b/collapse/join-collapse.R @@ -15,7 +15,6 @@ cache = TRUE on_disk = FALSE data_name = Sys.getenv("SRC_DATANAME") -machine_type = Sys.getenv("MACHINE_TYPE") src_jn_x = file.path("data", paste(data_name, "csv", sep=".")) y_data_name = join_to_tbls(data_name) src_jn_y = setNames(file.path("data", paste(y_data_name, "csv", sep=".")), names(y_data_name)) @@ -23,7 +22,6 @@ stopifnot(length(src_jn_y)==3L) cat(sprintf("loading datasets %s\n", paste(c(data_name, y_data_name), collapse=", "))) on_disk = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e10 -on_disk = on_disk || (machine_type == "small" && as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e9) x = data.table::fread(src_jn_x, showProgress=FALSE, stringsAsFactors=TRUE, data.table=FALSE, na.strings="") data.table::setDF(x) diff --git a/datafusion/join-datafusion.py b/datafusion/join-datafusion.py index d81dfad8..ee106bd2 100755 --- a/datafusion/join-datafusion.py +++ b/datafusion/join-datafusion.py @@ -32,7 +32,6 @@ def ans_shape(batches): on_disk = "FALSE" data_name = os.environ["SRC_DATANAME"] -machine_type = os.environ["MACHINE_TYPE"] mount_point = os.environ["MOUNT_POINT"] src_jn_x = os.path.join("data", data_name + ".csv") @@ -44,7 +43,6 @@ def ans_shape(batches): print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) scale_factor = data_name.replace("J1_","")[:4].replace("_", "") on_disk = 'TRUE' if float(scale_factor) >= 1e10 else 'FALSE' -on_disk = 'TRUE' if machine_type == 'small' else 'FALSE' ctx = df.SessionContext() diff --git a/duckdb/groupby-duckdb.R b/duckdb/groupby-duckdb.R index 1a2705bc..e193dd44 100755 --- a/duckdb/groupby-duckdb.R +++ b/duckdb/groupby-duckdb.R @@ -16,21 +16,16 @@ fun = "group_by" cache = TRUE data_name = Sys.getenv("SRC_DATANAME") -machine_type = Sys.getenv("MACHINE_TYPE", "large") src_grp = file.path("data", paste(data_name, "csv", sep=".")) cat(sprintf("loading dataset %s\n", data_name)) db_file = sprintf('%s-%s-%s.db', solution, task, data_name) on_disk = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e10 -on_disk = on_disk || (as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e9 && machine_type == "small") uses_NAs = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][4L])>0 if (on_disk) { print("using disk memory-mapped data storage") con = dbConnect(duckdb::duckdb(), dbdir=db_file) - if (machine_type == "small") { - invisible(dbExecute(con, "pragma memory_limit='50GB'")) - } } else { print("using in-memory data storage") con = dbConnect(duckdb::duckdb()) diff --git a/duckdb/join-duckdb.R b/duckdb/join-duckdb.R index 644c0554..aad3a787 100755 --- a/duckdb/join-duckdb.R +++ b/duckdb/join-duckdb.R @@ -15,7 +15,6 @@ solution = "duckdb" cache = TRUE data_name = Sys.getenv("SRC_DATANAME") -machine_type = Sys.getenv("MACHINE_TYPE", "large") src_jn_x = file.path("data", paste(data_name, "csv", sep=".")) y_data_name = join_to_tbls(data_name) src_jn_y = setNames(file.path("data", paste(y_data_name, "csv", sep=".")), names(y_data_name)) @@ -32,9 +31,6 @@ uses_NAs = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][4L])>0 if (on_disk) { print("using disk memory-mapped data storage") con = dbConnect(duckdb::duckdb(), dbdir=duckdb_join_db) - if (machine_type == "small") { - invisible(dbExecute(con, "pragma memory_limit='40GB'")) - } } else { print("using in-memory data storage") con = dbConnect(duckdb::duckdb()) diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index e5ee24d7..df60d489 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -20,7 +20,6 @@ mount_point = os.environ["MOUNT_POINT"] data_name = os.environ["SRC_DATANAME"] -machine_type = os.environ["MACHINE_TYPE"] src_grp = os.path.join("data", data_name + ".csv") print("loading dataset %s" % data_name, flush=True) @@ -30,7 +29,6 @@ scale_factor = data_name.replace("G1_","")[:4].replace("_", "") on_disk = 'TRUE' if float(scale_factor) >= 1e10 else 'FALSE' -on_disk = 'TRUE' if on_disk or (machine_type == "small" and float(scale_factor) >= 1e9) else 'FALSE' in_rows = x.shape[0] x.write_ipc(f"{mount_point}/polars/tmp.ipc") diff --git a/polars/join-polars.py b/polars/join-polars.py index cf3e7d16..3f05f84b 100755 --- a/polars/join-polars.py +++ b/polars/join-polars.py @@ -18,17 +18,13 @@ on_disk = "FALSE" data_name = os.environ["SRC_DATANAME"] -machine_type = os.environ["MACHINE_TYPE"] src_jn_x = os.path.join("data", data_name + ".csv") y_data_name = join_to_tbls(data_name) src_jn_y = [os.path.join("data", y_data_name[0] + ".csv"), os.path.join("data", y_data_name[1] + ".csv"), os.path.join("data", y_data_name[2] + ".csv")] if len(src_jn_y) != 3: raise Exception("Something went wrong in preparing files used for join") - -scale_factor = data_name.replace("J1_","")[:4].replace("_", "") on_disk = 'TRUE' if float(scale_factor) >= 1e10 else 'FALSE' -on_disk = 'TRUE' if on_disk or (machine_type == "small" and float(scale_factor) >= 1e9) else 'FALSE' print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) diff --git a/run.conf b/run.conf index 94678ced..f151bf28 100644 --- a/run.conf +++ b/run.conf @@ -3,8 +3,6 @@ export RUN_TASKS="groupby join" # solution, used in init-setup-iteration.R export RUN_SOLUTIONS="collapse data.table juliads juliadf dplyr pandas pydatatable spark dask clickhouse polars R-arrow duckdb duckdb-latest datafusion" -export MACHINE_TYPE="small" - # flag to upgrade tools, used in run.sh on init export DO_UPGRADE=false # force run, ignore if same version was run already @@ -18,5 +16,5 @@ export DO_REPORT=true export DO_PUBLISH=false # logging and timing files -export CSV_LOGS_FILE="logs_${MACHINE_TYPE}.csv" -export CSV_TIME_FILE="time_${MACHINE_TYPE}.csv" +export CSV_LOGS_FILE="logs.csv" +export CSV_TIME_FILE="time.csv" diff --git a/run.sh b/run.sh index 97584d05..77337e0b 100755 --- a/run.sh +++ b/run.sh @@ -30,11 +30,6 @@ else fi -if [[ $MACHINE_TYPE != 'small' && $MACHINE_TYPE != 'medium' && $MACHINE_TYPE != 'large' ]]; then - echo "Machine type is not valid. Must be \`small\` \`medium\` or \`large\` " - exit 1 -fi - # ensure directories exists mkdir -p ./out From 30fa0ed29489de1483fe98bb1a9482e22270bcd2 Mon Sep 17 00:00:00 2001 From: Tmonster Date: Wed, 18 Sep 2024 10:27:37 +0200 Subject: [PATCH 31/31] add code to fix report --- _report/index.Rmd | 128 +++++++++++----------------------------------- _report/report.R | 1 - 2 files changed, 31 insertions(+), 98 deletions(-) diff --git a/_report/index.Rmd b/_report/index.Rmd index 8725e559..b45a74c3 100644 --- a/_report/index.Rmd +++ b/_report/index.Rmd @@ -97,135 +97,69 @@ loop_benchplot(dt_join, report_name="join", syntax.dict=join.syntax.dict, except ``` -## Machine Type {.tabset .tabset-fade .tabset-pills} +## Task {.tabset .tabset-fade .tabset-pills} -### Small (32 cores / 64GB memory) +### groupby {.tabset .tabset-fade .tabset-pills} -#### Task {.tabset .tabset-fade .tabset-pills} +#### 0.5 GB -##### groupby {.tabset .tabset-fade .tabset-pills} - -###### 0.5 GB - -####### **basic questions** - -![](./groupby/G1_1e7_1e2_0_0_basic_small.png) - -####### **advanced questions** - -![](./groupby/G1_1e7_1e2_0_0_advanced_small.png) - -###### 5 GB - -####### **basic questions** - -![](./groupby/G1_1e8_1e2_0_0_basic_small.png) - -####### **advanced questions** - -![](./groupby/G1_1e8_1e2_0_0_advanced_small.png) - -###### 50 GB {.active} - -####### **basic questions** - -![](./groupby/G1_1e9_1e2_0_0_basic_small.png) - -####### **advanced questions** - -![](./groupby/G1_1e9_1e2_0_0_advanced_small.png) - -###### 500 GB - -####### **basic questions** - -![](./groupby/G1_1e10_1e4_10_0_basic_small.png) - -####### **advanced questions** - -![](./groupby/G1_1e10_1e4_0_0_advanced_small.png) - -##### join {.tabset .tabset-fade .tabset-pills} - -###### 0.5 GB - -####### **basic questions** - -![](./join/J1_1e7_NA_0_0_basic_small.png) - -###### 5 GB {.active} - -####### **basic questions** - -![](./join/J1_1e8_NA_0_0_basic_small.png) - -###### 50 GB - -####### **basic questions** - -![](./join/J1_1e9_NA_0_0_basic_small.png) - -### X-Large (128 cores / 256GB memory) - -#### Task {.tabset .tabset-fade .tabset-pills} - -##### groupby {.tabset .tabset-fade .tabset-pills} +##### **basic questions** -###### 0.5 GB +![](./groupby/G1_1e7_1e2_0_0_basic.png) -####### **basic questions** +##### **advanced questions** -![](./groupby/G1_1e7_1e2_0_0_basic_xlarge.png) +![](./groupby/G1_1e7_1e2_0_0_advanced.png) -####### **advanced questions** +#### 5 GB -![](./groupby/G1_1e7_1e2_0_0_advanced_xlarge.png) +##### **basic questions** -###### 5 GB +![](./groupby/G1_1e8_1e2_0_0_basic.png) -####### **basic questions** +##### **advanced questions** -![](./groupby/G1_1e8_1e2_0_0_basic_xlarge.png) +![](./groupby/G1_1e8_1e2_0_0_advanced.png) -####### **advanced questions** +#### 50 GB {.active} -![](./groupby/G1_1e8_1e2_0_0_advanced_xlarge.png) +##### **basic questions** -###### 50 GB {.active} +![](./groupby/G1_1e9_1e2_0_0_basic.png) -####### **basic questions** +##### **advanced questions** -![](./groupby/G1_1e9_1e2_0_0_basic_xlarge.png) +![](./groupby/G1_1e9_1e2_0_0_advanced.png) -####### **advanced questions** +#### 500 GB -![](./groupby/G1_1e9_1e2_0_0_advanced_xlarge.png) +##### **basic questions** -##### join {.tabset .tabset-fade .tabset-pills} +![](./groupby/G1_1e10_1e4_10_0_basic.png) -###### 0.5 GB +##### **advanced questions** -####### **basic questions** +![](./groupby/G1_1e10_1e4_0_0_advanced.png) -![](./join/J1_1e7_NA_0_0_basic_xlarge.png) +### join {.tabset .tabset-fade .tabset-pills} -###### 5 GB {.active} +#### 0.5 GB -####### **basic questions** +##### **basic questions** -![](./join/J1_1e8_NA_0_0_basic_xlarge.png) +![](./join/J1_1e7_NA_0_0_basic.png) -###### 50 GB +#### 5 GB {.active} -####### **basic questions** +##### **basic questions** -![](./join/J1_1e9_NA_0_0_basic_xlarge.png) +![](./join/J1_1e8_NA_0_0_basic.png) -#### 500 GB +#### 50 GB ##### **basic questions** -![](./join/J1_1e10_NA_10_0_basic.png) +![](./join/J1_1e9_NA_0_0_basic.png)