
Commit 64637fa

[QDP] Integrate Apache Arrow and Parquet for data processing (#680)
* Integrate Apache Arrow & Parquet for data processing
* Optimize Arrow Float64Array handling in io
* Add chunked Arrow Float64Array support
* Refactor encoding to support chunked Arrow Float64Array input
* Refactor I/O and encoding documentation to remove zero-copy
1 parent f7ce244 commit 64637fa

File tree

8 files changed: +1640 -31 lines

qdp/Cargo.lock

Lines changed: 1133 additions & 31 deletions
Some generated files are not rendered by default.

qdp/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -24,6 +24,10 @@ cc = "1.2"
 thiserror = "2.0"
 # Parallel computing (for CPU preprocessing)
 rayon = "1.10"
+# Apache Arrow for columnar data format support
+arrow = "54"
+# Parquet support for Arrow
+parquet = "54"

 # Release profile optimizations
 [profile.release]

qdp/qdp-core/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -9,6 +9,8 @@ qdp-kernels = { path = "../qdp-kernels" }
 thiserror = { workspace = true }
 rayon = { workspace = true }
 nvtx = { version = "1.3", optional = true }
+arrow = { workspace = true }
+parquet = { workspace = true }

 [lib]
 name = "qdp_core"

qdp/qdp-core/src/error.rs

Lines changed: 3 additions & 0 deletions
@@ -33,6 +33,9 @@ pub enum MahoutError {

     #[error("DLPack operation failed: {0}")]
     DLPack(String),
+
+    #[error("I/O error: {0}")]
+    Io(String),
 }

 /// Result type alias for Mahout operations
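The new Io variant mirrors the existing string-payload variants. A minimal sketch of the wrapping pattern the new io module uses throughout; the helper function itself is hypothetical, not part of the commit:

    use std::fs::File;
    use qdp_core::error::{MahoutError, Result};

    // Hypothetical helper: wrap a std::io::Error into the new Io variant,
    // matching the map_err pattern used in qdp-core/src/io.rs below.
    fn open_data_file(path: &str) -> Result<File> {
        File::open(path)
            .map_err(|e| MahoutError::Io(format!("Failed to open {}: {}", path, e)))
    }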

qdp/qdp-core/src/gpu/encodings/mod.rs

Lines changed: 15 additions & 0 deletions
@@ -17,6 +17,7 @@
 // Quantum encoding strategies (Strategy Pattern)

 use std::sync::Arc;
+use arrow::array::Float64Array;
 use cudarc::driver::CudaDevice;
 use crate::error::Result;
 use crate::gpu::memory::GpuStateVector;
@@ -33,6 +34,20 @@ pub trait QuantumEncoder: Send + Sync {
         num_qubits: usize,
     ) -> Result<GpuStateVector>;

+    /// Encode from chunked Arrow arrays
+    ///
+    /// Default implementation flattens chunks. (TODO: Encoders can override for true zero-copy.)
+    fn encode_chunked(
+        &self,
+        device: &Arc<CudaDevice>,
+        chunks: &[Float64Array],
+        num_qubits: usize,
+    ) -> Result<GpuStateVector> {
+        // Default: flatten and use regular encode
+        let data = crate::io::arrow_to_vec_chunked(chunks);
+        self.encode(device, &data, num_qubits)
+    }
+
     /// Validate input data before encoding
     fn validate_input(&self, data: &[f64], num_qubits: usize) -> Result<()> {
         Preprocessor::validate_input(data, num_qubits)
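Because the trait supplies a default body, every existing encoder gains a chunked entry point without changes. A minimal caller sketch, assuming the module paths implied by this diff (qdp_core::gpu::encodings, qdp_core::gpu::memory, qdp_core::io) are public; the helper function and file name are illustrative, not part of the commit:

    use std::sync::Arc;
    use cudarc::driver::CudaDevice;
    use qdp_core::error::Result;
    use qdp_core::gpu::encodings::QuantumEncoder;
    use qdp_core::gpu::memory::GpuStateVector;
    use qdp_core::io::read_parquet_to_arrow;

    // Illustrative helper: feed Parquet batches straight into any encoder.
    fn encode_from_parquet(
        encoder: &dyn QuantumEncoder,
        device: &Arc<CudaDevice>,
        num_qubits: usize,
    ) -> Result<GpuStateVector> {
        // One Float64Array per Parquet batch; no manual flattening needed.
        let chunks = read_parquet_to_arrow("quantum_data.parquet")?;
        // Falls back to the trait default (flatten + encode) unless the
        // encoder overrides encode_chunked.
        encoder.encode_chunked(device, &chunks, num_qubits)
    }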

qdp/qdp-core/src/io.rs

Lines changed: 272 additions & 0 deletions
@@ -0,0 +1,272 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! I/O module for reading and writing quantum data
//!
//! This module provides efficient columnar data exchange with the data
//! science ecosystem via Apache Arrow and Parquet.

use std::fs::File;
use std::path::Path;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Float64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

use crate::error::{MahoutError, Result};

/// Converts an Arrow Float64Array to a Vec<f64>.
///
/// Copies from Arrow's internal buffer directly if there are no nulls;
/// otherwise iterates, replacing nulls with 0.0.
pub fn arrow_to_vec(array: &Float64Array) -> Vec<f64> {
    if array.null_count() == 0 {
        array.values().to_vec()
    } else {
        array.iter().map(|opt| opt.unwrap_or(0.0)).collect()
    }
}

/// Converts chunked Arrow Float64Arrays to a single Vec<f64>.
///
/// Efficiently flattens multiple Arrow arrays into a single Vec.
pub fn arrow_to_vec_chunked(arrays: &[Float64Array]) -> Vec<f64> {
    let total_len: usize = arrays.iter().map(|a| a.len()).sum();
    let mut result = Vec::with_capacity(total_len);

    for array in arrays {
        if array.null_count() == 0 {
            result.extend_from_slice(array.values());
        } else {
            result.extend(array.iter().map(|opt| opt.unwrap_or(0.0)));
        }
    }

    result
}

/// Reads quantum data from a Parquet file.
///
/// Expects a single column named "data" containing Float64 values.
/// This function performs one copy from Arrow to Vec; to work with the
/// Arrow arrays directly, use `read_parquet_to_arrow` instead.
///
/// # Arguments
/// * `path` - Path to the Parquet file
///
/// # Returns
/// Vector of f64 values from the first column
///
/// # Example
/// ```no_run
/// use qdp_core::io::read_parquet;
///
/// let data = read_parquet("quantum_data.parquet").unwrap();
/// ```
pub fn read_parquet<P: AsRef<Path>>(path: P) -> Result<Vec<f64>> {
    let chunks = read_parquet_to_arrow(path)?;
    Ok(arrow_to_vec_chunked(&chunks))
}

/// Writes quantum data to a Parquet file.
///
/// Creates a single column named "data" containing Float64 values.
///
/// # Arguments
/// * `path` - Path to write the Parquet file
/// * `data` - Slice of f64 values to write
/// * `column_name` - Optional column name (defaults to "data")
///
/// # Example
/// ```no_run
/// use qdp_core::io::write_parquet;
///
/// let data = vec![0.5, 0.5, 0.5, 0.5];
/// write_parquet("quantum_data.parquet", &data, None).unwrap();
/// ```
pub fn write_parquet<P: AsRef<Path>>(
    path: P,
    data: &[f64],
    column_name: Option<&str>,
) -> Result<()> {
    if data.is_empty() {
        return Err(MahoutError::InvalidInput(
            "Cannot write empty data to Parquet".to_string(),
        ));
    }

    let col_name = column_name.unwrap_or("data");

    // Create Arrow schema
    let schema = Arc::new(Schema::new(vec![Field::new(
        col_name,
        DataType::Float64,
        false,
    )]));

    // Create Float64Array from slice
    let array = Float64Array::from_iter_values(data.iter().copied());
    let array_ref: ArrayRef = Arc::new(array);

    // Create RecordBatch
    let batch = RecordBatch::try_new(schema.clone(), vec![array_ref])
        .map_err(|e| MahoutError::Io(format!("Failed to create RecordBatch: {}", e)))?;

    // Write to Parquet file
    let file = File::create(path.as_ref())
        .map_err(|e| MahoutError::Io(format!("Failed to create Parquet file: {}", e)))?;

    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))
        .map_err(|e| MahoutError::Io(format!("Failed to create Parquet writer: {}", e)))?;

    writer
        .write(&batch)
        .map_err(|e| MahoutError::Io(format!("Failed to write Parquet batch: {}", e)))?;

    writer
        .close()
        .map_err(|e| MahoutError::Io(format!("Failed to close Parquet writer: {}", e)))?;

    Ok(())
}

/// Reads quantum data from a Parquet file as Arrow arrays.
///
/// Constructs Float64Arrays directly from the Parquet batches; each
/// element in the returned Vec corresponds to one batch.
///
/// # Arguments
/// * `path` - Path to the Parquet file
///
/// # Returns
/// Vector of Float64Arrays, one per Parquet batch
pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array>> {
    let file = File::open(path.as_ref())
        .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file: {}", e)))?;

    let builder = ParquetRecordBatchReaderBuilder::try_new(file)
        .map_err(|e| MahoutError::Io(format!("Failed to create Parquet reader: {}", e)))?;

    let reader = builder
        .build()
        .map_err(|e| MahoutError::Io(format!("Failed to build Parquet reader: {}", e)))?;

    let mut arrays = Vec::new();

    for batch_result in reader {
        let batch = batch_result
            .map_err(|e| MahoutError::Io(format!("Failed to read Parquet batch: {}", e)))?;

        if batch.num_columns() == 0 {
            return Err(MahoutError::Io("Parquet file has no columns".to_string()));
        }

        let column = batch.column(0);
        if !matches!(column.data_type(), DataType::Float64) {
            return Err(MahoutError::Io(format!(
                "Expected Float64 column, got {:?}",
                column.data_type()
            )));
        }

        // Clone the Float64Array (reference-counted, no data copy)
        let float_array = column
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| MahoutError::Io("Failed to downcast to Float64Array".to_string()))?
            .clone();

        arrays.push(float_array);
    }

    if arrays.is_empty() {
        return Err(MahoutError::Io("Parquet file contains no data".to_string()));
    }

    Ok(arrays)
}

/// Writes an Arrow Float64Array to a Parquet file.
///
/// # Arguments
/// * `path` - Path to write the Parquet file
/// * `array` - Float64Array to write
/// * `column_name` - Optional column name (defaults to "data")
pub fn write_arrow_to_parquet<P: AsRef<Path>>(
    path: P,
    array: &Float64Array,
    column_name: Option<&str>,
) -> Result<()> {
    if array.is_empty() {
        return Err(MahoutError::InvalidInput(
            "Cannot write empty array to Parquet".to_string(),
        ));
    }

    let col_name = column_name.unwrap_or("data");

    // Create Arrow schema
    let schema = Arc::new(Schema::new(vec![Field::new(
        col_name,
        DataType::Float64,
        false,
    )]));

    let array_ref: ArrayRef = Arc::new(array.clone());

    // Create RecordBatch
    let batch = RecordBatch::try_new(schema.clone(), vec![array_ref])
        .map_err(|e| MahoutError::Io(format!("Failed to create RecordBatch: {}", e)))?;

    // Write to Parquet file
    let file = File::create(path.as_ref())
        .map_err(|e| MahoutError::Io(format!("Failed to create Parquet file: {}", e)))?;

    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))
        .map_err(|e| MahoutError::Io(format!("Failed to create Parquet writer: {}", e)))?;

    writer
        .write(&batch)
        .map_err(|e| MahoutError::Io(format!("Failed to write Parquet batch: {}", e)))?;

    writer
        .close()
        .map_err(|e| MahoutError::Io(format!("Failed to close Parquet writer: {}", e)))?;

    Ok(())
}
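Putting the module together, a minimal round trip through the new API might look like the following sketch; the file name is illustrative, and all functions called are the ones defined above:

    use qdp_core::io::{read_parquet, read_parquet_to_arrow, write_parquet};

    fn main() -> qdp_core::error::Result<()> {
        // Write a single "data" column of Float64 values.
        let amplitudes = vec![0.5, 0.5, 0.5, 0.5];
        write_parquet("quantum_data.parquet", &amplitudes, None)?;

        // Read back as a flat Vec<f64> (one copy from Arrow to Vec)...
        let flat = read_parquet("quantum_data.parquet")?;
        assert_eq!(flat, amplitudes);

        // ...or as Arrow arrays, one Float64Array per Parquet batch,
        // suitable for passing to QuantumEncoder::encode_chunked.
        let chunks = read_parquet_to_arrow("quantum_data.parquet")?;
        assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), 4);

        Ok(())
    }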
