Conversation

awaismirza92 (Collaborator) commented Dec 2, 2025

Closes: #50 & #52

awaismirza92 linked an issue Dec 2, 2025 that may be closed by this pull request
awaismirza92 self-assigned this Dec 2, 2025
awaismirza92 marked this pull request as ready for review December 2, 2025 15:43
awaismirza92 requested a review from srnnkls December 2, 2025 15:43
srnnkls's comment was marked as outdated.

srnnkls (Collaborator) commented Dec 3, 2025

Review Summary

Style Reference: Python Style Guide (getml/code17-northstar#18)


Context: Requirements from #42

This PR implements Databricks ingestion, mirroring the pattern established in #42 (Build data preparation infrastructure for feature store notebooks). Issue #42 defines the expected architecture:

integration/{platform}/
├── data/
│   ├── ingestion.py      # GCS → Platform loader
│   ├── preparation.py    # Orchestration module  
│   └── sql/              # Externalized SQL queries
└── tests/

Expected usage pattern from #42:

from integration.{platform}.data import ingestion, preparation

ingestion.load_from_gcs(
    bucket="gs://static.getml.com/datasets/jaffle-shop/",
    destination_schema="RAW"
)

The key expectation: data warehouses/platforms should use their native capabilities to ingest from GCS rather than downloading through Python.


Critical Deviation: Architecture

The current implementation downloads parquet files into local memory via requests.get() + pandas, then converts them to a Spark DataFrame. This fundamentally misunderstands how Spark/Databricks works.

| Aspect | Expected (per #42 pattern) | Actual Implementation |
| --- | --- | --- |
| Data flow | GCS → Spark → Delta (direct) | GCS → Python memory → pandas → Spark → Delta |
| Scalability | Distributed across cluster | Limited by local memory |
| Dependencies | pyspark, databricks-connect | + pandas, requests, pyarrow |
| Performance | Native Spark parallelism | Single-threaded download |

Correct approach:

# Spark reads parquet directly from URL - no local memory needed
spark.read.parquet(source_url).write.format("delta").saveAsTable(target_table)

Deviations from #42 Structure

| Requirement from #42 | Status in PR |
| --- | --- |
| ingestion.py module | ✓ Present (but wrong approach) |
| preparation.py module | ✗ Missing (but referenced in README) |
| sql/ directory | ✗ Missing |
| Integration tests | ✗ Missing |
| pyproject.toml | ✗ Uses requirements.txt instead |
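
For the missing preparation.py and sql/ pieces, here is a minimal sketch of what the #42 layout appears to expect. The module path follows the tree above; the function name and the one-statement-per-file convention are assumptions, not requirements stated in #42.

```python
# integration/databricks/data/preparation.py (hypothetical sketch)
from pathlib import Path

from pyspark.sql import SparkSession

# Externalized SQL queries live next to this module, per the #42 layout.
_SQL_DIR = Path(__file__).parent / "sql"


def prepare(spark: SparkSession) -> None:
    """Run the externalized SQL scripts in lexical order (one statement per file assumed)."""
    for sql_file in sorted(_SQL_DIR.glob("*.sql")):
        spark.sql(sql_file.read_text())
```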

Issues Summary

Critical (blocking):

  • Architecture fundamentally wrong - must use Spark's native parquet reading

High priority:

  • DEFAULT_PROFILE = "Code17" - hardcoded personal config
  • Relative import in CLI script (from data import ingestion)
  • requirements.txt instead of pyproject.toml (project uses uv)
  • SQL injection potential in schema/catalog interpolation

Medium priority:

  • Cryptic variable names (pdf, sdf)
  • README references non-existent preparation module
  • README includes irrelevant Python version troubleshooting
  • Empty __init__.py without __all__ exports (see the sketch after this list)
  • Broad except Exception handling
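
As a sketch of the __all__ point (the package path is taken from the #42 tree; re-exporting only ingestion is an assumption):

```python
# integration/databricks/data/__init__.py (hypothetical sketch)
from integration.databricks.data import ingestion

__all__ = ["ingestion"]
```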

Recommended Changes

  1. Rewrite ingestion to use Spark native reading:

    def load_table(spark: SparkSession, source_url: str, target_table: str) -> int:
        df = spark.read.parquet(source_url)
        df.write.format("delta").mode("overwrite").saveAsTable(target_table)
        return df.count()
  2. Remove pandas/requests dependencies - they're not needed

  3. Add pyproject.toml with uv-compatible structure

  4. Either add preparation.py or remove references from README

  5. Validate SQL identifiers before interpolation

  6. Use absolute imports throughout (a minimal sketch follows)
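
A minimal sketch of the import fix, assuming the CLI script lives under integration/databricks/, the repository root is importable, and the ingestion module exposes the load_from_gcs signature expected by #42 (the file name cli.py is illustrative):

```python
# integration/databricks/cli.py (hypothetical file name)
# Absolute import instead of the current relative `from data import ingestion`.
from integration.databricks.data import ingestion


def main() -> None:
    ingestion.load_from_gcs(
        bucket="gs://static.getml.com/datasets/jaffle-shop/",
        destination_schema="RAW",
    )


if __name__ == "__main__":
    main()
```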

awaismirza92 (Collaborator, Author) commented

@srnnkls I am done with my responses to your previous comments. Please have a look.

srnnkls (Collaborator) left a comment

Solid code overall. Nothing blocking, mostly style and minor suggestions.

srnnkls (Collaborator) left a comment

One blocking issue: remove the os.environ mutation at line 168. The rest are minor suggestions.

awaismirza92 (Collaborator, Author) commented

> One blocking issue: remove the os.environ mutation at line 168. The rest are minor suggestions.

@srnnkls I have addressed both the blocking issue and the minor suggestions. Have a look again.

Comment on lines 80 to 100
def _validate_sql_identifier(value: str) -> str:
    """
    Validate SQL identifier to prevent injection attacks.

    Args:
        value: Identifier to validate.

    Returns:
        The validated identifier.

    Raises:
        ValueError: If identifier contains invalid characters.
    """
    if not _IDENTIFIER_PATTERN.fullmatch(value):
        msg = (
            f"Invalid SQL identifier {value!r}. "
            f"Must match pattern: {_IDENTIFIER_PATTERN.pattern!r}"
        )
        raise ValueError(msg)

    return value
srnnkls (Collaborator) commented Dec 16, 2025

from typing import Annotated
from sqlglot import exp
from pydantic import AfterValidator

def _quote_identifier(raw_identifier: str, dialect: str = "databricks") -> str:
    return exp.to_identifier(raw_identifier).sql(dialect=dialect)

SqlIdentifier = Annotated[str, AfterValidator(_quote_identifier)]
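
A sketch of how the suggested SqlIdentifier type might be consumed; pydantic's TypeAdapter and the CREATE SCHEMA example are illustrations, not something prescribed in this thread:

```python
from pydantic import TypeAdapter

# Hypothetical usage: quote/validate a schema name before interpolating it into SQL.
schema = TypeAdapter(SqlIdentifier).validate_python("raw schema")
# sqlglot backtick-quotes anything that is not a safe bare identifier in the Databricks
# dialect (e.g. "raw schema" -> `raw schema`), while a plain name like RAW passes through as-is.
query = f"CREATE SCHEMA IF NOT EXISTS {schema}"
```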

awaismirza92 (Collaborator, Author) replied

Added: bcbc393

awaismirza92 (Collaborator, Author) commented

@srnnkls have a look again.

Linked issue that may be closed by this pull request: Build data ingestion infrastructure for Databricks notebook