Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion google/cloud/sqlalchemy_spanner/requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ def get_isolation_levels(self, _):
Returns:
dict: isolation levels description.
"""
return {"default": "SERIALIZABLE", "supported": ["SERIALIZABLE", "AUTOCOMMIT"]}
return {
"default": "SERIALIZABLE",
"supported": ["SERIALIZABLE", "REPEATABLE READ", "AUTOCOMMIT"],
}

@property
def precision_numerics_enotation_large(self):
Expand Down
56 changes: 48 additions & 8 deletions google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)
from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud.spanner_v1 import Client
from google.cloud.spanner_v1 import Client, TransactionOptions
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.sql import elements
from sqlalchemy import ForeignKeyConstraint, types, TypeDecorator, PickleType
Expand Down Expand Up @@ -218,6 +218,16 @@ def pre_exec(self):
if request_tag:
self.cursor.request_tag = request_tag

ignore_transaction_warnings = self.execution_options.get(
"ignore_transaction_warnings"
)
if ignore_transaction_warnings is not None:
conn = self._dbapi_connection.connection
if conn is not None and hasattr(conn, "_connection_variables"):
conn._connection_variables[
"ignore_transaction_warnings"
] = ignore_transaction_warnings

def fire_sequence(self, seq, type_):
"""Builds a statement for fetching next value of the sequence."""
return self._execute_scalar(
Expand Down Expand Up @@ -777,6 +787,7 @@ class SpannerDialect(DefaultDialect):
encoding = "utf-8"
max_identifier_length = 256
_legacy_binary_type_literal_encoding = "utf-8"
_default_isolation_level = "SERIALIZABLE"

execute_sequence_format = list

Expand Down Expand Up @@ -828,12 +839,11 @@ def default_isolation_level(self):
Returns:
str: default isolation level.
"""
return "SERIALIZABLE"
return self._default_isolation_level

@default_isolation_level.setter
def default_isolation_level(self, value):
"""Default isolation level should not be changed."""
pass
self._default_isolation_level = value

def _check_unicode_returns(self, connection, additional_tests=None):
"""Ensure requests are returning Unicode responses."""
Expand Down Expand Up @@ -1682,15 +1692,21 @@ def set_isolation_level(self, conn_proxy, level):
spanner_dbapi.connection.Connection,
]
):
Database connection proxy object or the connection iself.
Database connection proxy object or the connection itself.
level (string): Isolation level.
"""
if isinstance(conn_proxy, spanner_dbapi.Connection):
conn = conn_proxy
else:
conn = conn_proxy.connection

conn.autocommit = level == "AUTOCOMMIT"
if level == "AUTOCOMMIT":
conn.autocommit = True
else:
if isinstance(level, str):
level = self._string_to_isolation_level(level)
conn.isolation_level = level
conn.autocommit = False

def get_isolation_level(self, conn_proxy):
"""Get the connection isolation level.
Expand All @@ -1702,7 +1718,7 @@ def get_isolation_level(self, conn_proxy):
spanner_dbapi.connection.Connection,
]
):
Database connection proxy object or the connection iself.
Database connection proxy object or the connection itself.

Returns:
str: the connection isolation level.
Expand All @@ -1712,7 +1728,31 @@ def get_isolation_level(self, conn_proxy):
else:
conn = conn_proxy.connection

return "AUTOCOMMIT" if conn.autocommit else "SERIALIZABLE"
if conn.autocommit:
return "AUTOCOMMIT"

level = conn.isolation_level
if level == TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED:
level = TransactionOptions.IsolationLevel.SERIALIZABLE
if isinstance(level, TransactionOptions.IsolationLevel):
level = self._isolation_level_to_string(level)

return level

def _string_to_isolation_level(self, name):
try:
# SQLAlchemy guarantees that the isolation level string will:
# 1. Be all upper case.
# 2. Contain spaces instead of underscores.
# We change the spaces into underscores to get the enum value.
return TransactionOptions.IsolationLevel[name.replace(" ", "_")]
except KeyError:
raise ValueError("Invalid isolation level name '%s'" % name)

def _isolation_level_to_string(self, level):
# SQLAlchemy expects isolation level names to contain spaces,
# and not underscores, so we remove those before returning.
return level.name.replace("_", " ")

def do_rollback(self, dbapi_connection):
"""
Expand Down
47 changes: 47 additions & 0 deletions samples/isolation_level_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2025 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from sample_helper import run_sample
from model import Singer


# Shows how to set the isolation level for a read/write transaction.
# Spanner supports the following isolation levels:
# - SERIALIZABLE (default)
# - REPEATABLE READ
def isolation_level_sample():
engine = create_engine(
"spanner:///projects/sample-project/"
"instances/sample-instance/"
"databases/sample-database",
# You can set a default isolation level for an engine.
isolation_level="REPEATABLE READ",
echo=True,
)
# You can override the default isolation level of the connection
# by setting it in the execution_options.
with Session(engine.execution_options(isolation_level="SERIALIZABLE")) as session:
singer_id = str(uuid.uuid4())
singer = Singer(id=singer_id, first_name="John", last_name="Doe")
session.add(singer)
session.commit()


if __name__ == "__main__":
run_sample(isolation_level_sample)
5 changes: 5 additions & 0 deletions samples/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ def transaction(session):
_sample(session)


@nox.session()
def isolation_level(session):
_sample(session)


@nox.session()
def stale_read(session):
_sample(session)
Expand Down
28 changes: 28 additions & 0 deletions test/mockserver_tests/isolation_level_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2025 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import String, BigInteger
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
pass


class Singer(Base):
__tablename__ = "singers"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
name: Mapped[str] = mapped_column(String)
Loading