Commit 56c7a61

29202 - Add affiliation/account ID & refactor notebook (bcgov#3629)
* Update .env sample configuration
* Add affiliation info and optimize notebook with batch query function
* Move ENGINE_NAMES to engine creation cell for clarity
1 parent a65df93 commit 56c7a61

2 files changed: 141 additions & 44 deletions

data-tool/notebooks/corps_onboarding_process_flow/.env.sample
Lines changed: 7 additions & 0 deletions

@@ -12,6 +12,13 @@ DATABASE_LEAR_HOST=
 DATABASE_LEAR_PORT=
 DATABASE_LEAR_NAME=
 
+# --- Auth Database ---
+DATABASE_AUTH_USERNAME=
+DATABASE_AUTH_PASSWORD=
+DATABASE_AUTH_HOST=
+DATABASE_AUTH_PORT=
+DATABASE_AUTH_NAME=
+
 # --- EXCEL EXPORT ---
 EXPORT_OUTPUT_DIR=
 
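The notebook reads these new DATABASE_AUTH_* values into its engine configuration (see the 'auth' block added to the connection dict in the notebook diff below). As a rough standalone sketch of that wiring, assuming PostgreSQL via SQLAlchemy and python-dotenv; engine_from_env is a hypothetical helper, not code from this commit:

import os
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()  # read a .env file populated from this sample

def engine_from_env(prefix: str):
    # Assemble a PostgreSQL URL from the DATABASE_<PREFIX>_* variables.
    user = os.getenv(f"DATABASE_{prefix}_USERNAME")
    password = os.getenv(f"DATABASE_{prefix}_PASSWORD")
    host = os.getenv(f"DATABASE_{prefix}_HOST")
    port = os.getenv(f"DATABASE_{prefix}_PORT")
    name = os.getenv(f"DATABASE_{prefix}_NAME")
    return create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{name}")

auth_engine = engine_from_env("AUTH")  # the Auth database added in this commit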
data-tool/notebooks/corps_onboarding_process_flow/migration_status_tracking.ipynb
Lines changed: 134 additions & 44 deletions
@@ -4,14 +4,15 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"# Migration Status Spreadsheet Notebook (Part 1)\n",
+"# Migration Status Spreadsheet Notebook\n",
 "\n",
 "## Overview\n",
 "This notebook generates the data for the migration tracking spreadsheet.\n",
 "\n",
 "## What it does\n",
 "- Extracts migration data from COLIN Extract database\n",
-"- Retrieves filing information from LEAR database \n",
+"- Retrieves filing information from LEAR database\n",
+"- Retrieves affiliation information from Auth database\n",
 "- Merges and exports data to Excel format\n",
 "\n",
 "## Output\n",
@@ -31,6 +32,15 @@
 "%pip install openpyxl"
 ]
 },
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"## Import Libraries and Load Configuration\n",
+"\n",
+"Import required libraries and load environment variables. "
+]
+},
 {
 "cell_type": "code",
 "execution_count": null,
@@ -55,6 +65,8 @@
 " \"corp_type\": \"Type\",\n",
 " \"status\": \"Migration Status\",\n",
 " \"date\": \"Migrated Date\",\n",
+" \"affiliated\": \"Affiliated\",\n",
+" \"account\": \"Account ID\",\n",
 " \"filings\": \"Filings Done\",\n",
 " \"filing_date\": \"Last Filing Date\"\n",
 "}\n",
@@ -70,6 +82,8 @@
 " COLUMN_NAMES[\"corp_type\"],\n",
 " COLUMN_NAMES[\"status\"],\n",
 " COLUMN_NAMES[\"date\"],\n",
+" COLUMN_NAMES[\"affiliated\"],\n",
+" COLUMN_NAMES[\"account\"],\n",
 " COLUMN_NAMES[\"filings\"],\n",
 " COLUMN_NAMES[\"filing_date\"]\n",
 " ],\n",
@@ -83,7 +97,6 @@
 "# Configuration\n",
 "BATCH_SIZE = CONFIG['batch_size']\n",
 "FINAL_EXCEL_FIELDS = CONFIG['final_excel_fields']\n",
-"MIG_GROUP_IDS = os.getenv('MIG_GROUP_IDS')\n",
 "MIG_GROUP_IDS = [int(x.strip()) for x in os.getenv('MIG_GROUP_IDS').split(',') if x.strip().isdigit()]\n",
 "\n",
 "if not MIG_GROUP_IDS:\n",
@@ -100,7 +113,7 @@
 "source": [
 "## Database Setup\n",
 "\n",
-"Configure database connections for COLIN Extract and LEAR databases using environment variables."
+"Configure database connections using environment variables."
 ]
 },
 {
@@ -123,6 +136,13 @@
 " 'host': os.getenv(\"DATABASE_LEAR_HOST\"),\n",
 " 'port': os.getenv(\"DATABASE_LEAR_PORT\"),\n",
 " 'name': os.getenv(\"DATABASE_LEAR_NAME\")\n",
+" },\n",
+" 'auth': {\n",
+" 'username': os.getenv(\"DATABASE_AUTH_USERNAME\"),\n",
+" 'password': os.getenv(\"DATABASE_AUTH_PASSWORD\"),\n",
+" 'host': os.getenv(\"DATABASE_AUTH_HOST\"),\n",
+" 'port': os.getenv(\"DATABASE_AUTH_PORT\"),\n",
+" 'name': os.getenv(\"DATABASE_AUTH_NAME\")\n",
 " }\n",
 "}\n",
 "\n",
@@ -173,8 +193,7 @@
 " print(f\"{db_key.upper()} unexpected error: {e}\")\n",
 " raise\n",
 "\n",
-"colin_engine = engines['colin_extract']\n",
-"lear_engine = engines['lear']\n",
+"ENGINE_NAMES = {engine: key for key, engine in engines.items()}\n",
 "\n",
 "print(\"All database engines ready for use.\")\n"
 ]
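ENGINE_NAMES inverts the engines dict so that code holding only an engine object (such as the batch_query function added later in this commit) can still label its log output with the database key. A minimal sketch, with placeholder objects standing in for real engines:

# Placeholders instead of SQLAlchemy engines, just to show the mapping.
engines = {"colin_extract": object(), "lear": object(), "auth": object()}

# Reverse lookup: engine object -> its configuration key.
ENGINE_NAMES = {engine: key for key, engine in engines.items()}

print(ENGINE_NAMES.get(engines["lear"], "Unknown database"))  # -> lear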
@@ -201,7 +220,7 @@
 " mcb.corp_num AS \"{COLUMN_NAMES['corp_num']}\",\n",
 " c.admin_email AS \"{COLUMN_NAMES['email']}\",\n",
 " cn.corp_name AS \"{COLUMN_NAMES['corp_name']}\",\n",
-" cp.corp_type_cd AS \"{COLUMN_NAMES['corp_type']}\",\n",
+" c.corp_type_cd AS \"{COLUMN_NAMES['corp_type']}\",\n",
 " CASE\n",
 " WHEN cp.processed_status = 'COMPLETED' THEN 'Migrated'\n",
 " WHEN cp.processed_status IS NULL THEN 'Pending'\n",
@@ -228,17 +247,18 @@
 " OR cp.processed_status IS NULL\n",
 " )\n",
 "ORDER BY\n",
-" g.name, \n",
-" b.name,\n",
+" g.display_name, \n",
+" b.display_name,\n",
 " CASE\n",
 " WHEN cp.processed_status = 'COMPLETED' THEN 0\n",
 " ELSE 1\n",
 " END, \n",
-" cp.create_date DESC;\n",
+" cp.create_date DESC,\n",
+" cn.corp_name;\n",
 "\"\"\"\n",
 " \n",
 "try:\n",
-" with colin_engine.connect() as conn:\n",
+" with engines['colin_extract'].connect() as conn:\n",
 " colin_extract_df = pd.read_sql(colin_extract_query, conn)\n",
 "\n",
 " if colin_extract_df.empty:\n",
@@ -255,6 +275,55 @@
 " display(colin_extract_df)\n"
 ]
 },
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"## Batch Query Function\n",
+"A function to perform batch queries across multiple databases."
+]
+},
+{
+"cell_type": "code",
+"execution_count": 15,
+"metadata": {},
+"outputs": [],
+"source": [
+"def batch_query(query_sql, db_engine, batch_size, columns):\n",
+" # Get unique corporation numbers from the dataset\n",
+" unique_corp_nums = colin_extract_df[COLUMN_NAMES['corp_num']].unique().tolist()\n",
+" corp_number_batches = [unique_corp_nums[i:i + batch_size] for i in range(0, len(unique_corp_nums), batch_size)]\n",
+" db_name = ENGINE_NAMES.get(db_engine, \"Unknown database\")\n",
+" batch_results = []\n",
+" \n",
+" # Process each batch of corporation numbers\n",
+" for batch_idx, current_batch_corp_numbers in enumerate(corp_number_batches):\n",
+" if not current_batch_corp_numbers:\n",
+" continue\n",
+" try:\n",
+" with db_engine.connect() as conn:\n",
+" df = pd.read_sql(query_sql, conn, params={'identifiers': current_batch_corp_numbers})\n",
+" \n",
+" # Store results from this batch\n",
+" batch_results.append(df)\n",
+" print(f\"{db_name} Batch {batch_idx+1}: {len(df)} records fetched\")\n",
+" \n",
+" except Exception as e:\n",
+" print(f\"{db_name} Batch {batch_idx+1}/{len(corp_number_batches)} failed: {e}\")\n",
+" continue\n",
+" \n",
+" # Process combined results\n",
+" if batch_results:\n",
+" combined_df = pd.concat(batch_results, ignore_index=True)\n",
+" combined_df = combined_df.drop_duplicates(COLUMN_NAMES['corp_num'], keep='last')\n",
+" print(f\"Total records fetched: {len(combined_df)}\")\n",
+" else:\n",
+" combined_df = pd.DataFrame(columns=columns)\n",
+" print(f\"No records fetched\")\n",
+" \n",
+" return combined_df"
+]
+},
 {
 "cell_type": "markdown",
 "metadata": {},
@@ -273,7 +342,7 @@
 "lear_combined_query = f\"\"\"\n",
 "SELECT \n",
 " b.id,\n",
-" b.identifier,\n",
+" b.identifier AS \"{COLUMN_NAMES['corp_num']}\",\n",
 " COALESCE(\n",
 " STRING_AGG(f.filing_type, ', ' ORDER BY f.filing_type), \n",
 " ''\n",
@@ -287,39 +356,56 @@
 "GROUP BY b.id, b.identifier;\n",
 "\"\"\"\n",
 "\n",
-"corp_nums = colin_extract_df[COLUMN_NAMES['corp_num']].unique().tolist()\n",
-"batches_identifiers = [corp_nums[i:i + BATCH_SIZE] for i in range(0, len(corp_nums), BATCH_SIZE)]\n",
+"lear_combined_df = batch_query(\n",
+" query_sql=lear_combined_query,\n",
+" db_engine=engines['lear'],\n",
+" batch_size=BATCH_SIZE,\n",
+" columns=['id', COLUMN_NAMES['corp_num'], COLUMN_NAMES[\"filings\"], COLUMN_NAMES[\"filing_date\"]]\n",
+")\n",
 "\n",
-"# Execute combined query with batch processing\n",
-"lear_combined_results = []\n",
-"for idx, batch_identifiers in enumerate(batches_identifiers):\n",
-" if not batch_identifiers:\n",
-" continue\n",
-" try:\n",
-" with lear_engine.connect() as conn:\n",
-" df = pd.read_sql(\n",
-" lear_combined_query,\n",
-" conn,\n",
-" params={\"identifiers\": batch_identifiers}\n",
-" )\n",
-" \n",
-" lear_combined_results.append(df)\n",
-" print(f\"Batch {idx+1}: {len(df)} records fetched\")\n",
-" except Exception as e:\n",
-" print(f\"Batch {idx+1}/{len(batches_identifiers)} failed: {e}\")\n",
-" continue\n",
+"# Display results\n",
+"with pd.option_context('display.max_rows', None):\n",
+" display(lear_combined_df)\n"
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"## Get Affiliation Data\n",
+"\n",
+"Query the Auth database to get affiliation information, including whether corporations are affiliated and their account IDs."
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"auth_query = f\"\"\"\n",
+"SELECT\n",
+" e.business_identifier AS \"{COLUMN_NAMES['corp_num']}\",\n",
+" CASE WHEN a.id IS NOT NULL THEN 'Y' ELSE 'N' END AS \"{COLUMN_NAMES['affiliated']}\",\n",
+" a.org_id AS \"{COLUMN_NAMES['account']}\"\n",
+"FROM\n",
+" entities e\n",
+"LEFT JOIN\n",
+" affiliations a ON e.id = a.entity_id\n",
+"WHERE\n",
+" e.business_identifier = ANY(%(identifiers)s)\n",
+"\"\"\"\n",
 "\n",
-"# Process combied results\n",
-"if lear_combined_results:\n",
-" lear_combined_df = pd.concat(lear_combined_results, ignore_index=True)\n",
-" lear_combined_df = lear_combined_df.drop_duplicates('identifier', keep='last')\n",
-" print(f\"Total combined records fetched: {len(lear_combined_df)}\")\n",
-"else:\n",
-" lear_combined_df = pd.DataFrame(columns=['id', 'identifier', 'Filings Done', 'Last Filing Date'])\n",
+"auth_combined_df = batch_query(\n",
+" query_sql=auth_query,\n",
+" db_engine=engines['auth'],\n",
+" batch_size=BATCH_SIZE,\n",
+" columns=[COLUMN_NAMES['corp_num'], COLUMN_NAMES['affiliated'], COLUMN_NAMES['account']]\n",
+")\n",
 "\n",
 "# Display results\n",
 "with pd.option_context('display.max_rows', None):\n",
-" display(lear_combined_df)\n"
+" display(auth_combined_df)"
 ]
 },
 {
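The new auth_query flags a corporation as affiliated whenever its entities row has at least one match in affiliations; the LEFT JOIN keeps unaffiliated corporations with a NULL a.id, which the CASE turns into 'N'. The same pattern sketched with pandas on invented toy data:

import pandas as pd

entities = pd.DataFrame({"id": [1, 2], "business_identifier": ["BC0000001", "BC0000002"]})
affiliations = pd.DataFrame({"id": [10], "entity_id": [1], "org_id": [3040]})

# LEFT JOIN entities -> affiliations, then derive the Y/N flag from the
# null-ness of the joined affiliation id, mirroring the SQL CASE WHEN.
joined = entities.merge(affiliations, left_on="id", right_on="entity_id",
                        how="left", suffixes=("", "_aff"))
joined["Affiliated"] = joined["id_aff"].notna().map({True: "Y", False: "N"})
print(joined[["business_identifier", "Affiliated", "org_id"]])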
@@ -328,7 +414,7 @@
 "source": [
 "## Merge Data\n",
 "\n",
-"Merge COLIN Extract migration data with LEAR filing data into a merged dataset."
+"Combine data from COLIN Extract, LEAR, and Auth databases into a merged dataset."
 ]
 },
 {
@@ -340,12 +426,16 @@
 "try:\n",
 " result = (colin_extract_df\n",
 " .merge(lear_combined_df, \n",
-" left_on=COLUMN_NAMES['corp_num'], \n",
-" right_on='identifier', \n",
-" how='left'))\n",
+" on=COLUMN_NAMES['corp_num'], \n",
+" how='left')\n",
+" .merge(auth_combined_df,\n",
+" on=COLUMN_NAMES['corp_num'],\n",
+" how='left') \n",
+" )\n",
 " \n",
 " # Select final fields\n",
 " merged_df = result[FINAL_EXCEL_FIELDS]\n",
+" \n",
 " print(f\"Data merged successfully: {len(merged_df)} rows\")\n",
 " \n",
 "except Exception as e:\n",
