13 changes: 4 additions & 9 deletions .github/workflows/ci.yml
@@ -7,7 +7,7 @@ on:
pull_request:

env:
DEFAULT_PYTHON: "3.12"
DEFAULT_PYTHON: "3.13"
DEFAULT_OS: ubuntu-latest

jobs:
@@ -30,14 +30,9 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install ".[dev,spark]"
wget https://dlcdn.apache.org/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz
tar -xzf spark-4.0.1-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-4.0.1-bin-hadoop3
export PATH=$SPARK_HOME/sbin:$PATH
start-thriftserver.sh
- name: Run pytest with coverage
run: |
CHRONIFY_HIVE_URL=hive://localhost:10000/default pytest -v --cov --cov-report=xml
pytest -v --cov --cov-report=xml
- name: codecov
uses: codecov/codecov-action@v4.2.0
if: ${{ matrix.os == env.DEFAULT_OS && matrix.python-version == env.DEFAULT_PYTHON }}
@@ -53,11 +48,11 @@
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: 3.12
python-version: 3.13
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[dev]"
python -m pip install ".[dev,pyspark]"
mypy
ruff:
runs-on: ubuntu-latest
143 changes: 122 additions & 21 deletions docs/how_tos/spark_backend.md
@@ -1,48 +1,52 @@
# Apache Spark Backend

This guide shows how to use chronify with Apache Spark to process time series datasets that are
too large for DuckDB on a single machine.

## Prerequisites

Download Spark from https://spark.apache.org/downloads.html and install it. Spark provides startup
scripts for UNIX operating systems (not Windows).

## Install chronify with Spark support
```
$ pip install chronify --group=pyhive
```
```bash
pip install chronify[spark]
```

## Installation on a development computer
Installation can be as simple as
```
$ tar -xzf spark-4.0.1-bin-hadoop3.tgz
$ export SPARK_HOME=$(pwd)/spark-4.0.1-bin-hadoop3
```

Start a Thrift server. This allows JDBC clients to send SQL queries to an in-process Spark cluster
running in local mode.
```
$ $SPARK_HOME/sbin/start-thriftserver.sh --master=spark://$(hostname):7077
```
Installation can be as simple as:
```bash
tar -xzf spark-4.0.0-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-4.0.0-bin-hadoop3
```

The URL to connect to this server is `hive://localhost:10000/default`

## Installation on an HPC
The chronify development team uses these
[scripts](https://github.com/NREL/HPC/tree/master/applications/spark) to run Spark on NREL's HPC.
The chronify development team uses the
[sparkctl](https://github.com/NREL/sparkctl) package to run Spark on NREL's HPC.

## Chronify Usage

This example creates a chronify Store with Spark as the backend and then adds a view to a Parquet
file. Chronify will run its normal time checks.

First, create the Parquet file and chronify schema.
**Note:** The Spark backend is designed primarily for reading data from Parquet files via views.
Direct data ingestion (inserting DataFrames) is not supported with Spark.

### Step 1: Create a Parquet file and schema

```python
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from chronify import DatetimeRange, Store, TableSchema, CsvTableSchema
from chronify import DatetimeRange, Store, TableSchema

initial_time = datetime(2020, 1, 1)
end_time = datetime(2020, 12, 31, 23)
resolution = timedelta(hours=1)
timestamps = pd.date_range(initial_time, end_time, freq=resolution, unit="us")

dfs = []
for i in range(1, 4):
df = pd.DataFrame(
@@ -55,6 +59,7 @@ for i in range(1, 4):
dfs.append(df)
df = pd.concat(dfs)
df.to_parquet("data.parquet", index=False)

schema = TableSchema(
name="devices",
value_column="value",
@@ -68,14 +73,25 @@ schema = TableSchema(
)
```

### Step 2: Create a Spark store and add a view

```python
from chronify import Store

store = Store.create_new_hive_store("hive://localhost:10000/default")
store.create_view_from_parquet("data.parquet")
# Create a new Spark store (this will create a local SparkSession)
store = Store.create_new_spark_store()

# Or pass an existing SparkSession:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("chronify").getOrCreate()
# store = Store.create_new_spark_store(session=spark)

# Create a view from the Parquet file
store.create_view_from_parquet("data.parquet", schema)
```

Verify the data:
### Step 3: Verify the data

```python
store.read_table(schema.name).head()
```
@@ -88,9 +104,94 @@ store.read_table(schema.name).head()
4 2020-01-01 04:00:00 1 0.994851
```

## Registering Ibis tables

If you're using Ibis to load and transform data before passing it to chronify for time validation,
you must use `create_view`. This is useful when integrating with
other packages like dsgrid that use Ibis for data loading and transformation.

### Validation only (recommended for most workflows)

Use `create_view` when you only need time validation and don't need the data to persist
in chronify after the session ends:

```python
import ibis
from chronify import Store, TableSchema, DatetimeRange
from datetime import datetime, timedelta

# Load data with Ibis (can be from any Ibis-supported backend)
conn = ibis.pyspark.connect(spark_session)
table = conn.read_parquet("data.parquet")

# Optionally transform the data (rename columns, filter, etc.)
table = table.rename({"old_timestamp": "timestamp"})

# Create the chronify schema
schema = TableSchema(
name="devices",
value_column="value",
time_config=DatetimeRange(
time_column="timestamp",
start=datetime(2020, 1, 1),
length=8784,
resolution=timedelta(hours=1),
),
time_array_id_columns=["id"],
)

# Register with chronify for time validation only
store = Store.create_new_spark_store(session=spark_session)
store.create_view(schema, table)
```

Both the temporary view and the schema are registered and remain available for the duration of the session.
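
For example, while the session is open you can read the registered view back through the store,
just as with the Parquet-backed view earlier in this guide:

```python
# The view behaves like any other chronify table for the rest of the session.
df = store.read_table(schema.name)
print(df.head())
```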

### Persistent registration (requires Hive)

Use the same `create_view` call when you need the view and schema to persist across sessions.
This requires Spark to be configured with Hive metastore support.

#### Configuring Hive metastore

To enable Hive support, create your SparkSession with `enableHiveSupport()`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("chronify") \
.config("spark.sql.warehouse.dir", "/path/to/spark-warehouse") \
.enableHiveSupport() \
.getOrCreate()

store = Store.create_new_spark_store(session=spark)
```
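
With Hive support enabled, the `create_view` call from the previous example is registered against
the metastore. A minimal sketch, reusing the `schema` and `table` objects defined above and
assuming the configured warehouse directory is writable:

```python
# Register the Ibis table; with enableHiveSupport() the registration is backed
# by the Hive metastore (spark.sql.warehouse.dir) rather than being
# session-scoped only.
store.create_view(schema, table)
```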

## Time configuration mapping

The primary use case for Spark is to map datasets that are too large for DuckDB to process
on one computer. In such a workflow a user would call:

```python
store.map_table_time_config(src_table_name, dst_schema, output_file="mapped_data.parquet")
```

This writes the mapped data to a Parquet file that can then be used with any backend.
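
For instance, you can spot-check the mapped output with pandas before handing it to another
backend (a minimal sketch; `mapped_data.parquet` is the output file named in the call above):

```python
import pandas as pd

# Quick sanity check of the file written by map_table_time_config.
mapped = pd.read_parquet("mapped_data.parquet")
print(mapped.head())
print(f"{len(mapped)} rows")
```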

## Backend differences

The Spark backend differs from DuckDB and SQLite in the following ways:

| Feature | DuckDB/SQLite | Spark |
|---------|---------------|-------|
| Data ingestion (`ingest_table`) | Yes | No |
| View from Parquet | Yes | Yes |
| Register Ibis table (`create_view`) | Yes | Yes |
| Delete rows | Yes | No |
| In-memory storage | Yes | No (views only) |
| Time mapping to Parquet | Yes | Yes |

For smaller datasets that fit in memory, DuckDB offers better performance and full feature support.
Use Spark when working with datasets too large for a single machine or when integrating with
existing Spark infrastructure.
27 changes: 11 additions & 16 deletions pyproject.toml
@@ -27,24 +27,20 @@ classifiers = [
]
dependencies = [
"duckdb ~= 1.1.0",
"duckdb_engine",
"ibis-framework[duckdb,sqlite] >= 9.0",
"loguru",
"pandas >= 2.2, < 3",
"pyarrow",
"pydantic >= 2.7, < 3",
"pytz",
"rich",
"sqlalchemy == 2.0.37",
"tzdata",
# Required by pyhive
"future",
"python-dateutil",
]

[project.optional-dependencies]
spark = [
"thrift",
"thrift_sasl",
"ibis-framework[pyspark]",
"pyspark == 4.0.0",
]

dev = [
@@ -63,11 +59,6 @@ dev = [
"sphinx-tabs~=3.4",
]

[project.entry-points."sqlalchemy.dialects"]
hive = "pyhive.sqlalchemy_hive:HiveDialect"
"hive.http" = "pyhive.sqlalchemy_hive:HiveHTTPDialect"
"hive.https" = "pyhive.sqlalchemy_hive:HiveHTTPSDialect"

[project.urls]
Documentation = "https://github.com/NREL/chronify#readme"
Issues = "https://github.com/NREL/chronify/issues"
@@ -77,11 +68,16 @@ Source = "https://github.com/NREL/chronify"
files = [
"src",
]
exclude = [
"src/chronify/_vendor/*",
]
strict = true

[[tool.mypy.overrides]]
module = [
"ibis.*",
"pyarrow.*",
"pyspark.*",
]
ignore_missing_imports = true

[tool.pytest.ini_options]
pythonpath = "src"
minversion = "6.0"
@@ -99,7 +95,6 @@ exclude = [
"dist",
"env",
"venv",
"src/chronify/_vendor/*",
]

line-length = 99
8 changes: 0 additions & 8 deletions src/chronify/__init__.py
@@ -1,8 +1,5 @@
import importlib.metadata as metadata
import sys

from chronify._vendor.kyuubi import TCLIService
from chronify._vendor.kyuubi import pyhive
from chronify.exceptions import (
ChronifyExceptionBase,
ConflictingInputsError,
@@ -60,8 +57,3 @@
)

__version__ = metadata.metadata("chronify")["Version"]


# Make pyhive importable as if it were installed separately.
sys.modules["pyhive"] = pyhive
sys.modules["TCLIService"] = TCLIService