diff --git a/flows/mimic_omop.json b/flows/mimic_omop.json index efb8d8d..841f8b0 100644 --- a/flows/mimic_omop.json +++ b/flows/mimic_omop.json @@ -2,237 +2,227 @@ "name": "MIMIC_OMOP", "description": "Load MIMIC data into OMOP CDM format", "edges": [ - { - "id": "reactflow__edge-89534f59-2a71-4088-8c42-eabb6ae6435fsource_89534f59-2a71-4088-8c42-eabb6ae6435f_object-cb55047a-bd77-4049-9250-9f37200fc4e9target_cb55047a-bd77-4049-9250-9f37200fc4e9_any", - "source": "89534f59-2a71-4088-8c42-eabb6ae6435f", - "target": "cb55047a-bd77-4049-9250-9f37200fc4e9", - "selected": false, - "sourceHandle": "source_89534f59-2a71-4088-8c42-eabb6ae6435f_object", - "targetHandle": "target_cb55047a-bd77-4049-9250-9f37200fc4e9_any" - }, - { - "id": "reactflow__edge-89534f59-2a71-4088-8c42-eabb6ae6435fsource_89534f59-2a71-4088-8c42-eabb6ae6435f_object-1267dc31-e42f-4fd5-b6bc-0155f7fa152atarget_1267dc31-e42f-4fd5-b6bc-0155f7fa152a_any", - "source": "89534f59-2a71-4088-8c42-eabb6ae6435f", - "target": "1267dc31-e42f-4fd5-b6bc-0155f7fa152a", - "selected": false, - "sourceHandle": "source_89534f59-2a71-4088-8c42-eabb6ae6435f_object", - "targetHandle": "target_1267dc31-e42f-4fd5-b6bc-0155f7fa152a_any" - }, - { - "id": "reactflow__edge-cb55047a-bd77-4049-9250-9f37200fc4e9source_cb55047a-bd77-4049-9250-9f37200fc4e9_object-1267dc31-e42f-4fd5-b6bc-0155f7fa152atarget_1267dc31-e42f-4fd5-b6bc-0155f7fa152a_any", - "source": "cb55047a-bd77-4049-9250-9f37200fc4e9", - "target": "1267dc31-e42f-4fd5-b6bc-0155f7fa152a", - "selected": false, - "sourceHandle": "source_cb55047a-bd77-4049-9250-9f37200fc4e9_object", - "targetHandle": "target_1267dc31-e42f-4fd5-b6bc-0155f7fa152a_any" - }, - { - "id": "reactflow__edge-1267dc31-e42f-4fd5-b6bc-0155f7fa152asource_1267dc31-e42f-4fd5-b6bc-0155f7fa152a_object-27f3ea40-5402-4503-b244-1a8ce91ec60atarget_27f3ea40-5402-4503-b244-1a8ce91ec60a_any", - "source": "1267dc31-e42f-4fd5-b6bc-0155f7fa152a", - "target": "27f3ea40-5402-4503-b244-1a8ce91ec60a", - "selected": false, - 
"sourceHandle": "source_1267dc31-e42f-4fd5-b6bc-0155f7fa152a_object", - "targetHandle": "target_27f3ea40-5402-4503-b244-1a8ce91ec60a_any" - }, - { - "id": "reactflow__edge-89534f59-2a71-4088-8c42-eabb6ae6435fsource_89534f59-2a71-4088-8c42-eabb6ae6435f_object-27f3ea40-5402-4503-b244-1a8ce91ec60atarget_27f3ea40-5402-4503-b244-1a8ce91ec60a_any", - "source": "89534f59-2a71-4088-8c42-eabb6ae6435f", - "target": "27f3ea40-5402-4503-b244-1a8ce91ec60a", - "selected": false, - "sourceHandle": "source_89534f59-2a71-4088-8c42-eabb6ae6435f_object", - "targetHandle": "target_27f3ea40-5402-4503-b244-1a8ce91ec60a_any" - }, - { - "id": "reactflow__edge-ab8bb824-e5f2-4d35-9885-291f7387b184source_ab8bb824-e5f2-4d35-9885-291f7387b184_object-0044e283-1c05-417c-b88c-9b2dd07f6f23target_0044e283-1c05-417c-b88c-9b2dd07f6f23_any", - "source": "ab8bb824-e5f2-4d35-9885-291f7387b184", - "target": "0044e283-1c05-417c-b88c-9b2dd07f6f23", - "selected": false, - "sourceHandle": "source_ab8bb824-e5f2-4d35-9885-291f7387b184_object", - "targetHandle": "target_0044e283-1c05-417c-b88c-9b2dd07f6f23_any" - }, - { - "id": "reactflow__edge-0044e283-1c05-417c-b88c-9b2dd07f6f23source_0044e283-1c05-417c-b88c-9b2dd07f6f23_object-4fd78512-3896-472c-be32-a303f915a0aetarget_4fd78512-3896-472c-be32-a303f915a0ae_any", - "source": "0044e283-1c05-417c-b88c-9b2dd07f6f23", - "target": "4fd78512-3896-472c-be32-a303f915a0ae", - "selected": false, - "sourceHandle": "source_0044e283-1c05-417c-b88c-9b2dd07f6f23_object", - "targetHandle": "target_4fd78512-3896-472c-be32-a303f915a0ae_any" - }, - { - "id": "reactflow__edge-4fd78512-3896-472c-be32-a303f915a0aesource_4fd78512-3896-472c-be32-a303f915a0ae_object-ef8a714c-4c71-4318-af85-20de8699a4edtarget_ef8a714c-4c71-4318-af85-20de8699a4ed_any", - "source": "4fd78512-3896-472c-be32-a303f915a0ae", - "target": "ef8a714c-4c71-4318-af85-20de8699a4ed", - "selected": false, - "sourceHandle": "source_4fd78512-3896-472c-be32-a303f915a0ae_object", - "targetHandle": 
"target_ef8a714c-4c71-4318-af85-20de8699a4ed_any" - }, - { - "id": "reactflow__edge-89534f59-2a71-4088-8c42-eabb6ae6435fsource_89534f59-2a71-4088-8c42-eabb6ae6435f_object-ab8bb824-e5f2-4d35-9885-291f7387b184target_ab8bb824-e5f2-4d35-9885-291f7387b184_any", - "source": "89534f59-2a71-4088-8c42-eabb6ae6435f", - "target": "ab8bb824-e5f2-4d35-9885-291f7387b184", - "selected": false, - "sourceHandle": "source_89534f59-2a71-4088-8c42-eabb6ae6435f_object", - "targetHandle": "target_ab8bb824-e5f2-4d35-9885-291f7387b184_any" - }, - { - "id": "reactflow__edge-89534f59-2a71-4088-8c42-eabb6ae6435fsource_89534f59-2a71-4088-8c42-eabb6ae6435f_object-0044e283-1c05-417c-b88c-9b2dd07f6f23target_0044e283-1c05-417c-b88c-9b2dd07f6f23_any", - "source": "89534f59-2a71-4088-8c42-eabb6ae6435f", - "target": "0044e283-1c05-417c-b88c-9b2dd07f6f23", - "selected": false, - "sourceHandle": "source_89534f59-2a71-4088-8c42-eabb6ae6435f_object", - "targetHandle": "target_0044e283-1c05-417c-b88c-9b2dd07f6f23_any" - }, - { - "id": "reactflow__edge-89534f59-2a71-4088-8c42-eabb6ae6435fsource_89534f59-2a71-4088-8c42-eabb6ae6435f_object-4fd78512-3896-472c-be32-a303f915a0aetarget_4fd78512-3896-472c-be32-a303f915a0ae_any", - "source": "89534f59-2a71-4088-8c42-eabb6ae6435f", - "target": "4fd78512-3896-472c-be32-a303f915a0ae", - "selected": false, - "sourceHandle": "source_89534f59-2a71-4088-8c42-eabb6ae6435f_object", - "targetHandle": "target_4fd78512-3896-472c-be32-a303f915a0ae_any" - }, - { - "id": "reactflow__edge-89534f59-2a71-4088-8c42-eabb6ae6435fsource_89534f59-2a71-4088-8c42-eabb6ae6435f_object-ef8a714c-4c71-4318-af85-20de8699a4edtarget_ef8a714c-4c71-4318-af85-20de8699a4ed_any", - "source": "89534f59-2a71-4088-8c42-eabb6ae6435f", - "target": "ef8a714c-4c71-4318-af85-20de8699a4ed", - "selected": false, - "sourceHandle": "source_89534f59-2a71-4088-8c42-eabb6ae6435f_object", - "targetHandle": "target_ef8a714c-4c71-4318-af85-20de8699a4ed_any" - }, - { - "id": 
"reactflow__edge-27f3ea40-5402-4503-b244-1a8ce91ec60asource_27f3ea40-5402-4503-b244-1a8ce91ec60a_object-ab8bb824-e5f2-4d35-9885-291f7387b184target_ab8bb824-e5f2-4d35-9885-291f7387b184_any", - "source": "27f3ea40-5402-4503-b244-1a8ce91ec60a", - "target": "ab8bb824-e5f2-4d35-9885-291f7387b184", - "selected": false, - "sourceHandle": "source_27f3ea40-5402-4503-b244-1a8ce91ec60a_object", - "targetHandle": "target_ab8bb824-e5f2-4d35-9885-291f7387b184_any" - } + { + "id": "reactflow__edge-c248a872-32c7-4d0a-998e-c34970383258source_c248a872-32c7-4d0a-998e-c34970383258_object-91b93d8e-5a1c-4f7b-9b2e-9691f05de35ctarget_91b93d8e-5a1c-4f7b-9b2e-9691f05de35c_any", + "source": "c248a872-32c7-4d0a-998e-c34970383258", + "target": "91b93d8e-5a1c-4f7b-9b2e-9691f05de35c", + "sourceHandle": "source_c248a872-32c7-4d0a-998e-c34970383258_object", + "targetHandle": "target_91b93d8e-5a1c-4f7b-9b2e-9691f05de35c_any" + }, + { + "id": "reactflow__edge-91b93d8e-5a1c-4f7b-9b2e-9691f05de35csource_91b93d8e-5a1c-4f7b-9b2e-9691f05de35c_object-214d4e9b-4b2b-49ad-86ff-81d6c491cb5dtarget_214d4e9b-4b2b-49ad-86ff-81d6c491cb5d_any", + "source": "91b93d8e-5a1c-4f7b-9b2e-9691f05de35c", + "target": "214d4e9b-4b2b-49ad-86ff-81d6c491cb5d", + "sourceHandle": "source_91b93d8e-5a1c-4f7b-9b2e-9691f05de35c_object", + "targetHandle": "target_214d4e9b-4b2b-49ad-86ff-81d6c491cb5d_any" + }, + { + "id": "reactflow__edge-214d4e9b-4b2b-49ad-86ff-81d6c491cb5dsource_214d4e9b-4b2b-49ad-86ff-81d6c491cb5d_object-a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842btarget_a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842b_any", + "source": "214d4e9b-4b2b-49ad-86ff-81d6c491cb5d", + "target": "a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842b", + "selected": false, + "sourceHandle": "source_214d4e9b-4b2b-49ad-86ff-81d6c491cb5d_object", + "targetHandle": "target_a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842b_any" + }, + { + "id": 
"reactflow__edge-a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842bsource_a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842b_object-1dd178ef-10b5-48a5-ad22-fbf83468c06ftarget_1dd178ef-10b5-48a5-ad22-fbf83468c06f_any", + "source": "a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842b", + "target": "1dd178ef-10b5-48a5-ad22-fbf83468c06f", + "selected": false, + "sourceHandle": "source_a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842b_object", + "targetHandle": "target_1dd178ef-10b5-48a5-ad22-fbf83468c06f_any" + }, + { + "id": "reactflow__edge-1dd178ef-10b5-48a5-ad22-fbf83468c06fsource_1dd178ef-10b5-48a5-ad22-fbf83468c06f_object-e14c18c7-2e0c-4329-9a38-88c3bb2889c3target_e14c18c7-2e0c-4329-9a38-88c3bb2889c3_any", + "source": "1dd178ef-10b5-48a5-ad22-fbf83468c06f", + "target": "e14c18c7-2e0c-4329-9a38-88c3bb2889c3", + "selected": false, + "sourceHandle": "source_1dd178ef-10b5-48a5-ad22-fbf83468c06f_object", + "targetHandle": "target_e14c18c7-2e0c-4329-9a38-88c3bb2889c3_any" + }, + { + "id": "reactflow__edge-e14c18c7-2e0c-4329-9a38-88c3bb2889c3source_e14c18c7-2e0c-4329-9a38-88c3bb2889c3_object-9300bf89-0bf6-49b0-ab42-2bdd9747cb56target_9300bf89-0bf6-49b0-ab42-2bdd9747cb56_any", + "source": "e14c18c7-2e0c-4329-9a38-88c3bb2889c3", + "target": "9300bf89-0bf6-49b0-ab42-2bdd9747cb56", + "selected": false, + "sourceHandle": "source_e14c18c7-2e0c-4329-9a38-88c3bb2889c3_object", + "targetHandle": "target_9300bf89-0bf6-49b0-ab42-2bdd9747cb56_any" + } ], "nodes": [ { - "id": "cb55047a-bd77-4049-9250-9f37200fc4e9", + "id": "c248a872-32c7-4d0a-998e-c34970383258", "data": { "name": "load_mimic_data", - "description": "Describe the task of node python_node_1", - "python_code": "def initialize_id_sequence(conn):\n conn.execute('CREATE SEQUENCE IF NOT EXISTS main.id_sequence;')\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef make_table_name(file_path):\n 
basename = os.path.basename(file_path)\n table_name = os.path.splitext(basename)[0]\n pathname = os.path.dirname(file_path)\n dirname = os.path.basename(pathname)\n return f\"mimiciv_{dirname}.{table_name}\", dirname\n\ndef exec(myinput):\n logger = get_run_logger()\n if (flow_action_type in ['mimic_to_database','mimic_to_duckdb']) and load_mimic_vocab == \"True\":\n logger.info('Start loading mimic data')\n with duckdb.connect(duckdb_file_name) as conn:\n # initialize_id_sequence\n initialize_id_sequence(conn)\n # create tables\n create_table(conn, MIMICCreateSql)\n # copy csv tables\n csv_files = glob.glob(os.path.join(mimic_dir, \"**/*.csv*\"), recursive=True)\n for file_path in csv_files:\n table_name, dir_name = make_table_name(file_path)\n if not dir_name in ['hosp','icu']:\n continue\n try:\n sql_qry = f\"\"\"COPY {table_name} FROM '{file_path}' (ESCAPE '\\\"', HEADER);\"\"\"\n conn.execute(sql_qry)\n print(f\"Loading {table_name} done\")\n except Exception as e:\n print(f\"Error loading {file_path}: {str(e)}\")\n raise Exception()\n return \"Load mimic data successful\"\n" + "error": false, + "result": "{\n \"error\": false,\n \"errorMessage\": null,\n \"nodeName\": \"load_mimic_data\",\n \"length\": 26,\n \"type\": \"\"\n}", + "description": "Describe the task of node python_node_0", + "python_code": "def initialize_id_sequence(conn):\n conn.execute('CREATE SEQUENCE IF NOT EXISTS main.id_sequence;')\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef make_table_name(file_path):\n basename = os.path.basename(file_path)\n table_name = os.path.splitext(basename)[0]\n pathname = os.path.dirname(file_path)\n dirname = os.path.basename(pathname)\n return f\"mimiciv_{dirname}.{table_name}\", dirname\n\ndef exec(myinput):\n logger = get_run_logger()\n if (flow_action_type in 
['mimic_to_database','mimic_to_duckdb']) and load_mimic_vocab == \"True\":\n logger.info('Start loading mimic data')\n with duckdb.connect(duckdb_file_name) as conn:\n # initialize_id_sequence\n initialize_id_sequence(conn)\n # create tables\n create_table(conn, MIMICCreateSql)\n # copy csv tables\n csv_files = glob.glob(os.path.join(mimic_dir, \"**/*.csv*\"), recursive=True)\n for file_path in csv_files:\n table_name, dir_name = make_table_name(file_path)\n if not dir_name in ['hosp','icu']:\n continue\n try:\n sql_qry = f\"\"\"COPY {table_name} FROM '{file_path}' (ESCAPE '\\\"', HEADER);\"\"\"\n conn.execute(sql_qry)\n print(f\"Loading {table_name} done\")\n except Exception as e:\n print(f\"Error loading {file_path}: {str(e)}\")\n raise Exception()\n return \"Load mimic data successful\"\n", + "errorMessage": null }, "type": "python_node", "width": 350, "height": 210, "dragging": false, - "position": { "x": -2830, "y": -950 }, + "position": { + "x": 130, + "y": 110 + }, "selected": false, "dragHandle": "", "sourcePosition": "right", "targetPosition": "left", - "positionAbsolute": { "x": -2830, "y": -950 } + "positionAbsolute": { + "x": 130, + "y": 110 + } }, { - "id": "1267dc31-e42f-4fd5-b6bc-0155f7fa152a", + "id": "91b93d8e-5a1c-4f7b-9b2e-9691f05de35c", "data": { "name": "load_vocab", - "description": "Describe the task of node python_node_3", - "python_code": "def execute_raw_sql_from_file(conn, dir_root, sql_files):\n for sql_file in sql_files:\n with open(os.path.join(dir_root, sql_file), 'r') as file:\n query = file.read()\n conn.execute(query)\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef create_schema(conn, schema_name:str):\n conn.execute(f\"\"\"\n DROP SCHEMA IF EXISTS {schema_name} CASCADE ;\n CREATE SCHEMA {schema_name} ;\n \"\"\")\n\ndef exec(myinput):\n logger = 
get_run_logger()\n\n if (flow_action_type in ['mimic_to_database','mimic_to_duckdb']) and load_mimic_vocab == \"True\":\n logger.info('Start loading vocab')\n with duckdb.connect(duckdb_file_name) as conn:\n create_schema(conn, 'mimic_staging')\n create_table(conn, CreateVocabTable)\n # insert vocab tables\n csv_files = glob.glob(os.path.join(vocab_dir, \"**/*.csv*\"), recursive=True)\n for file in csv_files:\n if 'CONCEPT_CPT4' in file:\n continue\n csv_name = os.path.basename(file).split('.')[0].lower()\n table_name = '_'.join(['mimic_staging.tmp', csv_name])\n try:\n query = f\"\"\"\n TRUNCATE TABLE {table_name};\n COPY {table_name} FROM '{file}' (DATEFORMAT '%Y%m%d', DELIMITER '\\t', FORMAT CSV, HEADER, QUOTE '\"',ESCAPE '\\\"');\n \"\"\"\n conn.execute(query)\n print(f\"Loading {table_name} done\")\n except Exception as e:\n print(f\"Error loading {file}: {str(e)}\")\n raise Exception()\n # generate_custom_vocab\n\n CustomVocabSqls = ['create_voc_from_tmp.sql', 'custom_mapping_load.sql', 'custom_vocabularies.sql', 'vocabulary_cleanup.sql']\n try: \n execute_raw_sql_from_file(conn, CustomVocabDir, CustomVocabSqls)\n except Exception as e:\n print(f\"Error generating custom vocabulories: {str(e)}\")\n raise Exception()" + "error": false, + "result": "{\n \"error\": false,\n \"errorMessage\": null,\n \"nodeName\": \"load_vocab\",\n \"length\": null,\n \"type\": \"\"\n}", + "description": "Describe the task of node python_node_1", + "python_code": "def execute_raw_sql_from_file(conn, dir_root, sql_files):\n for sql_file in sql_files:\n with open(os.path.join(dir_root, sql_file), 'r') as file:\n query = file.read()\n conn.execute(query)\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef create_schema(conn, schema_name:str):\n conn.execute(f\"\"\"\n DROP SCHEMA IF EXISTS 
{schema_name} CASCADE ;\n CREATE SCHEMA {schema_name} ;\n \"\"\")\n\ndef exec(myinput):\n logger = get_run_logger()\n\n if (flow_action_type in ['mimic_to_database','mimic_to_duckdb']) and load_mimic_vocab == \"True\":\n logger.info('Start loading vocab')\n with duckdb.connect(duckdb_file_name) as conn:\n create_schema(conn, 'mimic_staging')\n create_table(conn, CreateVocabTable)\n # insert vocab tables\n csv_files = glob.glob(os.path.join(vocab_dir, \"**/*.csv*\"), recursive=True)\n for file in csv_files:\n if 'CONCEPT_CPT4' in file:\n continue\n csv_name = os.path.basename(file).split('.')[0].lower()\n table_name = '_'.join(['mimic_staging.tmp', csv_name])\n try:\n query = f\"\"\"\n TRUNCATE TABLE {table_name};\n COPY {table_name} FROM '{file}' (DATEFORMAT '%Y%m%d', DELIMITER '\\t', FORMAT CSV, HEADER, QUOTE '\"',ESCAPE '\\\"');\n \"\"\"\n conn.execute(query)\n print(f\"Loading {table_name} done\")\n except Exception as e:\n print(f\"Error loading {file}: {str(e)}\")\n raise Exception()\n # generate_custom_vocab\n\n CustomVocabSqls = ['create_voc_from_tmp.sql', 'custom_mapping_load.sql', 'custom_vocabularies.sql', 'vocabulary_cleanup.sql']\n try: \n execute_raw_sql_from_file(conn, CustomVocabDir, CustomVocabSqls)\n except Exception as e:\n print(f\"Error generating custom vocabulories: {str(e)}\")\n raise Exception()", + "errorMessage": null }, "type": "python_node", "width": 350, "height": 210, "dragging": false, - "position": { "x": -2410, "y": -830 }, + "position": { + "x": 580, + "y": 110 + }, "selected": false, "dragHandle": "", "sourcePosition": "right", "targetPosition": "left", - "positionAbsolute": { "x": -2410, "y": -830 } + "positionAbsolute": { + "x": 580, + "y": 110 + } }, { - "id": "27f3ea40-5402-4503-b244-1a8ce91ec60a", + "id": "214d4e9b-4b2b-49ad-86ff-81d6c491cb5d", "data": { "name": "staging_mimic", - "description": "Describe the task of node python_node_4", + "description": "Describe the task of node python_node_2", "python_code": "import 
time\n\ndef execute_raw_sql_from_file(conn, dir_root, sql_files):\n for sql_file in sql_files:\n print(sql_file)\n with open(os.path.join(dir_root, sql_file), 'r') as file:\n query = file.read()\n conn.execute(query)\n\ndef create_schema(conn, schema_name:str):\n conn.execute(f\"\"\"\n DROP SCHEMA IF EXISTS {schema_name} CASCADE ;\n CREATE SCHEMA {schema_name} ;\n \"\"\")\n\ndef staging_mimic_data(conn, StagDir, StagSql):\n create_schema(conn, 'mimic_etl')\n execute_raw_sql_from_file(conn, StagDir, StagSql)\n\ndef exec(myinput):\n\n StagSql = [\"etl_OMOPCDM_postgresql_5.3_ddl_adapted_no_vocab.sql\", \n \"st_core.sql\",\n \"st_hosp.sql\",\n \"st_icu.sql\",\n \"voc_copy_to_target_dataset.sql\"]\n\n if flow_action_type in ['mimic_to_database','mimic_to_duckdb'] and load_mimic_vocab == \"True\":\n with duckdb.connect(duckdb_file_name) as conn:\n staging_mimic_data(conn, StagDir, StagSql)\n conn.execute(\"DROP SCHEMA mimiciv_hosp CASCADE\")\n conn.execute(\"DROP SCHEMA mimiciv_icu CASCADE\")\n conn.execute(\"DROP SCHEMA mimic_staging CASCADE\")" }, "type": "python_node", - "width": 350, - "height": 210, + "width": 354, + "height": 214, "dragging": false, - "position": { "x": -2000, "y": -700 }, + "position": { + "x": 1020, + "y": 110 + }, "selected": false, "dragHandle": "", "sourcePosition": "right", "targetPosition": "left", - "positionAbsolute": { "x": -2000, "y": -700 } + "positionAbsolute": { + "x": 1020, + "y": 110 + } }, { - "id": "ab8bb824-e5f2-4d35-9885-291f7387b184", + "id": "a73e5ebe-7a0d-4b49-b8d8-ed5e53dc842b", "data": { "name": "etl_transformation", - "description": "Describe the task of node python_node_4", + "description": "Describe the task of node python_node_3", "python_code": "def execute_raw_sql_from_file(conn, dir_root, sql_files):\n for sql_file in sql_files:\n print(sql_file)\n with open(os.path.join(dir_root, sql_file), 'r') as file:\n query = file.read()\n conn.execute(query)\n \ndef exec(myinput):\n logger = get_run_logger()\n\n ETLSqls = 
['cdm_location.sql',\n 'cdm_care_site.sql',\n 'cdm_person.sql',\n 'cdm_death.sql',\n 'lk_vis_part_1.sql',\n 'lk_meas_unit.sql',\n # ULLIF(regexp_extract(),'') replace postgres(regexp_match)\n 'lk_meas_chartevents.sql',\n 'lk_meas_labevents.sql',\n 'lk_meas_specimen.sql',\n 'lk_vis_part_2.sql',\n 'cdm_visit_occurrence.sql',\n 'cdm_visit_detail.sql',\n 'lk_cond_diagnoses.sql',\n 'lk_procedure.sql',\n 'lk_observation.sql',\n 'cdm_condition_occurrence.sql',\n 'cdm_procedure_occurrence.sql',\n 'cdm_specimen.sql',\n 'cdm_measurement.sql',\n 'lk_drug.sql',\n 'cdm_drug_exposure.sql',\n 'cdm_device_exposure.sql',\n 'cdm_observation.sql',\n 'cdm_observation_period.sql',\n 'cdm_finalize_person.sql',\n 'cdm_fact_relationship.sql',\n 'cdm_condition_era.sql',\n 'cdm_drug_era.sql',\n 'cdm_dose_era.sql',\n 'ext_d_itemid_to_concept.sql',\n 'cdm_cdm_source.sql',\n 'extra_cdm_tables.sql']\n\n if flow_action_type in ['mimic_to_database','mimic_to_duckdb']:\n with duckdb.connect(duckdb_file_name) as conn:\n logger.info(\"*** Doing ETL transformations ***\")\n try:\n execute_raw_sql_from_file(conn, ETLDir, ETLSqls)\n except Exception as e:\n logger.error(f\"Error tranforming mimic: {str(e)}\")\n raise Exception()" }, "type": "python_node", - "width": 350, - "height": 210, + "width": 354, + "height": 214, "dragging": false, - "position": { "x": -1590, "y": -580 }, + "position": { + "x": 150, + "y": 550 + }, "selected": false, "dragHandle": "", "sourcePosition": "right", "targetPosition": "left", - "positionAbsolute": { "x": -1590, "y": -580 } + "positionAbsolute": { + "x": 150, + "y": 550 + } }, { - "id": "0044e283-1c05-417c-b88c-9b2dd07f6f23", + "id": "9300bf89-0bf6-49b0-ab42-2bdd9747cb56", "data": { - "name": "cdm_tables", - "description": "Describe the task of node python_node_5", + "name": "cdm_table", + "description": "Describe the task of node python_node_4", "python_code": "import duckdb\nimport os, glob\nimport time\nfrom prefect.logging import get_run_logger\n\ndef 
execute_raw_sql_from_file(conn, dir_root, sql_files):\n for sql_file in sql_files:\n print(sql_file)\n with open(os.path.join(dir_root, sql_file), 'r') as file:\n query = file.read()\n conn.execute(query)\n\ndef create_schema(conn, schema_name:str):\n conn.execute(f\"\"\"\n DROP SCHEMA IF EXISTS {schema_name} CASCADE ;\n CREATE SCHEMA {schema_name} ;\n \"\"\")\n \ndef exec(myinput):\n logger = get_run_logger()\n CdmSqls = ['OMOPCDM_duckdb_5.3_ddl_adapted.sql', 'unload_to_atlas_gen.sql']\n if flow_action_type in ['mimic_to_database','mimic_to_duckdb']:\n logger.info(\"*** Creating final CDM tables and copy data into them ***\")\n with duckdb.connect(duckdb_file_name) as conn:\n try:\n create_schema(conn, 'cdm')\n execute_raw_sql_from_file(conn, CdmDir, CdmSqls)\n except Exception as e:\n logger.error(f\"Error creating final cdm data: {str(e)}\")\n raise Exception()" }, "type": "python_node", - "width": 350, - "height": 210, + "width": 354, + "height": 214, "dragging": false, - "position": { "x": -1170, "y": -460 }, + "position": { + "x": 1540, + "y": 550 + }, "selected": false, "dragHandle": "", "sourcePosition": "right", "targetPosition": "left", - "positionAbsolute": { "x": -1170, "y": -460 } + "positionAbsolute": { + "x": 1540, + "y": 550 + } }, { - "id": "ef8a714c-4c71-4318-af85-20de8699a4ed", + "id": "e14c18c7-2e0c-4329-9a38-88c3bb2889c3", "data": { - "name": "cleanup", - "description": "Describe the task of node python_node_6", - "python_code": "def exec(myinput):\n # time.sleep(600)\n logger = get_run_logger()\n if flow_action_type in ['mimic_to_database']:\n logger.info(\"<--------- Cleaning up DuckDB file --------->\")\n # Get options\n os.remove(duckdb_file_name)\n logger.info(f\"File '{duckdb_file_name}' deleted successfully.\")\n logger.info(\"<--------- Workflow complete --------->\")\n" + "name": "duckdb_database", + "description": "Describe the task of node python_node_5", + "python_code": "import time\nfrom _shared_flow_utils.types import 
SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table 
in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if dialect == SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), 
chunk_size=int(chunk_size), database_ddl=database_ddl)" }, "type": "python_node", - "width": 350, - "height": 210, + "width": 354, + "height": 214, "dragging": false, - "position": { "x": -350, "y": -250 }, - "selected": true, + "position": { + "x": 1110, + "y": 550 + }, + "selected": false, "dragHandle": "", "sourcePosition": "right", "targetPosition": "left", - "positionAbsolute": { "x": -350, "y": -250 } + "positionAbsolute": { + "x": 1110, + "y": 550 + } }, { - "id": "4fd78512-3896-472c-be32-a303f915a0ae", + "id": "1dd178ef-10b5-48a5-ad22-fbf83468c06f", "data": { - "name": "duckdb_to_database", - "description": "Describe the task of node python_node_7", - "python_code": "import time\nfrom _shared_flow_utils.types import SupportedDatabaseDialects\nfrom _shared_flow_utils.dao.DBDao import DBDao\n\ndef create_table(conn, sql_file, schema_name=None):\n with open(sql_file, 'r') as file:\n sql_qry = file.read()\n if \"@schema_name\" in sql_qry:\n sql_qry = sql_qry.replace('@schema_name', schema_name)\n conn.execute(sql_qry)\n\ndef export_data(duckdb_file_name, schema_name, to_dbdao, overwrite_schema, chunk_size, database_ddl):\n logger = get_run_logger()\n db_credentials = to_dbdao.tenant_configs\n dialect = to_dbdao.dialect\n if to_dbdao.check_schema_exists(schema_name):\n if overwrite_schema:\n to_dbdao.drop_schema(schema_name, cascade=True)\n else:\n logger.error(f\"Schema '{schema_name}'exist! 
To overwrite the existing schema, set 'Overwrite Schema' to True\")\n raise ValueError()\n to_dbdao.create_schema(schema_name)\n match dialect:\n case SupportedDatabaseDialects.POSTGRES:\n attach_qry = f\"\"\"ATTACH 'host={db_credentials.host} port={db_credentials.port} dbname={db_credentials.databaseName} \n user={db_credentials.adminUser} password={db_credentials.adminPassword.get_secret_value()}' \n AS pg (TYPE POSTGRES, SCHEMA {schema_name});\n \"\"\"\n # Attach Posgres Database\n with duckdb.connect(duckdb_file_name) as conn:\n conn.execute(attach_qry)\n # Creat schema and tables in postgres database\n create_table(conn, database_ddl, schema_name=schema_name)\n tables = conn.execute(f\"SELECT table_name FROM duckdb_tables() WHERE (database_name = 'pg')\").fetchall()\n tables = [x[0] for x in tables]\n for table in tables:\n conn.execute(f\"\"\"\n INSERT INTO pg.{schema_name}.{table}\n SELECT * FROM cdm.{table}; \n \"\"\")\n conn.execute(\"DETACH pg;\")\n\n case SupportedDatabaseDialects.HANA:\n create_sqls = open(database_ddl).read().replace('@schema_name', schema_name).split(';')[:-1]\n for create_sql in create_sqls:\n with to_dbdao.engine.connect() as hana_conn:\n hana_conn.execute(text(create_sql))\n hana_conn.commit()\n tables = to_dbdao.get_table_names(schema=schema_name)\n for table in tables:\n tmp = 0\n for chunk, percent in read_table_chunks(duckdb_file_name, table, chunk_size=chunk_size): \n if percent != tmp: \n flag = True\n tmp = percent \n else:\n flag = False \n if not chunk.empty:\n insert_to_hana_direct(to_dbdao, chunk, schema_name, table)\n if flag: \n logger.info(f\"{int(percent)}% of table '{table}' is inserted\")\n logger.info(f\"100% of table '{table}' is inserted\")\n\n\ndef read_table_chunks(duckdb_file_name, table, chunk_size):\n with duckdb.connect(duckdb_file_name) as conn:\n count = conn.execute(f\"SELECT COUNT(*) FROM cdm.{table}\").fetchone()[0]\n for offset in range(0, count, chunk_size):\n chunk = conn.execute(f\"\"\"\n SELECT * 
FROM cdm.{table}\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\").df()\n percent = (offset/count * 100)//10 * 10\n yield chunk, percent\n\ndef insert_to_hana_direct(to_dbdao, chunk, schema_name, table):\n with to_dbdao.engine.connect() as hana_conn:\n # Use Upper case for HANA\n chunk.columns = chunk.columns.str.upper()\n # Replace np.nan with None\n chunk.replace([np.nan], [None], inplace=True)\n columns = chunk.columns.tolist()\n columns_str = ', '.join(f'\"{col}\"' for col in columns)\n placeholders = ','.join(f':{col}' for col in columns)\n insert_stmt = f'INSERT INTO {schema_name}.{table} ({columns_str}) VALUES ({placeholders})'\n data = chunk.to_dict('records')\n hana_conn.execute(text(insert_stmt), data)\n hana_conn.commit()\n to_dbdao.engine.dispose()\n \ndef exec(myinput):\n logger = get_run_logger()\n\n to_dbdao = DBDao(use_cache_db=eval(use_cache_db), database_code=database_code)\n dialect = to_dbdao.dialect\n database_ddl = PostgresDDL if SupportedDatabaseDialects.POSTGRES else HANADDL\n if flow_action_type in ['mimic_to_database','duckdb_to_database']:\n logger.info(\"<--------- Exporting CDM tables to Database --------->\")\n export_data(duckdb_file_name=duckdb_file_name, schema_name=schema_name, to_dbdao=to_dbdao, overwrite_schema=eval(overwrite_schema), chunk_size=int(chunk_size), database_ddl=database_ddl)" + "name": "cleanup", + "description": "Describe the task of node python_node_6", + "python_code": "def exec(myinput):\n # time.sleep(600)\n logger = get_run_logger()\n if flow_action_type in ['mimic_to_database']:\n logger.info(\"<--------- Cleaning up DuckDB file --------->\")\n # Get options\n os.remove(duckdb_file_name)\n logger.info(f\"File '{duckdb_file_name}' deleted successfully.\")\n logger.info(\"<--------- Workflow complete --------->\")\n" }, "type": "python_node", "width": 354, "height": 214, "dragging": false, - "position": { "x": -760, "y": -340 }, + "position": { + "x": 630, + "y": 550 + }, "selected": false, "dragHandle": "", 
"sourcePosition": "right", "targetPosition": "left", - "positionAbsolute": { "x": -760, "y": -340 } + "positionAbsolute": { + "x": 630, + "y": 550 + } } ], "variables": [ @@ -276,15 +266,42 @@ "key": "duckdb_file_name", "value": "/app/mimic_omop/mimic/mimic_omop_duckdb" }, - { "key": "mimic_dir", "value": "/app/mimic_omop/mimic" }, - { "key": "vocab_dir", "value": "/app/mimic_omop/vocab" }, - { "key": "load_mimic_vocab", "value": "True" }, - { "key": "database_code", "value": "alpdev_pg" }, - { "key": "schema_name", "value": "dataflow_ui_mimic" }, - { "key": "overwrite_schema", "value": "True" }, - { "key": "use_cache_db", "value": "False" }, - { "key": "chunk_size", "value": "10000" }, - { "key": "flow_action_type", "value": "mimic_to_database" } + { + "key": "mimic_dir", + "value": "/app/mimic_omop/mimic" + }, + { + "key": "vocab_dir", + "value": "/app/mimic_omop/vocab" + }, + { + "key": "load_mimic_vocab", + "value": "True" + }, + { + "key": "database_code", + "value": "alpdev_pg" + }, + { + "key": "schema_name", + "value": "dataflow_ui_mimic" + }, + { + "key": "overwrite_schema", + "value": "True" + }, + { + "key": "use_cache_db", + "value": "False" + }, + { + "key": "chunk_size", + "value": "10000" + }, + { + "key": "flow_action_type", + "value": "mimic_to_database" + } ], "importLibs": [ "from prefect.logging import get_run_logger",