Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/3-ingest-csv-edge-weights/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
conn_conf = Neo4jConfig.from_docker_env()

# from graflo.db.connection.onto import TigergraphConfig
#
# conn_conf = TigergraphConfig.from_docker_env()

# Alternative: Create config directly or use environment variables
Expand Down
275 changes: 238 additions & 37 deletions graflo/db/connection/onto.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import abc
import logging
import warnings
from pathlib import Path
from strenum import StrEnum
from typing import Any, Dict, Type, TypeVar
Expand All @@ -10,6 +12,8 @@

from graflo.onto import MetaEnum

logger = logging.getLogger(__name__)

# Type variable for DBConfig subclasses
T = TypeVar("T", bound="DBConfig")

Expand Down Expand Up @@ -126,24 +130,174 @@ def effective_schema(self) -> str | None:
return self._get_effective_schema()

@model_validator(mode="after")
def _add_default_port_to_uri(self):
"""Add default port to URI if missing."""
def _normalize_uri(self):
"""Normalize URI: handle URIs without scheme and add default port if missing."""
if self.uri is None:
return self

# Valid URL schemes (common database protocols)
valid_schemes = {
"http",
"https",
"bolt",
"bolt+s",
"bolt+ssc",
"neo4j",
"neo4j+s",
"neo4j+ssc",
"mongodb",
"postgresql",
"postgres",
"mysql",
"nebula",
"redis", # FalkorDB uses redis:// protocol
"rediss", # Redis with SSL
}

# Try to parse as-is first
parsed = urlparse(self.uri)
if parsed.port is not None:

# Check if parsed scheme is actually a valid scheme or if it's a hostname
# urlparse treats "localhost:14240" as scheme="localhost", path="14240"
# We need to detect this case
has_valid_scheme = parsed.scheme.lower() in valid_schemes
has_netloc = bool(parsed.netloc)

# If scheme doesn't look like a valid scheme and we have a colon, treat as host:port
if not has_valid_scheme and ":" in self.uri and not self.uri.startswith("//"):
# Check if it looks like host:port format
parts = self.uri.split(":", 1)
if len(parts) == 2:
potential_host = parts[0]
port_and_rest = parts[1]
# Extract port (may have path/query after it)
port_part = port_and_rest.split("/")[0].split("?")[0].split("#")[0]
try:
# Validate port is numeric
int(port_part)
# If hostname doesn't look like a scheme (contains dots, is localhost, etc.)
# or if the parsed scheme is not in valid schemes, treat as host:port
if (
"." in potential_host
or potential_host.lower() in {"localhost", "127.0.0.1"}
or not has_valid_scheme
):
# Reconstruct as proper URI with default scheme
default_scheme = "http" # Default to http for most DBs
rest = port_and_rest[len(port_part) :] # Everything after port
self.uri = (
f"{default_scheme}://{potential_host}:{port_part}{rest}"
)
parsed = urlparse(self.uri)
except ValueError:
# Not a valid port, treat as regular URI - add scheme if needed
if not has_valid_scheme:
default_scheme = "http"
self.uri = f"{default_scheme}://{self.uri}"
parsed = urlparse(self.uri)
elif not has_valid_scheme and not has_netloc:
# No valid scheme and no netloc - add default scheme
default_scheme = "http"
self.uri = f"{default_scheme}://{self.uri}"
parsed = urlparse(self.uri)

# Add default port if missing
if parsed.port is None:
default_port = self._get_default_port()
if parsed.scheme and parsed.hostname:
# Reconstruct URI with port
port_part = f":{default_port}" if default_port else ""
path_part = parsed.path or ""
query_part = f"?{parsed.query}" if parsed.query else ""
fragment_part = f"#{parsed.fragment}" if parsed.fragment else ""
self.uri = f"{parsed.scheme}://{parsed.hostname}{port_part}{path_part}{query_part}{fragment_part}"

return self

@model_validator(mode="after")
def _extract_port_from_uri(self):
"""Extract port from URI and set it as gs_port for TigerGraph (if applicable).

For TigerGraph 4+, gs_port is the primary port. If URI has a port but gs_port
is not set, automatically extract and set gs_port from URI port.
This simplifies configuration - users can just provide URI with port.
"""
# Only apply to configs that have gs_port field (TigerGraph)
if not hasattr(self, "gs_port"):
return self

if self.uri and self.gs_port is None:
uri_port = self.port # Get port from URI (property from base class)
if uri_port:
try:
self.gs_port = int(uri_port)
logger.debug(
f"Automatically set gs_port={self.gs_port} from URI port"
)
except (ValueError, TypeError):
# Port couldn't be converted to int, skip auto-setting
pass

return self

@model_validator(mode="after")
def _check_port_conflicts(self):
"""Check for port conflicts between URI and separate port fields.

If port is provided both in URI and as a separate field, warn and prefer URI port.
This ensures consistency and avoids confusion.
"""
if self.uri is None:
return self

uri_port = self.port # Get port from URI
if uri_port is None:
return self

# Add default port
default_port = self._get_default_port()
if parsed.scheme and parsed.hostname:
# Reconstruct URI with port
port_part = f":{default_port}" if default_port else ""
path_part = parsed.path or ""
query_part = f"?{parsed.query}" if parsed.query else ""
fragment_part = f"#{parsed.fragment}" if parsed.fragment else ""
self.uri = f"{parsed.scheme}://{parsed.hostname}{port_part}{path_part}{query_part}{fragment_part}"
# Check for port fields in subclasses
# Get model fields to check for port-related fields
port_fields = []

# Check for specific port fields that might exist in subclasses
# Use getattr with None default to avoid AttributeError
if hasattr(self, "gs_port"):
gs_port_val = getattr(self, "gs_port", None)
if gs_port_val is not None:
port_fields.append(("gs_port", gs_port_val))

if hasattr(self, "bolt_port"):
bolt_port_val = getattr(self, "bolt_port", None)
if bolt_port_val is not None:
port_fields.append(("bolt_port", bolt_port_val))

# Check each port field for conflicts
port_conflicts = []
for field_name, field_port in port_fields:
# Compare as strings to handle int vs str differences
if str(field_port) != str(uri_port):
port_conflicts.append((field_name, field_port, uri_port))

# Warn about conflicts and prefer URI port
if port_conflicts:
conflict_msgs = [
f"{field_name}={field_port} (URI has port={uri_port})"
for field_name, field_port, _ in port_conflicts
]
warning_msg = (
f"Port conflict detected: Port specified both in URI ({uri_port}) "
f"and as separate field(s): {', '.join(conflict_msgs)}. "
f"Using port from URI ({uri_port}). Consider removing the separate port field(s)."
)
warnings.warn(warning_msg, UserWarning, stacklevel=2)
logger.warning(warning_msg)

# Update port fields to match URI port (prefer URI)
for field_name, _, _ in port_conflicts:
try:
setattr(self, field_name, int(uri_port))
except (ValueError, AttributeError):
# Field might be read-only or not settable, that's okay
pass

return self

Expand Down Expand Up @@ -546,14 +700,13 @@ class TigergraphConfig(DBConfig):
TigerGraph 4.1+ uses port 14240 (GSQL server) as the primary interface.
Port 9000 (REST++) is for internal use only in TG 4.1+.

For vanilla TigerGraph 4+ installations, you typically only need port 14240.
Both restppPort and gsPort default to 14240 for TG 4+ compatibility.
Standard ports:
- Port 14240: GSQL server (primary interface for all API requests)
- Port 9000: REST++ (internal-only in TG 4.1+)

For custom Docker deployments with port mapping, override the ports:
>>> config = TigergraphConfig(
... uri="http://localhost:9001", # Custom mapped REST++ port
... gs_port=14241, # Custom mapped GSQL port
... )
For custom Docker deployments with port mapping, ports are configured via
environment variables (e.g., TG_WEB, TG_REST) and loaded automatically
when using TigergraphConfig.from_docker_env().
"""

model_config = SettingsConfigDict(
Expand All @@ -562,7 +715,7 @@ class TigergraphConfig(DBConfig):
)

gs_port: int | None = Field(
default=None, description="TigerGraph GSQL port (default: 14240 for TG 4+)"
default=None, description="TigerGraph GSQL port (standard: 14240 for TG 4+)"
)
secret: str | None = Field(
default=None,
Expand All @@ -580,19 +733,27 @@ class TigergraphConfig(DBConfig):
"for cases where certificate hostname doesn't match (e.g., internal deployments with self-signed certs). "
"WARNING: Disabling SSL verification reduces security and should only be used in trusted environments.",
)
max_job_size: int = Field(
default=1000,
description="Maximum size (in characters) for a single SCHEMA_CHANGE JOB. "
"Large jobs (>30k chars) can cause parser failures. The schema change will be split "
"into multiple batches if the estimated size exceeds this limit. Default: 1000.",
)

def _get_default_port(self) -> int:
"""Get default TigerGraph REST++ port.

Note: TigerGraph 4.1+ uses port 14240 (GSQL server) as the primary interface.
Port 9000 (REST++) is for internal use only in TG 4.1+.
However, pyTigerGraph's connection object still needs this port configured
for backward compatibility with older TG versions.

For TigerGraph 4+, it's recommended to explicitly set both port and gs_port
to the publicly accessible GSQL port (typically 14240).
Standard ports:
- Port 14240: GSQL server (primary interface)
- Port 9000: REST++ (internal-only in TG 4.1+)

This method is kept for backward compatibility but should not be relied upon.
Ports should be explicitly configured in TigergraphConfig.
"""
return 14240 # Default to GSQL port for TG 4+ compatibility
return 14240 # Standard GSQL port for TG 4+

def _get_effective_database(self) -> str | None:
"""TigerGraph doesn't have a database level (connection -> schema -> vertices/edges)."""
Expand All @@ -607,11 +768,21 @@ def _get_effective_schema(self) -> str | None:
return self.schema_name

def __init__(self, **data):
"""Initialize TigerGraph config."""
"""Initialize TigerGraph config.

Note: For TigerGraph 4+, gs_port is the primary port (14240).
If URI is provided with a port, it will be automatically set as gs_port
by the _extract_port_from_uri validator.
Standard ports:
- 14240: GSQL server (primary interface)
- 9000: REST++ (internal-only in TG 4.1+)

If port is provided both in URI and as gs_port, the port from URI will be used
and a warning will be issued.
"""
super().__init__(**data)
# Set default gs_port if not provided
if self.gs_port is None:
self.gs_port = 14240
# Port extraction from URI is handled by _extract_port_from_uri validator
# Port conflicts are handled by _check_port_conflicts validator in base class

@classmethod
def from_docker_env(
Expand Down Expand Up @@ -640,22 +811,52 @@ def from_docker_env(

# Map environment variables to config
config_data: Dict[str, Any] = {}
if "TG_REST" in env_vars or "TIGERGRAPH_PORT" in env_vars:
port = env_vars.get("TG_REST") or env_vars.get("TIGERGRAPH_PORT")
hostname = env_vars.get("TIGERGRAPH_HOSTNAME", "localhost")
protocol = env_vars.get("TIGERGRAPH_PROTOCOL", "http")
config_data["uri"] = f"{protocol}://{hostname}:{port}"

if "TG_WEB" in env_vars or "TIGERGRAPH_GS_PORT" in env_vars:
gs_port = env_vars.get("TG_WEB") or env_vars.get("TIGERGRAPH_GS_PORT")
config_data["gs_port"] = int(gs_port) if gs_port else None
# For TigerGraph 4+, use GSQL port (TG_WEB) for both REST++ and GSQL
# TG_REST (port 9000) is internal-only in TG 4.1+
gs_port = env_vars.get("TG_WEB") or env_vars.get("TIGERGRAPH_GS_PORT")
rest_port = env_vars.get("TG_REST") or env_vars.get("TIGERGRAPH_PORT")

# Prefer GSQL port for TigerGraph 4+ compatibility
# Standard ports: 14240 (GSQL), 9000 (REST++)
# Docker may map these to different external ports (e.g., 14241, 9001)
if gs_port:
port = gs_port
config_data["gs_port"] = int(gs_port)
elif rest_port:
port = rest_port
# If only REST port is provided, use it for both (Docker mapping scenario)
config_data["gs_port"] = int(rest_port)
else:
raise ValueError(
"Either TG_WEB or TG_REST must be set in .env file. "
"Standard ports: 14240 (GSQL), 9000 (REST++)."
)

hostname = env_vars.get("TIGERGRAPH_HOSTNAME", "localhost")
protocol = env_vars.get("TIGERGRAPH_PROTOCOL", "http")
config_data["uri"] = f"{protocol}://{hostname}:{port}"

# Set default username if not provided
if "TIGERGRAPH_USERNAME" in env_vars:
config_data["username"] = env_vars["TIGERGRAPH_USERNAME"]
else:
config_data["username"] = "tigergraph" # Default username

# Set password from env vars or use default
if "TIGERGRAPH_PASSWORD" in env_vars or "GSQL_PASSWORD" in env_vars:
config_data["password"] = env_vars.get(
"TIGERGRAPH_PASSWORD"
) or env_vars.get("GSQL_PASSWORD")
else:
# Check environment variable as fallback, default to "tigergraph"
import os

config_data["password"] = (
os.environ.get("GSQL_PASSWORD")
or os.environ.get("TIGERGRAPH_PASSWORD")
or "tigergraph"
)
if "TIGERGRAPH_DATABASE" in env_vars:
config_data["database"] = env_vars["TIGERGRAPH_DATABASE"]

Expand Down
Loading