Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

22 changes: 0 additions & 22 deletions .kokoro/presubmit/integration-regular-sessions-enabled.cfg

This file was deleted.

9 changes: 0 additions & 9 deletions google/cloud/spanner_dbapi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,6 @@ def __init__(self, instance, database=None, read_only=False, **kwargs):

self.is_closed = False
self._autocommit = False
# indicator to know if the session pool used by
# this connection should be cleared on the
# connection close
self._own_pool = True
self._read_only = read_only
self._staleness = None
self.request_priority = None
Expand Down Expand Up @@ -443,9 +439,6 @@ def close(self):
if self._spanner_transaction_started and not self._read_only:
self._transaction.rollback()

if self._own_pool and self.database:
self.database._sessions_manager._pool.clear()

self.is_closed = True

@check_not_closed
Expand Down Expand Up @@ -830,7 +823,5 @@ def connect(
database_id, pool=pool, database_role=database_role, logger=logger
)
conn = Connection(instance, database, **kwargs)
if pool is not None:
conn._own_pool = False

return conn
50 changes: 19 additions & 31 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
from google.cloud.spanner_v1.batch import MutationGroups
from google.cloud.spanner_v1.keyset import KeySet
from google.cloud.spanner_v1.merged_result_set import MergedResultSet
from google.cloud.spanner_v1.pool import BurstyPool
from google.cloud.spanner_v1.session import Session
from google.cloud.spanner_v1.database_sessions_manager import (
DatabaseSessionsManager,
Expand Down Expand Up @@ -122,9 +121,11 @@ class Database(object):

:type pool: concrete subclass of
:class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`.
:param pool: (Optional) session pool to be used by database. If not
passed, the database will construct an instance of
:class:`~google.cloud.spanner_v1.pool.BurstyPool`.
:param pool: (Deprecated) session pool to be used by database. Session
pools are deprecated as multiplexed sessions are now used for
all operations by default. If not passed, the database will
construct an internal pool instance for backward compatibility.
New code should not pass a pool argument.

:type logger: :class:`logging.Logger`
:param logger: (Optional) a custom logger that is used if `log_commit_stats`
Expand Down Expand Up @@ -198,16 +199,21 @@ def __init__(
self._proto_descriptors = proto_descriptors
self._channel_id = 0 # It'll be created when _spanner_api is created.

if pool is None:
pool = BurstyPool(database_role=database_role)
# Session pools are deprecated. Multiplexed sessions are now used for all operations.
# The pool parameter is kept for backward compatibility but is ignored.
if pool is not None:
from warnings import warn

warn(
"The 'pool' parameter is deprecated and ignored. "
"Multiplexed sessions are now used for all operations.",
DeprecationWarning,
stacklevel=2,
)

self._pool = pool
pool.bind(self)
is_experimental_host = self._instance.experimental_host is not None

self._sessions_manager = DatabaseSessionsManager(
self, pool, is_experimental_host
)
self._sessions_manager = DatabaseSessionsManager(self, is_experimental_host)

@classmethod
def from_pb(cls, database_pb, instance, pool=None):
Expand Down Expand Up @@ -861,14 +867,8 @@ def session(self, labels=None, database_role=None):
# If role is specified in param, then that role is used
# instead.
role = database_role or self._database_role
is_multiplexed = False
if self.sessions_manager._use_multiplexed(
transaction_type=TransactionType.READ_ONLY
):
is_multiplexed = True
return Session(
self, labels=labels, database_role=role, is_multiplexed=is_multiplexed
)
# Always use multiplexed sessions
return Session(self, labels=labels, database_role=role, is_multiplexed=True)

def snapshot(self, **kw):
"""Return an object which wraps a snapshot.
Expand Down Expand Up @@ -1430,12 +1430,6 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if isinstance(exc_val, NotFound):
# If NotFound exception occurs inside the with block
# then we validate if the session still exists.
if not self._session.exists():
self._session = self._database._pool._new_session()
self._session.create()
self._database.sessions_manager.put_session(self._session)


Expand Down Expand Up @@ -1471,12 +1465,6 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if isinstance(exc_val, NotFound):
# If NotFound exception occurs inside the with block
# then we validate if the session still exists.
if not self._session.exists():
self._session = self._database._pool._new_session()
self._session.create()
self._database.sessions_manager.put_session(self._session)


Expand Down
96 changes: 15 additions & 81 deletions google/cloud/spanner_v1/database_sessions_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from os import getenv
from datetime import timedelta
from threading import Event, Lock, Thread
from time import sleep, time
Expand Down Expand Up @@ -41,31 +40,21 @@ class DatabaseSessionsManager(object):
transaction type using :meth:`get_session`, and returned to the session manager
using :meth:`put_session`.

The sessions returned by the session manager depend on the configured environment variables
and the provided session pool (see :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`).
Multiplexed sessions are used for all transaction types.

:type database: :class:`~google.cloud.spanner_v1.database.Database`
:param database: The database to manage sessions for.

:type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`
:param pool: The pool to get non-multiplexed sessions from.
:type is_experimental_host: bool
:param is_experimental_host: Whether the database is using an experimental host.
"""

# Environment variables for multiplexed sessions
_ENV_VAR_MULTIPLEXED = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
_ENV_VAR_MULTIPLEXED_PARTITIONED = (
"GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
)
_ENV_VAR_MULTIPLEXED_READ_WRITE = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"

# Intervals for the maintenance thread to check and refresh the multiplexed session.
_MAINTENANCE_THREAD_POLLING_INTERVAL = timedelta(minutes=10)
_MAINTENANCE_THREAD_REFRESH_INTERVAL = timedelta(days=7)

def __init__(self, database, pool, is_experimental_host: bool = False):
def __init__(self, database, is_experimental_host: bool = False):
self._database = database
self._pool = pool
self._is_experimental_host = is_experimental_host

# Declare multiplexed session attributes. When a multiplexed session for the
# database session manager is created, a maintenance thread is initialized to
Expand All @@ -81,17 +70,16 @@ def __init__(self, database, pool, is_experimental_host: bool = False):
self._multiplexed_session_terminate_event: Event = Event()

def get_session(self, transaction_type: TransactionType) -> Session:
"""Returns a session for the given transaction type from the database session manager.
"""Returns a multiplexed session for the given transaction type.

:type transaction_type: :class:`TransactionType`
:param transaction_type: The type of transaction (ignored, multiplexed
sessions support all transaction types).

:rtype: :class:`~google.cloud.spanner_v1.session.Session`
:returns: a session for the given transaction type.
:returns: a multiplexed session.
"""

session = (
self._get_multiplexed_session()
if self._use_multiplexed(transaction_type) or self._is_experimental_host
else self._pool.get()
)
session = self._get_multiplexed_session()

add_span_event(
get_current_span(),
Expand All @@ -104,21 +92,18 @@ def get_session(self, transaction_type: TransactionType) -> Session:
def put_session(self, session: Session) -> None:
"""Returns the session to the database session manager.

For multiplexed sessions, this is a no-op since they can handle
multiple concurrent transactions and don't need to be returned to a pool.

:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: The session to return to the database session manager.
"""

add_span_event(
get_current_span(),
"Returning session",
{"id": session.session_id, "multiplexed": session.is_multiplexed},
)

# No action is needed for multiplexed sessions: the session
# pool is only used for managing non-multiplexed sessions,
# since they can only process one transaction at a time.
if not session.is_multiplexed:
self._pool.put(session)
# Multiplexed sessions don't need to be returned to a pool

def _get_multiplexed_session(self) -> Session:
"""Returns a multiplexed session from the database session manager.
Expand Down Expand Up @@ -225,54 +210,3 @@ def _maintain_multiplexed_session(session_manager_ref) -> None:
manager._multiplexed_session = manager._build_multiplexed_session()

session_created_time = time()

@classmethod
def _use_multiplexed(cls, transaction_type: TransactionType) -> bool:
"""Returns whether to use multiplexed sessions for the given transaction type.

Multiplexed sessions are enabled for read-only transactions if:
* _ENV_VAR_MULTIPLEXED != 'false'.

Multiplexed sessions are enabled for partitioned transactions if:
* _ENV_VAR_MULTIPLEXED_PARTITIONED != 'false'.

Multiplexed sessions are enabled for read/write transactions if:
* _ENV_VAR_MULTIPLEXED_READ_WRITE != 'false'.

:type transaction_type: :class:`TransactionType`
:param transaction_type: the type of transaction

:rtype: bool
:returns: True if multiplexed sessions should be used for the given transaction
type, False otherwise.

:raises ValueError: if the transaction type is not supported.
"""

if transaction_type is TransactionType.READ_ONLY:
return cls._getenv(cls._ENV_VAR_MULTIPLEXED)

elif transaction_type is TransactionType.PARTITIONED:
return cls._getenv(cls._ENV_VAR_MULTIPLEXED_PARTITIONED)

elif transaction_type is TransactionType.READ_WRITE:
return cls._getenv(cls._ENV_VAR_MULTIPLEXED_READ_WRITE)

raise ValueError(f"Transaction type {transaction_type} is not supported.")

@classmethod
def _getenv(cls, env_var_name: str) -> bool:
"""Returns the value of the given environment variable as a boolean.

True unless explicitly 'false' (case-insensitive).
All other values (including unset) are considered true.

:type env_var_name: str
:param env_var_name: the name of the boolean environment variable

:rtype: bool
:returns: True unless the environment variable is set to 'false', False otherwise.
"""

env_var_value = getenv(env_var_name, "true").lower().strip()
return env_var_value != "false"
14 changes: 11 additions & 3 deletions google/cloud/spanner_v1/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ def database(

:type pool: concrete subclass of
:class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`.
:param pool: (Optional) session pool to be used by database.
:param pool: (Optional) Deprecated. Session pools are no longer used.
Multiplexed sessions are now used for all operations.

:type logger: :class:`logging.Logger`
:param logger: (Optional) a custom logger that is used if `log_commit_stats`
Expand Down Expand Up @@ -488,13 +489,21 @@ def database(
:rtype: :class:`~google.cloud.spanner_v1.database.Database`
:returns: a database owned by this instance.
"""
if pool is not None:
from warnings import warn

warn(
"The 'pool' parameter is deprecated and ignored. "
"Multiplexed sessions are now used for all operations.",
DeprecationWarning,
stacklevel=2,
)

if not enable_interceptors_in_tests:
return Database(
database_id,
self,
ddl_statements=ddl_statements,
pool=pool,
logger=logger,
encryption_config=encryption_config,
database_dialect=database_dialect,
Expand All @@ -507,7 +516,6 @@ def database(
database_id,
self,
ddl_statements=ddl_statements,
pool=pool,
logger=logger,
encryption_config=encryption_config,
database_dialect=database_dialect,
Expand Down
Loading
Loading