Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b4e2cf0
docs: update type annotation
altaurog Apr 8, 2024
bc0b2b5
refactor: wrap adaptor in backend interface
altaurog May 5, 2024
5515f36
refactor: move plumbing to backend
altaurog May 5, 2024
60f60d9
refactor: move error handling to common function
altaurog May 5, 2024
38c33cc
test: use separate fixture for client encoding
altaurog May 6, 2024
5965a07
test: remove direct psycopg2 references
altaurog May 6, 2024
ebcbf3b
test: remove broken tox python2.7 test environment
altaurog May 6, 2024
13498d6
chore: remove explicit psycopg2 dependency
altaurog May 6, 2024
d47e73c
feat: psycopg3 backend
altaurog May 6, 2024
c22e62a
refactor: better backend detection
altaurog May 13, 2024
a546192
refactor: simplify
altaurog May 13, 2024
e2be2a6
test: test available backends
altaurog May 13, 2024
e44bbdc
feat: add PyGreSQL backend
altaurog May 13, 2024
bc85ec1
feat: add pg8000 backend
altaurog May 13, 2024
64ddbbe
test: add all backends to tox
altaurog Dec 24, 2025
f96621a
test: add ability to run tests without temporary tables
altaurog Nov 23, 2025
57f225b
test: refactor: use super()
altaurog Nov 23, 2025
102615f
test: recombine extension and table setup
altaurog Dec 21, 2025
0a547d6
test: port dsql branch
altaurog Dec 21, 2025
9b2a7ad
Merge branch 'master' into feat/backend
altaurog Dec 24, 2025
558ed29
test: minor refactor in db creation
altaurog Dec 24, 2025
1fd9f52
test: silence linter warnings
altaurog Dec 24, 2025
93dc1b5
test: allow testing with psycopg2 or psycopg3
altaurog Dec 24, 2025
9ffbc86
test: update docker compose test
altaurog Dec 24, 2025
b7faad9
docs: list supported db adaptors
altaurog Dec 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ PostgreSQL with `binary copy`_.

Features
---------
* Support for multiple db adaptors
* Support for many data types
* Support for multi-dimensional array types
* Support for schema and schema search path
Expand All @@ -42,15 +43,15 @@ Quickstart

from datetime import datetime
from pgcopy import CopyManager
import psycopg2
import psycopg
cols = ('id', 'timestamp', 'location', 'temperature')
now = datetime.now()
records = [
(0, now, 'Jerusalem', 72.2),
(1, now, 'New York', 75.6),
(2, now, 'Moscow', 54.3),
]
conn = psycopg2.connect(database='weather_db')
conn = psycopg.connect(database='weather_db')
mgr = CopyManager(conn, 'measurements_table', cols)
mgr.copy(records)

Expand All @@ -59,6 +60,14 @@ Quickstart

.. home-end

Supported Adaptors
-------------------

* psycopg2_
* psycopg_
* pg8000_
* PyGreSQL_

Supported datatypes
-------------------

Expand Down Expand Up @@ -96,6 +105,10 @@ Documentation

.. _binary copy: http://www.postgresql.org/docs/9.3/static/sql-copy.html
.. _psycopg2: https://pypi.org/project/psycopg2/
.. _psycopg: https://pypi.org/project/psycopg/
.. _pg8000: https://pypi.org/project/pg8000/
.. _PyGreSQL: https://pypi.org/project/PyGreSQL/

.. _pytz: https://pypi.org/project/pytz/
.. _pytest: https://pypi.org/project/pytest/
.. _Tox: https://tox.readthedocs.io/en/latest/
3 changes: 1 addition & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '2'
services:
pgcopy:
build:
Expand All @@ -15,7 +14,7 @@ services:
- pgsql

pgsql:
image: pgvector/pgvector:pg12
image: pgvector/pgvector:pg18
environment:
- POSTGRES_DB=pgcopy_test
- POSTGRES_USER=postgres
Expand Down
6 changes: 3 additions & 3 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.10-slim-buster
FROM python:3.14-slim

ARG DEBIAN_FRONTEND=noninteractive

Expand All @@ -7,13 +7,13 @@ RUN \
apt-get install -y --no-install-recommends \
gcc \
libpq-dev \
netcat \
netcat-traditional \
python3 \
python3-dev \
python3-pip \
python3-setuptools

RUN pip3 install pytest==8.1.1
RUN pip3 install pytest psycopg
COPY ./ /opt/install
WORKDIR /opt/install
RUN pip3 install .
Expand Down
17 changes: 9 additions & 8 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ To install::

Dependencies
""""""""""""
pgcopy requires pytz_ and the psycopg2_ db adapter.
pytest_ is required to run the tests.
pgcopy requires Python 3, pytz_, and a db adaptor. The supported adaptors are:

Due to technical problems with binary distributions, `psycopg2 versions
2.8 and later`_ have separate packages for binary install. This complicates
installation in some situations, as it requires the dev tools to build psycopg2.
* psycopg2_
* psycopg_
* pg8000_
* PyGreSQL_

If you do not want to build psycopg2 for each installation, the recommended
approach is to create a psycopg2 wheel for distribution to production machines
pytest_ and one of psycopg2_ or psycopg_ are required to run the tests.

Compatibility
"""""""""""""
Expand All @@ -29,7 +28,9 @@ PostgreSQL versions 13 -- 18, as well as `Aurora DSQL`_
Please upgrade to Python 3.

.. _psycopg2: https://pypi.org/project/psycopg2/
.. _psycopg: https://pypi.org/project/psycopg/
.. _pg8000: https://pypi.org/project/pg8000/
.. _PyGreSQL: https://pypi.org/project/PyGreSQL/
.. _pytz: https://pypi.org/project/pytz/
.. _pytest: https://pypi.org/project/pytest/
.. _psycopg2 versions 2.8 and later: https://www.psycopg.org/docs/install#change-in-binary-packages-between-psycopg-2-7-and-2-8
.. _Aurora DSQL: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/what-is-aurora-dsql.html
5 changes: 5 additions & 0 deletions docs/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ database:
* ``POSTGRES_PASSWORD``


One of psycopg2_ or psycopg_ is required to run the tests. The test suite
will automatically discover all supported db adaptors.

For more thorough testing, tox_ with tox-docker_ will run tests on python
versions 3.9 -- 3.14 and postgresql versions 13 -- 18::

Expand All @@ -37,3 +40,5 @@ boto3 must be installed and ``POSTGRES_HOST`` set to the dsql endpoint.
.. _pytest: https://pypi.org/project/pytest/
.. _tox: https://tox.wiki
.. _tox-docker: https://tox-docker.readthedocs.io
.. _psycopg2: https://pypi.org/project/psycopg2/
.. _psycopg: https://pypi.org/project/psycopg/
254 changes: 254 additions & 0 deletions pgcopy/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
"psycopg backends"
import codecs
import collections
import contextlib
import importlib
import os

from .errors import UnsupportedConnectionError
from .thread import RaisingThread


def for_connection(conn):
    """Select and instantiate the backend matching *conn*'s db adaptor.

    The adaptor is identified by walking the connection class's MRO and
    collecting the top-level package each ancestor class was defined in.

    :raises UnsupportedConnectionError: for unrecognized connection types
    """
    packages = {klass.__module__.partition(".")[0] for klass in type(conn).mro()}
    dispatch = [
        ("psycopg2", Psycopg2Backend),
        ("psycopg", Psycopg3Backend),
        ("pgdb", PyGreSQLBackend),
        ("pg8000", Pg8000Backend),
    ]
    for package, backend_class in dispatch:
        if package in packages:
            return backend_class(conn)
    message = f"{type(conn).__name__} is not a supported connection type"
    raise UnsupportedConnectionError(message)


def copy_sql(schema, table, columns):
    """Build the binary ``COPY ... FROM STDIN`` statement for the target.

    Schema, table, and column identifiers are double-quoted to preserve
    case and allow special characters.
    """
    column_list = '", "'.join(columns)
    return f'COPY "{schema}"."{table}" ("{column_list}") FROM STDIN WITH BINARY'


class Psycopg2Backend:
    """Backend adaptor wrapping a psycopg2 connection."""

    def __init__(self, conn):
        # Import lazily so pgcopy has no hard dependency on psycopg2.
        self.conn = conn
        self.adaptor = importlib.import_module("psycopg2")
        self.adaptor.extras = importlib.import_module("psycopg2.extras")

    def get_encoding(self):
        """Return the Python codec name for the connection's client encoding."""
        encodings = self.adaptor.extensions.encodings
        return encodings[self.conn.encoding]

    def namedtuple_cursor(self):
        """Return a cursor whose rows are named tuples."""
        return self.conn.cursor(cursor_factory=self.adaptor.extras.NamedTupleCursor)

    def copy(self, schema, table, columns, fobject_factory):
        """Return a context manager yielding a writable buffer for binary COPY."""
        return Psycopg2Copy(self.conn, copy_sql(schema, table, columns), fobject_factory)

    def threading_copy(self, schema, table, columns):
        """Return a context manager streaming COPY data through a pipe and thread."""
        return Psycopg2ThreadingCopy(self.conn, copy_sql(schema, table, columns))


class Psycopg2Copy:
    """Context manager for a buffered psycopg2 binary COPY.

    ``__enter__`` yields a writable file object; on clean exit the buffered
    data is rewound and sent to the server with ``copy_expert``.

    Fix: the buffer is now closed on the error path too — previously it
    leaked (memory buffer or temp file) when the ``with`` body raised.
    """

    def __init__(self, conn, sql, fobject_factory):
        self.conn = conn  # open psycopg2 connection
        self.sql = sql  # COPY ... FROM STDIN WITH BINARY statement
        self.datastream = fobject_factory()  # e.g. io.BytesIO or a temp file

    def __enter__(self):
        return self.datastream

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                self.datastream.seek(0)
                self.copystream()
        finally:
            # Always release the buffer, even if the body or the copy raised.
            self.datastream.close()

    def copystream(self):
        """Send the buffered data to the server."""
        with self.conn.cursor() as cur:
            cur.copy_expert(self.sql, self.datastream)


class Psycopg2ThreadingCopy:
    """Context manager piping COPY data to psycopg2 from a reader thread.

    ``__enter__`` starts a thread running ``copy_expert`` on the read end
    of an OS pipe and yields the write end.  Closing the write end signals
    EOF so the server-side copy can finish; ``RaisingThread.join`` re-raises
    any error from the copy thread.

    Fix: the read end of the pipe is now closed after the thread joins —
    previously its file descriptor leaked on every copy.
    """

    def __init__(self, conn, sql):
        self.conn = conn  # open psycopg2 connection
        self.sql = sql  # COPY ... FROM STDIN WITH BINARY statement
        # The copy thread reads rstream while the caller writes wstream.
        r_fd, w_fd = os.pipe()
        self.rstream = os.fdopen(r_fd, "rb")
        self.wstream = os.fdopen(w_fd, "wb")

    def __enter__(self):
        self.copy_thread = RaisingThread(target=self.copystream)
        self.copy_thread.start()
        return self.wstream

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # Closing the write end delivers EOF to the reader thread.
            self.wstream.close()
            self.copy_thread.join()
        finally:
            # The thread is done after join(); release the read end too.
            self.rstream.close()

    def copystream(self):
        """Run the COPY command, reading data from the pipe (thread target)."""
        with self.conn.cursor() as cur:
            cur.copy_expert(self.sql, self.rstream)


class Psycopg3Backend:
    """Backend adaptor wrapping a psycopg (3.x) connection."""

    def __init__(self, conn):
        # Import lazily so pgcopy has no hard dependency on psycopg.
        self.conn = conn
        self.adaptor = importlib.import_module("psycopg")

    def get_encoding(self):
        """Return the connection's client encoding."""
        return self.conn.info.encoding

    def namedtuple_cursor(self):
        """Return a cursor producing named-tuple rows."""
        return self.conn.cursor(row_factory=self.adaptor.rows.namedtuple_row)

    @contextlib.contextmanager
    def copy(self, schema, table, columns, _):
        """Yield psycopg's writable copy object (no intermediate buffer needed)."""
        with self.conn.cursor() as cur, cur.copy(copy_sql(schema, table, columns)) as copy:
            yield copy

    @contextlib.contextmanager
    def threading_copy(self, schema, table, columns):
        """Same as :meth:`copy`; psycopg 3 streams directly, no helper thread."""
        with self.conn.cursor() as cur, cur.copy(copy_sql(schema, table, columns)) as copy:
            yield copy


class PyGreSQLBackend:
    """Backend adaptor wrapping a PyGreSQL (pgdb) connection."""

    def __init__(self, conn):
        self.conn = conn  # open pgdb connection

    def get_encoding(self):
        """Return the normalized Python codec name of the client encoding."""
        with self.conn.cursor() as cur:
            cur.execute("SHOW client_encoding")
            row = cur.fetchone()
        # codecs.lookup normalizes names, e.g. "UTF8" -> "utf-8"
        return codecs.lookup(row.client_encoding).name

    def namedtuple_cursor(self):
        # pgdb rows already support attribute access (see get_encoding),
        # so the plain cursor suffices.
        return self.conn.cursor()

    def copy(self, schema, table, columns, fobject_factory):
        """Return a buffered binary COPY context manager."""
        return PyGreSQLCopy(self.conn, schema, table, columns, fobject_factory)

    def threading_copy(self, schema, table, columns):
        """Return a streaming binary COPY context manager.

        Fix: wires up the previously-unused PyGreSQLThreadingCopy so this
        backend exposes the same interface as the other backends.
        """
        return PyGreSQLThreadingCopy(self.conn, schema, table, columns)


class PyGreSQLCopy:
    """Context manager for a buffered PyGreSQL binary COPY.

    ``__enter__`` yields a writable file object; on clean exit the buffered
    data is rewound and sent to the server with ``copy_from``.

    Fix: the buffer is now closed on the error path too — previously it
    leaked (memory buffer or temp file) when the ``with`` body raised.
    """

    def __init__(self, conn, schema, table, columns, fobject_factory):
        self.conn = conn  # open pgdb connection
        self.table = f"{schema}.{table}"  # qualified target table
        self.columns = columns
        self.datastream = fobject_factory()  # e.g. io.BytesIO or a temp file

    def __enter__(self):
        return self.datastream

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                self.datastream.seek(0)
                self.copystream()
        finally:
            # Always release the buffer, even if the body or the copy raised.
            self.datastream.close()

    def copystream(self):
        """Send the buffered data to the server."""
        with self.conn.cursor() as cur:
            cur.copy_from(
                self.datastream,
                self.table,
                format="binary",
                columns=self.columns,
            )


class PyGreSQLThreadingCopy:
    """Context manager piping COPY data to PyGreSQL from a reader thread.

    ``__enter__`` starts a thread running ``copy_from`` on the read end
    of an OS pipe and yields the write end.  Closing the write end signals
    EOF so the server-side copy can finish; ``RaisingThread.join`` re-raises
    any error from the copy thread.

    Fix: the read end of the pipe is now closed after the thread joins —
    previously its file descriptor leaked on every copy.
    """

    def __init__(self, conn, schema, table, columns):
        self.conn = conn  # open pgdb connection
        self.table = f"{schema}.{table}"  # qualified target table
        self.columns = columns
        # The copy thread reads rstream while the caller writes wstream.
        r_fd, w_fd = os.pipe()
        self.rstream = os.fdopen(r_fd, "rb")
        self.wstream = os.fdopen(w_fd, "wb")

    def __enter__(self):
        self.copy_thread = RaisingThread(target=self.copystream)
        self.copy_thread.start()
        return self.wstream

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # Closing the write end delivers EOF to the reader thread.
            self.wstream.close()
            self.copy_thread.join()
        finally:
            # The thread is done after join(); release the read end too.
            self.rstream.close()

    def copystream(self):
        """Run the COPY command, reading data from the pipe (thread target)."""
        with self.conn.cursor() as cur:
            cur.copy_from(
                self.rstream,
                self.table,
                format="binary",
                columns=self.columns,
            )


class Pg8000Backend:
    """Backend adaptor wrapping a pg8000 connection."""

    # Lazily-built cursor subclass, shared by all instances.  It is created
    # on first use because the concrete cursor class is only reachable from
    # a live connection.
    NamedTupleCursor = None

    def __init__(self, conn):
        # conn: an open pg8000 DB-API connection
        self.conn = conn

    def get_encoding(self):
        """Return the normalized Python codec name of the client encoding."""
        with contextlib.closing(self.namedtuple_cursor()) as cur:
            cur.execute("SHOW client_encoding")
            row = cur.fetchone()
            # codecs.lookup normalizes names, e.g. "UTF8" -> "utf-8"
            return codecs.lookup(row.client_encoding).name

    def namedtuple_cursor(self):
        """Return a cursor whose rows are named tuples.

        pg8000 has no built-in namedtuple row support, so a cursor subclass
        is synthesized from the connection's own cursor class and cached on
        the backend class.  NOTE(review): relies on pg8000 cursor internals
        (``_context`` and its ``columns``) — verify when upgrading pg8000.
        """
        if not Pg8000Backend.NamedTupleCursor:
            # Discover the concrete cursor class via a throwaway cursor.
            cur = self.conn.cursor()
            Cursor = cur.__class__
            cur.close()

            class NamedTupleCursor(Cursor):
                def __next__(self):
                    val = super().__next__()
                    context = self._context
                    if context is None:
                        return val  # raised an error already
                    # Cache the row class on the query context so it is
                    # built only once per result set.
                    rowclass = getattr(context, "_pgcopy_row_class", None)
                    if not rowclass:
                        columns = context.columns
                        if columns is None or len(columns) == 0:
                            return val  # probably also raised an error
                        column_names = [col["name"] for col in columns]
                        rowclass = collections.namedtuple("Row", column_names)
                        context._pgcopy_row_class = rowclass
                    return rowclass(*val)

            Pg8000Backend.NamedTupleCursor = NamedTupleCursor
        return Pg8000Backend.NamedTupleCursor(self.conn)

    def copy(self, schema, table, columns, fobject_factory):
        """Return a buffered binary COPY context manager."""
        sql = copy_sql(schema, table, columns)
        return Pg8000Copy(self.conn, sql, fobject_factory)


class Pg8000Copy:
    """Context manager for a buffered pg8000 binary COPY.

    ``__enter__`` yields a writable file object; on clean exit the buffered
    data is rewound and streamed to the server via pg8000's ``stream``
    execute parameter.

    Fixes: the buffer is now closed on the error path too (previously it
    leaked when the ``with`` body raised), and the cursor is closed even
    when ``execute`` fails.
    """

    def __init__(self, conn, sql, fobject_factory):
        self.conn = conn  # open pg8000 connection
        self.sql = sql  # COPY ... FROM STDIN WITH BINARY statement
        self.datastream = fobject_factory()  # e.g. io.BytesIO or a temp file

    def __enter__(self):
        return self.datastream

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                self.datastream.seek(0)
                self.copystream()
        finally:
            # Always release the buffer, even if the body or the copy raised.
            self.datastream.close()

    def copystream(self):
        """Send the buffered data to the server."""
        # closing() guarantees the cursor is released even if execute raises.
        with contextlib.closing(self.conn.cursor()) as cur:
            cur.execute(self.sql, stream=self.datastream)
Loading
Loading