Commit a193538

Add Type Map and Fix Tests
1 parent b8b568e commit a193538

5 files changed: +64 additions, −45 deletions

config.yaml

Lines changed: 1 addition & 1 deletion

@@ -36,7 +36,7 @@ jobs:
       query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
       destination:
         ref: Dune
-        table_name: dune_sync_test_table
+        table_name: bh2smith.dune_sync_test
 
   - name: cow-solvers
     source:
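Note on this change: the Dune destination's table_name is now namespace-qualified ("namespace.table"), so the config points at bh2smith.dune_sync_test rather than a bare table name. The parsing of the qualified name is handled by the new _get_namespace_and_table_name helper in the next file.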

src/destinations/dune.py

Lines changed: 13 additions & 7 deletions

@@ -1,5 +1,7 @@
 """Destination logic for Dune Analytics."""
 
+import io
+
 from dune_client.client import DuneClient
 from dune_client.models import DuneError
 
@@ -59,30 +61,34 @@ def save(self, data: TypedDataFrame) -> int:
 
         """
         try:
-            # TODO: Determine user name from DuneAPI key?
-            namespace = "username"
-            table_name = self.table_name
             log.debug("Uploading DF to Dune...")
+            namespace, table_name = self._get_namespace_and_table_name()
             # TODO check first if table exists? Or warn if it did...
             self.client.create_table(
                 namespace,
                 table_name,
                 schema=[
-                    {"name": name, "type": dtype, "nullable": "true"}
-                    for name, dtype in data.types.items()
+                    {"name": name, "type": dtype} for name, dtype in data.types.items()
                 ],
             )
             result = self.client.insert_table(
                 namespace,
                 table_name,
-                # TODO - bytes -> IO[bytes]
-                data=data.dataframe.to_csv(index=False),  # type: ignore
+                data=io.BytesIO(data.dataframe.to_csv(index=False).encode()),
                 content_type="text/csv",
             )
             if not result:
                 raise RuntimeError("Dune Upload Failed")
+            log.debug("Inserted DF to Dune, %s", result)
         except DuneError as dune_e:
             log.error("Dune did not accept our upload: %s", dune_e)
         except (ValueError, RuntimeError) as e:
             log.error("Data processing error: %s", e)
         return len(data)
+
+    def _get_namespace_and_table_name(self) -> tuple[str, str]:
+        """Split the namespace, table name from the provided table name."""
+        if "." not in self.table_name:
+            raise ValueError("Table name must be in the format namespace.table_name")
+        namespace, table_name = self.table_name.split(".")
+        return namespace, table_name
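For intuition, here is a minimal standalone sketch of the two behavioral changes in save (illustrative values only, not part of the commit): the namespace/table split, and wrapping the CSV text in io.BytesIO so insert_table receives a binary file-like object rather than a str.

import io

def split_table_name(table_name: str) -> tuple[str, str]:
    # Mirrors _get_namespace_and_table_name: "namespace.table" -> ("namespace", "table").
    # Note: a name with two or more dots also raises ValueError, via tuple unpacking.
    if "." not in table_name:
        raise ValueError("Table name must be in the format namespace.table_name")
    namespace, table = table_name.split(".")
    return namespace, table

assert split_table_name("bh2smith.dune_sync_test") == ("bh2smith", "dune_sync_test")

# CSV string -> in-memory binary stream (the shape the insert call now sends):
csv_text = "number,my_bytes\n1,0x1234\n"  # stand-in for data.dataframe.to_csv(index=False)
payload = io.BytesIO(csv_text.encode())
assert payload.read(6) == b"number"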

src/sources/postgres.py

Lines changed: 8 additions & 1 deletion

@@ -11,6 +11,7 @@
 
 from src.interfaces import Source, TypedDataFrame
 from src.logger import log
+from src.sources.type_maps import PG_TO_DUNE
 
 
 def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
@@ -118,11 +119,17 @@ async def fetch(self) -> TypedDataFrame:
         # of SQLAlchemy's synchronous interface.
         # The current solution using run_in_executor is a workaround
         # that moves the blocking operation to a thread pool.
+        # First get the column types
+        with self.engine.connect() as conn:
+            result = conn.execute(text(self.query_string))
+            types = {
+                col.name: PG_TO_DUNE[col.type_code] for col in result.cursor.description
+            }
         df = await loop.run_in_executor(
             None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
         )
         # TODO - extract types and return TypedDataFrame.
-        return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={})
+        return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types=types)
 
     def is_empty(self, data: TypedDataFrame) -> bool:
         """Check if the provided TypedDataFrame is empty.

src/sources/type_maps.py

Lines changed: 15 additions & 10 deletions

@@ -33,14 +33,19 @@
 # 1. Notice `DOUBLE_PRECISION` has two pre-images: we chose double
 # 2. timestamp with time zone not aligned with timestamp
 # 3. Apparently no JSONB support here.
-PG_TO_DUNE: dict[type[Any] | NUMERIC, str] = {
-    BIGINT: "bigint",
-    INTEGER: "integer",
-    BYTEA: "varbinary",
-    DATE: "date",
-    BOOLEAN: "boolean",
-    VARCHAR: "varchar",
-    DOUBLE_PRECISION: "double",
-    TIMESTAMP: "timestamp",  # This doesn't match with above
-    NUMERIC: "uint256",
+PG_TO_DUNE: dict[int, str] = {
+    16: "boolean",
+    17: "varbinary",
+    20: "bigint",
+    21: "bigint",  # smallint
+    23: "integer",
+    25: "varchar",
+    # 26: "oid",
+    700: "double",
+    701: "double",
+    1042: "varchar",
+    1043: "varchar",
+    1082: "timestamp",
+    1114: "timestamp",  # This doesn't match with above
+    1700: "uint256",
 }
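The integer keys are PostgreSQL type OIDs from the pg_type catalog: 16 = bool, 17 = bytea, 20 = int8 (bigint), 21 = int2 (smallint), 23 = int4 (integer), 25 = text, 700 = float4, 701 = float8, 1042 = bpchar, 1043 = varchar, 1082 = date, 1114 = timestamp, 1700 = numeric. One way to double-check the pairs, again assuming psycopg2 and a hypothetical DSN:

import psycopg2

conn = psycopg2.connect("postgresql://user:password@localhost:5432/postgres")
with conn.cursor() as cur:
    # psycopg2 adapts a Python list to a Postgres array for ANY(%s).
    cur.execute(
        "SELECT oid, typname FROM pg_type WHERE oid = ANY(%s) ORDER BY oid",
        ([16, 17, 20, 21, 23, 25, 700, 701, 1042, 1043, 1082, 1114, 1700],),
    )
    for oid, typname in cur.fetchall():
        print(oid, typname)
conn.close()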

tests/unit/destinations_test.py

Lines changed: 27 additions & 26 deletions

@@ -1,6 +1,6 @@
 import os
 import unittest
-from logging import ERROR, WARNING
+from logging import DEBUG, ERROR, WARNING
 from unittest.mock import patch
 
 import pandas as pd
@@ -26,12 +26,10 @@ def setUpClass(cls):
         )
         cls.env_patcher.start()
 
-    @patch("pandas.core.generic.NDFrame.to_csv", name="Fake csv writer")
     @patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
     @patch("dune_client.api.table.TableAPI.insert_table", name="Fake Table Inserter")
-    @patch("requests.sessions.Session.post")
     def test_ensure_index_disabled_when_uploading(
-        self, mock_to_csv, mock_create_table, mock_insert_table, *_
+        self, mock_create_table, mock_insert_table, *_
     ):
         mock_create_table.return_value = {
             "namespace": "my_user",
@@ -41,24 +39,28 @@ def test_ensure_index_disabled_when_uploading(
             "already_existed": False,
             "message": "Table created successfully",
         }
+
         mock_insert_table.return_value = {"rows_written": 9000, "bytes_written": 90}
 
         dummy_df = TypedDataFrame(
             dataframe=pd.DataFrame(
                 [
-                    {"foo": "bar"},
-                    {"baz": "daz"},
+                    {"foo": "bar", "baz": "one"},
+                    {"foo": "two", "baz": "two"},
                 ]
             ),
             types={"foo": "varchar", "baz": "varchar"},
         )
         destination = DuneDestination(
             api_key=os.getenv("DUNE_API_KEY"),
-            table_name="foo",
+            table_name="foo.bar",
             request_timeout=10,
         )
-        destination.save(dummy_df)
-        mock_to_csv.assert_called_once_with(index=False)
+        with self.assertLogs(level=DEBUG) as logs:
+            destination.save(dummy_df)
+
+        self.assertIn("Uploading DF to Dune", logs.output[0])
+        self.assertIn("Inserted DF to Dune,", logs.output[1])
 
     @patch("pandas.core.generic.NDFrame.to_csv", name="Fake csv writer")
     def test_duneclient_sets_timeout(self, mock_to_csv, *_):
@@ -70,13 +72,12 @@ def test_duneclient_sets_timeout(self, mock_to_csv, *_):
         )
         assert destination.client.request_timeout == timeout
 
-    @patch("dune_client.api.table.TableAPI.upload_csv", name="Fake CSV uploader")
     @patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
     @patch("dune_client.api.table.TableAPI.insert_table", name="Fake Table Inserter")
-    def test_dune_error_handling(
-        self, mock_dune_upload_csv, mock_create_table, mock_insert_table
-    ):
-        dest = DuneDestination(api_key="f00b4r", table_name="foo", request_timeout=10)
+    def test_dune_error_handling(self, mock_create_table, mock_insert_table):
+        dest = DuneDestination(
+            api_key="f00b4r", table_name="foo.bar", request_timeout=10
+        )
         df = pd.DataFrame([{"foo": "bar"}])
 
         mock_create_table.return_value = {
@@ -96,13 +97,13 @@ def test_dune_error_handling(
         val_err = ValueError("Oops")
         runtime_err = RuntimeError("Big Oops")
 
-        mock_dune_upload_csv.side_effect = dune_err
+        mock_create_table.side_effect = dune_err
 
         data = TypedDataFrame(df, {})
         with self.assertLogs(level=ERROR) as logs:
             dest.save(data)
 
-        mock_dune_upload_csv.assert_called_once()
+        mock_create_table.assert_called_once()
 
         # does this shit really look better just because it's < 88 characters long?
         exmsg = (
@@ -111,36 +112,36 @@ def test_dune_error_handling(
         )
         self.assertIn(exmsg, logs.output[0])
 
-        mock_dune_upload_csv.reset_mock()
-        mock_dune_upload_csv.side_effect = val_err
+        mock_create_table.reset_mock()
+        mock_create_table.side_effect = val_err
 
         with self.assertLogs(level=ERROR) as logs:
             dest.save(data)
 
-        mock_dune_upload_csv.assert_called_once()
+        mock_create_table.assert_called_once()
         expected_message = "Data processing error: Oops"
         self.assertIn(expected_message, logs.output[0])
 
-        mock_dune_upload_csv.reset_mock()
-        mock_dune_upload_csv.side_effect = runtime_err
+        mock_create_table.reset_mock()
+        mock_create_table.side_effect = runtime_err
         with self.assertLogs(level=ERROR) as logs:
             dest.save(data)
 
-        mock_dune_upload_csv.assert_called_once()
+        mock_create_table.assert_called_once()
         expected_message = "Data processing error: Big Oops"
         self.assertIn(expected_message, logs.output[0])
 
-        mock_dune_upload_csv.reset_mock()
+        mock_create_table.reset_mock()
 
         # TIL: reset_mock() doesn't clear side effects....
-        mock_dune_upload_csv.side_effect = None
+        mock_create_table.side_effect = None
 
-        mock_dune_upload_csv.return_value = None
+        mock_create_table.return_value = None
 
         with self.assertLogs(level=ERROR) as logs:
             dest.save(data)
 
-        mock_dune_upload_csv.assert_called_once()
+        mock_create_table.assert_called_once()
         self.assertIn("Dune Upload Failed", logs.output[0])
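The rewritten assertions lean on unittest's assertLogs: it captures records at the given level and above, and logs.output holds them as "LEVEL:logger:message" strings in emission order, which is why substring checks against logs.output[0] and logs.output[1] work. A self-contained illustration (logger name and messages are illustrative):

import logging
import unittest


class AssertLogsDemo(unittest.TestCase):
    def test_output_format(self):
        with self.assertLogs(level=logging.DEBUG) as logs:
            logging.getLogger("demo").debug("Uploading DF to Dune...")
            logging.getLogger("demo").debug("Inserted DF to Dune, %s", {"rows_written": 9000})
        # Captured entries are formatted "LEVEL:logger:message".
        self.assertIn("Uploading DF to Dune", logs.output[0])
        self.assertIn("Inserted DF to Dune,", logs.output[1])


if __name__ == "__main__":
    unittest.main()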
