Update mimic_omop template #19
Conversation
Pull request overview
Updates the MIMIC_OMOP flow template, primarily to remove redundant graph edges in the MIMIC→OMOP pipeline.
Changes:
- Simplifies the flow graph by reducing edges to a linear chain between key nodes.
- Re-exports/rewrites large parts of the template JSON (new node IDs/positions and some node renames).
- Introduces/relocates the DuckDB→database export logic into the duckdb_database node.
| "python_code": "def exec(myinput):\n # time.sleep(600)\n logger = get_run_logger()\n if flow_action_type in ['mimic_to_database']:\n logger.info(\"<--------- Cleaning up DuckDB file --------->\")\n # Get options\n os.remove(duckdb_file_name)\n logger.info(f\"File '{duckdb_file_name}' deleted successfully.\")\n logger.info(\"<--------- Workflow complete --------->\")\n" | ||
| "name": "duckdb_database", | ||
| "description": "Describe the task of node python_node_5", | ||
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" |
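For context on the Postgres branch of this node: DuckDB's postgres extension can ATTACH a live Postgres database so tables are copied with plain INSERT ... SELECT. A minimal sketch of that pattern follows, assuming the postgres extension is available and using hypothetical file, schema, and credential values:

```python
import duckdb

# Hypothetical DuckDB file whose CDM tables live in a 'cdm' schema.
with duckdb.connect("cdm.duckdb") as conn:
    # Attach Postgres under the alias 'pg', scoped to one schema (hypothetical credentials).
    conn.execute(
        "ATTACH 'host=localhost port=5432 dbname=omop user=admin password=secret' "
        "AS pg (TYPE POSTGRES, SCHEMA cdm)"
    )
    # Assumes the target tables were already created on the Postgres side (the node
    # does this via create_table); duckdb_tables() then lists them under 'pg'.
    tables = conn.execute(
        "SELECT table_name FROM duckdb_tables() WHERE database_name = 'pg'"
    ).fetchall()
    for (table,) in tables:
        # Bulk-copy each local CDM table into its Postgres counterpart.
        conn.execute(f"INSERT INTO pg.cdm.{table} SELECT * FROM cdm.{table}")
    conn.execute("DETACH pg")
```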
Copilot AI · Jan 27, 2026
In the duckdb_database node, database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL always selects PostgresDDL because SupportedDatabaseDialects.POSTGRES is a truthy enum value. This will break HANA exports. Use the actual runtime dialect (e.g., dialect == SupportedDatabaseDialects.POSTGRES) to choose the correct DDL file.
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" | |
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if dialect == SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" |
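To make the failure mode concrete, a minimal self-contained sketch (the Dialect enum and file names below are stand-ins, not the real SupportedDatabaseDialects or DDL paths):

```python
from enum import Enum

class Dialect(str, Enum):  # stand-in for SupportedDatabaseDialects
    POSTGRES = "postgres"
    HANA = "hana"

dialect = Dialect.HANA

# Buggy: an enum member is always truthy, so the Postgres DDL is always chosen.
ddl = "postgres_ddl.sql" if Dialect.POSTGRES else "hana_ddl.sql"
assert ddl == "postgres_ddl.sql"  # wrong file even though dialect is HANA

# Fixed: compare the runtime dialect against the member.
ddl = "postgres_ddl.sql" if dialect == Dialect.POSTGRES else "hana_ddl.sql"
assert ddl == "hana_ddl.sql"
```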
| "python_code": "def exec(myinput):\n # time.sleep(600)\n logger = get_run_logger()\n if flow_action_type in ['mimic_to_database']:\n logger.info(\"<--------- Cleaning up DuckDB file --------->\")\n # Get options\n os.remove(duckdb_file_name)\n logger.info(f\"File '{duckdb_file_name}' deleted successfully.\")\n logger.info(\"<--------- Workflow complete --------->\")\n" | ||
| "name": "duckdb_database", | ||
| "description": "Describe the task of node python_node_5", | ||
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" |
Copilot AI · Jan 27, 2026
The duckdb_database node’s python_code uses np.nan and text(...), but neither numpy nor text are imported in the node code or provided via importLibs (currently only get_run_logger, duckdb, os, glob). This will raise NameError when the HANA branch runs. Add the missing imports (e.g., import numpy as np and the appropriate text import) either to importLibs or inside the node.
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" | |
| "python_code": "import time\nimport duckdb\nimport numpy as np\nfrom sqlalchemy import text\nfrom prefect.logging import get_run_logger\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n 
hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" |
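To see why those imports matter, a minimal sketch of the HANA-branch normalization and binding (table and column names are hypothetical; assumes pandas, numpy, and SQLAlchemy):

```python
import numpy as np
import pandas as pd
from sqlalchemy import text  # without this import, text(insert_stmt) raises NameError

chunk = pd.DataFrame({"PERSON_ID": [1, 2], "VALUE": [0.5, np.nan]})
# NaN is not a valid SQL value for parameter binding; map it to None so it binds as NULL.
chunk = chunk.replace([np.nan], [None])
params = chunk.to_dict("records")  # one dict of bind parameters per row

stmt = text('INSERT INTO my_schema.my_table ("PERSON_ID", "VALUE") VALUES (:PERSON_ID, :VALUE)')
# Later executed as: hana_conn.execute(stmt, params)
```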
| "python_code": "def exec(myinput):\n # time.sleep(600)\n logger = get_run_logger()\n if flow_action_type in ['mimic_to_database']:\n logger.info(\"<--------- Cleaning up DuckDB file --------->\")\n # Get options\n os.remove(duckdb_file_name)\n logger.info(f\"File '{duckdb_file_name}' deleted successfully.\")\n logger.info(\"<--------- Workflow complete --------->\")\n" | ||
| "name": "duckdb_database", | ||
| "description": "Describe the task of node python_node_5", | ||
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" |
Copilot AI · Jan 27, 2026
The duckdb_database node uses eval(use_cache_db) and eval(overwrite_schema) to parse booleans. If these variables are user-configurable, eval enables arbitrary code execution. Replace eval(...) with safe boolean parsing (e.g., strict string comparison, json.loads, or an explicit mapping of allowed values).
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" | |
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n\n\ndef _parse_bool(value):\n if 
isinstance(value, bool):\n return value\n if isinstance(value, str):\n value_normalized = value.strip().lower()\n if value_normalized in (\"true\", \"1\", \"yes\", \"y\", \"on\"):\n return True\n if value_normalized in (\"false\", \"0\", \"no\", \"n\", \"off\"):\n return False\n raise ValueError(f\"Invalid boolean value: {value!r}\")\n\n\ndef exec(myinput):\n logger = get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=_parse_bool(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=_parse_bool(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" |
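An alternative to the helper above, sketched with json.loads on the assumption that the template delivers these flags as the strings "true"/"false":

```python
import json

def parse_bool_strict(value: str) -> bool:
    # Accept only JSON booleans; arbitrary expressions are rejected instead of executed.
    parsed = json.loads(value.strip().lower())
    if not isinstance(parsed, bool):
        raise ValueError(f"Expected a boolean, got {value!r}")
    return parsed

assert parse_bool_strict("True") is True
assert parse_bool_strict("false") is False
```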
| "id": "e14c18c7-2e0c-4329-9a38-88c3bb2889c3", | ||
| "data": { | ||
| "name": "cleanup", | ||
| "description": "Describe the task of node python_node_6", | ||
| "python_code": "def exec(myinput):\n # time.sleep(600)\n logger = get_run_logger()\n if flow_action_type in ['mimic_to_database']:\n logger.info(\"<--------- Cleaning up DuckDB file --------->\")\n # Get options\n os.remove(duckdb_file_name)\n logger.info(f\"File '{duckdb_file_name}' deleted successfully.\")\n logger.info(\"<--------- Workflow complete --------->\")\n" | ||
| "name": "duckdb_database", | ||
| "description": "Describe the task of node python_node_5", | ||
| "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = 
get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" |
Copilot AI · Jan 27, 2026
PR description says "Remove extra edges", but this template update also renames nodes (e.g., duckdb_to_database -> duckdb_database, cdm_tables -> cdm_table) and changes which node performs the DB export vs cleanup. If that’s intended, please update the PR description to reflect these additional changes; otherwise consider reverting the non-edge modifications.
Remove extra edges