diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 3e03704..8620498 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -32,14 +32,14 @@ jobs:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: Install protobuf compiler
        shell: bash
        run: sudo apt-get install protobuf-compiler
       - name: Build Rust code
        run: cargo build --verbose
       - name: Set up Python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
       - name: Install test dependencies
@@ -49,5 +49,12 @@ jobs:
       - name: Generate test data
        run: |
          ./scripts/gen-test-data.sh
-      - name: Run tests
+      - name: Run Rust tests
        run: cargo test --verbose
+      - name: Run Python tests
+        run: |
+          python -m venv venv
+          source venv/bin/activate
+          pip install -r requirements-in.txt
+          maturin develop
+          python -m pytest
diff --git a/datafusion_ray/main.py b/scripts/main.py
similarity index 100%
rename from datafusion_ray/main.py
rename to scripts/main.py
diff --git a/src/query_stage.rs b/src/query_stage.rs
index 084cd72..bce824a 100644
--- a/src/query_stage.rs
+++ b/src/query_stage.rs
@@ -18,7 +18,7 @@
 use crate::context::serialize_execution_plan;
 use crate::shuffle::{ShuffleCodec, ShuffleReaderExec};
 use datafusion::error::Result;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning};
 use datafusion::prelude::SessionContext;
 use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec;
 use pyo3::prelude::*;
@@ -99,10 +99,7 @@ impl QueryStage {
     /// Get the input partition count. This is the same as the number of concurrent tasks
     /// when we schedule this query stage for execution
     pub fn get_input_partition_count(&self) -> usize {
-        self.plan.children()[0]
-            .properties()
-            .output_partitioning()
-            .partition_count()
+        self.plan.output_partitioning().partition_count()
     }

     pub fn get_output_partition_count(&self) -> usize {
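[Reviewer note] The `query_stage.rs` hunk above is a behavior change, not just a cleanup: the input partition count is now taken from the stage's root plan rather than from its first child, which is what updates the `(in -> out)` stage headers in the expected-plan files below. The new import is needed because, in recent DataFusion releases (roughly 37+), `output_partitioning()` is provided by the `ExecutionPlanProperties` extension trait, which is implemented for `Arc<dyn ExecutionPlan>` and must be in scope for the call to resolve. A minimal compiling sketch of the new form, assuming such a DataFusion version; the `input_partition_count` helper and `EmptyExec` stand-in are illustrative, not project code:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Mirrors the new `get_input_partition_count` body: ask the stage's root
/// plan for its own output partitioning instead of reaching into
/// `children()[0]`, which reported the first child's partitioning.
fn input_partition_count(stage_plan: &Arc<dyn ExecutionPlan>) -> usize {
    stage_plan.output_partitioning().partition_count()
}

fn main() {
    // Any ExecutionPlan works here; EmptyExec reports a single partition.
    let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    assert_eq!(input_partition_count(&plan), 1);
}
```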
diff --git a/testdata/expected-plans/q1.txt b/testdata/expected-plans/q1.txt
index 8eaff99..9889d29 100644
--- a/testdata/expected-plans/q1.txt
+++ b/testdata/expected-plans/q1.txt
@@ -42,7 +42,7 @@ ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "l_return
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))

-Query Stage #2 (2 -> 1):
+Query Stage #2 (1 -> 1):
 SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
diff --git a/testdata/expected-plans/q10.txt b/testdata/expected-plans/q10.txt
index 916dcbb..dd81b58 100644
--- a/testdata/expected-plans/q10.txt
+++ b/testdata/expected-plans/q10.txt
@@ -60,7 +60,7 @@ SortPreservingMergeExec: [revenue@2 DESC], fetch=20
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name]
@@ -117,7 +117,7 @@ ShuffleWriterExec(stage_id=7, output_partitioning=Hash([Column { name: "c_custke
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 2 }, Column { name: "c_phone", index: 3 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 6 }], 2))

-Query Stage #8 (2 -> 1):
+Query Stage #8 (1 -> 1):
 SortPreservingMergeExec: [revenue@2 DESC], fetch=20
   ShuffleReaderExec(stage_id=7, input_partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 3 }, Column { name: "c_phone", index: 6 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 7 }], 2))
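[Reviewer note] Every expected-plan change that follows is the same mechanical update: the `Query Stage #N (in -> out)` headers print the stage's input and output partition counts, and the first number now comes from the stage root's own `output_partitioning()` (per the `query_stage.rs` hunk) rather than from its first child, so it now matches the output count in these stages. A hedged sketch of the header format for orientation; `stage_header` is a hypothetical name for illustration, not the project's actual formatting code:

```rust
/// Hypothetical helper: renders the "Query Stage #N (in -> out)" headers
/// seen in the expected-plan files; `input` is the concurrency used to
/// schedule the stage, `output` is what its shuffle writer produces.
fn stage_header(stage_id: usize, input: usize, output: usize) -> String {
    format!("Query Stage #{stage_id} ({input} -> {output}):")
}

fn main() {
    // e.g. the final merge stage in q1.txt now reports one input partition.
    assert_eq!(stage_header(2, 1, 1), "Query Stage #2 (1 -> 1):");
}
```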
diff --git a/testdata/expected-plans/q11.txt b/testdata/expected-plans/q11.txt
index 4478944..8d822d7 100644
--- a/testdata/expected-plans/q11.txt
+++ b/testdata/expected-plans/q11.txt
@@ -86,13 +86,13 @@ SortPreservingMergeExec: [value@1 DESC]
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
       ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, required_guarantees=[n_name in (ALGERIA)]

-Query Stage #1 (1 -> 2):
+Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]
@@ -120,13 +120,13 @@ ShuffleWriterExec(stage_id=4, output_partitioning=Hash([], 2))
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "s_nationkey", index: 2 }], 2))

-Query Stage #5 (1 -> 2):
+Query Stage #5 (2 -> 2):
 ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
       ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, required_guarantees=[n_name in (ALGERIA)]

-Query Stage #6 (1 -> 2):
+Query Stage #6 (2 -> 2):
 ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]
@@ -167,7 +167,7 @@ ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name: "ps_part
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=9, input_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))

-Query Stage #11 (2 -> 1):
+Query Stage #11 (1 -> 1):
 SortPreservingMergeExec: [value@1 DESC]
   ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))
diff --git a/testdata/expected-plans/q12.txt b/testdata/expected-plans/q12.txt
index f2052fb..681c4c1 100644
--- a/testdata/expected-plans/q12.txt
+++ b/testdata/expected-plans/q12.txt
@@ -65,7 +65,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "l_shipmo
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))

-Query Stage #4 (2 -> 1):
+Query Stage #4 (1 -> 1):
 SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))
diff --git a/testdata/expected-plans/q13.txt b/testdata/expected-plans/q13.txt
index 691f45e..5ddc170 100644
--- a/testdata/expected-plans/q13.txt
+++ b/testdata/expected-plans/q13.txt
@@ -70,7 +70,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "c_count"
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))

-Query Stage #4 (2 -> 1):
+Query Stage #4 (1 -> 1):
 SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
   ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))
diff --git a/testdata/expected-plans/q14.txt b/testdata/expected-plans/q14.txt
index 81ef8ef..8add1f2 100644
--- a/testdata/expected-plans/q14.txt
+++ b/testdata/expected-plans/q14.txt
@@ -33,7 +33,7 @@ ProjectionExec: expr=[100 * CAST(sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") T
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[p_partkey, p_type]
diff --git a/testdata/expected-plans/q16.txt b/testdata/expected-plans/q16.txt
index 5ef333a..07fb019 100644
--- a/testdata/expected-plans/q16.txt
+++ b/testdata/expected-plans/q16.txt
@@ -48,7 +48,7 @@ SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_brand@1 != Brand#14 AND p_type@2 NOT LIKE SMALL PLATED% AND Use p_size@3 IN (SET) ([Literal { value: Int32(14) }, Literal { value: Int32(6) }, Literal { value: Int32(5) }, Literal { value: Int32(31) }, Literal { value: Int32(49) }, Literal { value: Int32(15) }, Literal { value: Int32(41) }, Literal { value: Int32(47) }])
       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-        ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (5, 41, 49, 15, 6, 31, 47, 14)]
+        ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (6, 5, 31, 41, 47, 14, 15, 49)]
   CoalesceBatchesExec: target_batch_size=8192
     RepartitionExec: partitioning=Hash([ps_partkey@0], 2), input_partitions=2
       ParquetExec: file_groups={ ... }, projection=[ps_partkey, ps_suppkey]
@@ -56,17 +56,17 @@ SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
      ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_comment], predicate=s_comment@6 LIKE %Customer%Complaints%

-Query Stage #1 (1 -> 2):
+Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_brand@1 != Brand#14 AND p_type@2 NOT LIKE SMALL PLATED% AND Use p_size@3 IN (SET) ([Literal { value: Int32(14) }, Literal { value: Int32(6) }, Literal { value: Int32(5) }, Literal { value: Int32(31) }, Literal { value: Int32(49) }, Literal { value: Int32(15) }, Literal { value: Int32(41) }, Literal { value: Int32(47) }])
-      ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (5, 41, 49, 15, 6, 31, 47, 14)]
+      ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (6, 5, 31, 41, 47, 14, 15, 49)]

 Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))
@@ -107,7 +107,7 @@ ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "p_brand"
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: "p_size", index: 2 }], 2))

-Query Stage #7 (2 -> 1):
+Query Stage #7 (1 -> 1):
 SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: "p_size", index: 2 }], 2))
diff --git a/testdata/expected-plans/q17.txt b/testdata/expected-plans/q17.txt
index 454f0ad..d86d08c 100644
--- a/testdata/expected-plans/q17.txt
+++ b/testdata/expected-plans/q17.txt
@@ -47,7 +47,7 @@ ProjectionExec: expr=[CAST(sum(lineitem.l_extendedprice)@0 AS Float64) / 7 as av
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_brand@1 = Brand#42 AND p_container@2 = LG BAG, projection=[p_partkey@0]
diff --git a/testdata/expected-plans/q18.txt b/testdata/expected-plans/q18.txt
index 0696af7..468884c 100644
--- a/testdata/expected-plans/q18.txt
+++ b/testdata/expected-plans/q18.txt
@@ -104,7 +104,7 @@ ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "c_name",
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: "o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { name: "o_totalprice", index: 4 }], 2))

-Query Stage #7 (2 -> 1):
+Query Stage #7 (1 -> 1):
 SortPreservingMergeExec: [o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST], fetch=100
   ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: "o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { name: "o_totalprice", index: 4 }], 2))
diff --git a/testdata/expected-plans/q19.txt b/testdata/expected-plans/q19.txt
index c98f39e..7b1067d 100644
--- a/testdata/expected-plans/q19.txt
+++ b/testdata/expected-plans/q19.txt
@@ -35,7 +35,7 @@ ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_disco
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: (p_brand@1 = Brand#21 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND p_size@2 <= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 10 OR p_brand@1 = Brand#52 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("LG CASE") }, Literal { value: Utf8("LG BOX") }, Literal { value: Utf8("LG PACK") }, Literal { value: Utf8("LG PKG") }]) AND p_size@2 <= 15) AND p_size@2 >= 1
diff --git a/testdata/expected-plans/q2.txt b/testdata/expected-plans/q2.txt
index cb67479..3ac7ebd 100644
--- a/testdata/expected-plans/q2.txt
+++ b/testdata/expected-plans/q2.txt
@@ -124,21 +124,21 @@ SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: r_name@1 = ASIA, projection=[r_regionkey@0]
       ParquetExec: file_groups={ ... }, projection=[r_regionkey, r_name], predicate=r_name@1 = ASIA, pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false ELSE r_name_min@0 <= ASIA AND ASIA <= r_name_max@1 END, required_guarantees=[r_name in (ASIA)]

-Query Stage #1 (1 -> 2):
+Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name, n_regionkey]

-Query Stage #2 (1 -> 2):
+Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]

-Query Stage #3 (1 -> 2):
+Query Stage #3 (2 -> 2):
 ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_size@3 = 48 AND p_type@2 LIKE %TIN, projection=[p_partkey@0, p_mfgr@1]
@@ -186,17 +186,17 @@ ShuffleWriterExec(stage_id=8, output_partitioning=Hash([Column { name: "p_partke
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=7, input_partitioning=Hash([Column { name: "n_regionkey", index: 9 }], 2))

-Query Stage #9 (1 -> 2):
+Query Stage #9 (2 -> 2):
 ShuffleWriterExec(stage_id=9, output_partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: r_name@1 = ASIA, projection=[r_regionkey@0]
      ParquetExec: file_groups={ ... }, projection=[r_regionkey, r_name], predicate=r_name@1 = ASIA, pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false ELSE r_name_min@0 <= ASIA AND ASIA <= r_name_max@1 END, required_guarantees=[r_name in (ASIA)]

-Query Stage #10 (1 -> 2):
+Query Stage #10 (2 -> 2):
 ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_regionkey]

-Query Stage #11 (1 -> 2):
+Query Stage #11 (2 -> 2):
 ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]
@@ -252,7 +252,7 @@ ShuffleWriterExec(stage_id=17, output_partitioning=Hash([Column { name: "p_partk
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=16, input_partitioning=Hash([Column { name: "ps_partkey", index: 1 }, Column { name: "min(partsupp.ps_supplycost)", index: 0 }], 2))

-Query Stage #18 (2 -> 1):
+Query Stage #18 (1 -> 1):
 SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], fetch=100
   ShuffleReaderExec(stage_id=17, input_partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2))
diff --git a/testdata/expected-plans/q20.txt b/testdata/expected-plans/q20.txt
index 5473093..3ab727e 100644
--- a/testdata/expected-plans/q20.txt
+++ b/testdata/expected-plans/q20.txt
@@ -76,13 +76,13 @@ SortPreservingMergeExec: [s_name@0 ASC NULLS LAST]
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: n_name@1 = KENYA, projection=[n_nationkey@0]
       ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = KENYA, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= KENYA AND KENYA <= n_name_max@1 END, required_guarantees=[n_name in (KENYA)]

-Query Stage #1 (1 -> 2):
+Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_name, s_address, s_nationkey]
@@ -95,7 +95,7 @@ ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "s_suppke
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2))

-Query Stage #3 (1 -> 2):
+Query Stage #3 (2 -> 2):
 ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_name@1 LIKE blanched%, projection=[p_partkey@0]
@@ -142,7 +142,7 @@ ShuffleWriterExec(stage_id=8, output_partitioning=Hash([], 2))
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=7, input_partitioning=Hash([Column { name: "ps_suppkey", index: 0 }], 2))

-Query Stage #9 (2 -> 1):
+Query Stage #9 (1 -> 1):
 SortPreservingMergeExec: [s_name@0 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=8, input_partitioning=Hash([], 2))
diff --git a/testdata/expected-plans/q21.txt b/testdata/expected-plans/q21.txt
index dbd5e97..52f1862 100644
--- a/testdata/expected-plans/q21.txt
+++ b/testdata/expected-plans/q21.txt
@@ -91,7 +91,7 @@ SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS LAST], fetch=100
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: n_name@1 = ARGENTINA, projection=[n_nationkey@0]
@@ -103,7 +103,7 @@ ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "o_orderk
   FilterExec: o_orderstatus@1 = F, projection=[o_orderkey@0]
     ParquetExec: file_groups={ ... }, projection=[o_orderkey, o_orderstatus], predicate=o_orderstatus@2 = F, pruning_predicate=CASE WHEN o_orderstatus_null_count@2 = o_orderstatus_row_count@3 THEN false ELSE o_orderstatus_min@0 <= F AND F <= o_orderstatus_max@1 END, required_guarantees=[o_orderstatus in (F)]

-Query Stage #2 (1 -> 2):
+Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_name, s_nationkey]
@@ -172,7 +172,7 @@ ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name: "s_name"
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=9, input_partitioning=Hash([Column { name: "s_name", index: 0 }], 2))

-Query Stage #11 (2 -> 1):
+Query Stage #11 (1 -> 1):
 SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS LAST], fetch=100
   ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "s_name", index: 0 }], 2))
diff --git a/testdata/expected-plans/q22.txt b/testdata/expected-plans/q22.txt
index d46d5d5..1e3c4ad 100644
--- a/testdata/expected-plans/q22.txt
+++ b/testdata/expected-plans/q22.txt
@@ -91,7 +91,7 @@ ShuffleWriterExec(stage_id=4, output_partitioning=Hash([Column { name: "cntrycod
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "cntrycode", index: 0 }], 2))

-Query Stage #5 (2 -> 1):
+Query Stage #5 (1 -> 1):
 SortPreservingMergeExec: [cntrycode@0 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=4, input_partitioning=Hash([Column { name: "cntrycode", index: 0 }], 2))
diff --git a/testdata/expected-plans/q3.txt b/testdata/expected-plans/q3.txt
index 6fd8791..8f4e0c2 100644
--- a/testdata/expected-plans/q3.txt
+++ b/testdata/expected-plans/q3.txt
@@ -97,7 +97,7 @@ ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: "l_orderk
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=4, input_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 2))

-Query Stage #6 (2 -> 1):
+Query Stage #6 (1 -> 1):
 SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST], fetch=10
   ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column { name: "o_shippriority", index: 3 }], 2))
diff --git a/testdata/expected-plans/q4.txt b/testdata/expected-plans/q4.txt
index 20460e4..2504483 100644
--- a/testdata/expected-plans/q4.txt
+++ b/testdata/expected-plans/q4.txt
@@ -70,7 +70,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "o_orderp
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "o_orderpriority", index: 0 }], 2))

-Query Stage #4 (2 -> 1):
+Query Stage #4 (1 -> 1):
 SortPreservingMergeExec: [o_orderpriority@0 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "o_orderpriority", index: 0 }], 2))
diff --git a/testdata/expected-plans/q5.txt b/testdata/expected-plans/q5.txt
index 5351e06..25c047b 100644
--- a/testdata/expected-plans/q5.txt
+++ b/testdata/expected-plans/q5.txt
@@ -83,17 +83,17 @@ SortPreservingMergeExec: [revenue@1 DESC]
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: r_name@1 = AFRICA, projection=[r_regionkey@0]
      ParquetExec: file_groups={ ... }, projection=[r_regionkey, r_name], predicate=r_name@1 = AFRICA, pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false ELSE r_name_min@0 <= AFRICA AND AFRICA <= r_name_max@1 END, required_guarantees=[r_name in (AFRICA)]

-Query Stage #1 (1 -> 2):
+Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name, n_regionkey]

-Query Stage #2 (1 -> 2):
+Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }, Column { name: "s_nationkey", index: 1 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]
@@ -167,7 +167,7 @@ ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: "n_name"
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "n_name", index: 0 }], 2))

-Query Stage #12 (2 -> 1):
+Query Stage #12 (1 -> 1):
 SortPreservingMergeExec: [revenue@1 DESC]
   ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: "n_name", index: 0 }], 2))
diff --git a/testdata/expected-plans/q7.txt b/testdata/expected-plans/q7.txt
index b9e261a..37e3b27 100644
--- a/testdata/expected-plans/q7.txt
+++ b/testdata/expected-plans/q7.txt
@@ -89,13 +89,13 @@ SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS L
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: n_name@1 = IRAQ OR n_name@1 = GERMANY
       ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = IRAQ OR n_name@1 = GERMANY, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= IRAQ AND IRAQ <= n_name_max@1 END OR CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= GERMANY AND GERMANY <= n_name_max@1 END, required_guarantees=[n_name in (GERMANY, IRAQ)]

-Query Stage #1 (1 -> 2):
+Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: n_name@1 = GERMANY OR n_name@1 = IRAQ
@@ -105,7 +105,7 @@ Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[c_custkey, c_nationkey]

-Query Stage #3 (1 -> 2):
+Query Stage #3 (2 -> 2):
 ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]
@@ -176,7 +176,7 @@ ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: "supp_na
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2))

-Query Stage #12 (2 -> 1):
+Query Stage #12 (1 -> 1):
 SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS LAST,l_year@2 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2))
diff --git a/testdata/expected-plans/q8.txt b/testdata/expected-plans/q8.txt
index f2333a4..d016d84 100644
--- a/testdata/expected-plans/q8.txt
+++ b/testdata/expected-plans/q8.txt
@@ -114,17 +114,17 @@ SortPreservingMergeExec: [o_year@0 ASC NULLS LAST]
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "r_regionkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: r_name@1 = MIDDLE EAST, projection=[r_regionkey@0]
      ParquetExec: file_groups={ ... }, projection=[r_regionkey, r_name], predicate=r_name@1 = MIDDLE EAST, pruning_predicate=CASE WHEN r_name_null_count@2 = r_name_row_count@3 THEN false ELSE r_name_min@0 <= MIDDLE EAST AND MIDDLE EAST <= r_name_max@1 END, required_guarantees=[r_name in (MIDDLE EAST)]

-Query Stage #1 (1 -> 2):
+Query Stage #1 (2 -> 2):
 ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name]

-Query Stage #2 (1 -> 2):
+Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_regionkey]
@@ -138,11 +138,11 @@ ShuffleWriterExec(stage_id=4, output_partitioning=Hash([Column { name: "o_orderk
   FilterExec: o_orderdate@2 >= 1995-01-01 AND o_orderdate@2 <= 1996-12-31
     ParquetExec: file_groups={ ... }, projection=[o_orderkey, o_custkey, o_orderdate], predicate=o_orderdate@4 >= 1995-01-01 AND o_orderdate@4 <= 1996-12-31, pruning_predicate=CASE WHEN o_orderdate_null_count@1 = o_orderdate_row_count@2 THEN false ELSE o_orderdate_max@0 >= 1995-01-01 END AND CASE WHEN o_orderdate_null_count@1 = o_orderdate_row_count@2 THEN false ELSE o_orderdate_min@3 <= 1996-12-31 END, required_guarantees=[]

-Query Stage #5 (1 -> 2):
+Query Stage #5 (2 -> 2):
 ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]

-Query Stage #6 (1 -> 2):
+Query Stage #6 (2 -> 2):
 ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_type@1 = LARGE PLATED STEEL, projection=[p_partkey@0]
@@ -230,7 +230,7 @@ ShuffleWriterExec(stage_id=15, output_partitioning=Hash([Column { name: "o_year"
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=14, input_partitioning=Hash([Column { name: "o_year", index: 0 }], 2))

-Query Stage #16 (2 -> 1):
+Query Stage #16 (1 -> 1):
 SortPreservingMergeExec: [o_year@0 ASC NULLS LAST]
   ShuffleReaderExec(stage_id=15, input_partitioning=Hash([Column { name: "o_year", index: 0 }], 2))
diff --git a/testdata/expected-plans/q9.txt b/testdata/expected-plans/q9.txt
index 8f738f4..b26aef8 100644
--- a/testdata/expected-plans/q9.txt
+++ b/testdata/expected-plans/q9.txt
@@ -82,7 +82,7 @@ SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]
 DataFusion Ray Distributed Plan
 ===========

-Query Stage #0 (1 -> 2):
+Query Stage #0 (2 -> 2):
 ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name]
@@ -94,11 +94,11 @@ Query Stage #2 (2 -> 2):
 ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "ps_suppkey", index: 1 }, Column { name: "ps_partkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[ps_partkey, ps_suppkey, ps_supplycost]

-Query Stage #3 (1 -> 2):
+Query Stage #3 (2 -> 2):
 ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
   ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]

-Query Stage #4 (1 -> 2):
+Query Stage #4 (2 -> 2):
 ShuffleWriterExec(stage_id=4, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: p_name@1 LIKE %moccasin%, projection=[p_partkey@0]
@@ -166,7 +166,7 @@ ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: "nation"
   CoalesceBatchesExec: target_batch_size=8192
     ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))

-Query Stage #12 (2 -> 1):
+Query Stage #12 (1 -> 1):
 SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC]
   ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))
diff --git a/datafusion_ray/tests/test_context.py b/tests/test_context.py
similarity index 67%
rename from datafusion_ray/tests/test_context.py
rename to tests/test_context.py
index 40b2578..6e1b511 100644
--- a/datafusion_ray/tests/test_context.py
+++ b/tests/test_context.py
@@ -15,12 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.

-from datafusion_ray import Context
+from datafusion_ray.context import DatafusionRayContext
 from datafusion import SessionContext


-def test():
+def test_basic_query_succeed():
     df_ctx = SessionContext()
-    ctx = Context(df_ctx, False)
+    ctx = DatafusionRayContext(df_ctx)
     df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
-    ctx.plan("SELECT * FROM tips")
+    record_batch = ctx.sql("SELECT * FROM tips")
+    assert record_batch.num_rows == 244
+
+
+def test_no_result_query():
+    df_ctx = SessionContext()
+    ctx = DatafusionRayContext(df_ctx)
+    df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
+    ctx.sql("CREATE VIEW tips_view AS SELECT * FROM tips")