Description
DuckDB can serve as a staging platform: results are inserted into a local DuckDB database first and then copied efficiently to a remote Postgres server. Currently we use Java/Python/raw SQL inserts, which run into problems when results are buggy, because an error during execution can leave them incomplete and/or messy.
Copying from an existing relational schema is appealing, and this DuckDB file can optionally be shared by sites directly, in addition to CSV files. If only CSV files are shared, this is not a problem for a study coordinator, as they can recreate an individual site's results schema in a DuckDB database from those CSVs.
Advantages:
- DuckDB is already a hard requirement of Andromeda/HADES
- This automatically creates a local results model during study execution
- Data integrity errors can be found prior to insert into the main schema
- Certainly much faster at inserting than raw insert queries, and potentially faster than using Postgres COPY because data can be streamed directly from DuckDB
- Allows a convenient method for sites to explore large data sets in SQL before they transmit results
- Always creates a results schema that enforces referential integrity of results (meaning that we never have to do any annoying data wrangling)
- Because constraints are enforced at the DuckDB layer, we can switch off constraints on Postgres and rapidly insert large data sets
- Packages that work in an incremental/exploratory manner can also benefit from using this approach (e.g. CohortDiagnostics, where you want to play with different potential candidate phenotypes).
- Migrations of results data can happen at the local DuckDB level and not in a main results schema
Disadvantages:
- Arguably adds complexity and a hard DuckDB dependency to the HADES stack (in a new way that may be hard to revert/move away from)
- Means that the CSV approach is no longer used (though this policy can be adjusted)
- Could be challenging for sites to debug if something goes wrong (compared to just fixing a CSV file)
- Packages will need to implement a DuckDB schema creation script (a minimal sketch follows this list)
- If the DuckDB schema and the Postgres results schema don't align, the insert will fail in a way that will probably be confusing and painful for users
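For illustration only, here is a minimal sketch of what such a schema creation script could look like. The table and column names are hypothetical stand-ins; in practice each package would generate this DDL from its results data model specification.

# Hypothetical sketch of a per-package DuckDB schema creation script.
# Constraints are declared here so that integrity errors surface at the
# DuckDB staging layer, before anything is copied to Postgres.
createResultsSchema <- function(duckdbPath) {
  con <- DBI::dbConnect(duckdb::duckdb(), duckdbPath)
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE), add = TRUE)
  DBI::dbExecute(con, "
    CREATE TABLE IF NOT EXISTS cohort_definition (
      cohort_definition_id BIGINT PRIMARY KEY,
      cohort_name          VARCHAR NOT NULL
    );")
  DBI::dbExecute(con, "
    CREATE TABLE IF NOT EXISTS cohort_count (
      cohort_definition_id BIGINT NOT NULL REFERENCES cohort_definition (cohort_definition_id),
      database_id          VARCHAR NOT NULL,
      cohort_entries       BIGINT,
      cohort_subjects      BIGINT,
      PRIMARY KEY (cohort_definition_id, database_id)
    );")
  invisible(TRUE)
}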
Having two modes of operation for export is fine.
| Scenario 1: ZIP of CSVs | Scenario 2: DuckDB file |
| --- | --- |
| Sites produce results CSVs | Sites produce a DuckDB file |
| ZIP file shared with central coordinator | DuckDB file shared with central coordinator |
| Load into DuckDB via read_csv_auto() | Attach site DuckDB files |
| Merge & deduplicate across sites | Merge & deduplicate across sites |
| Transparency: HIGH | Transparency: MEDIUM |
| Load speed: MEDIUM | Load speed: HIGH |

Both scenarios end with a single bulk insert into Postgres.
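As a rough sketch of the Scenario 2 path on the coordinator side (the table name, file paths, and the assumption that the staging schema declares primary keys are all illustrative), each site's DuckDB file can be attached to the central staging database and merged with deduplication entirely in SQL:

# Sketch: merge one table from several site DuckDB files into a central
# staging DuckDB database. INSERT OR IGNORE relies on the primary keys
# declared in the staging schema to skip rows already loaded from another site.
mergeSiteDuckdbFiles <- function(centralPath, siteFiles, tableName) {
  con <- DBI::dbConnect(duckdb::duckdb(), centralPath)
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE), add = TRUE)
  for (i in seq_along(siteFiles)) {
    alias <- sprintf("site_%d", i)
    DBI::dbExecute(con, sprintf("ATTACH '%s' AS %s (READ_ONLY);", siteFiles[i], alias))
    DBI::dbExecute(con, sprintf(
      "INSERT OR IGNORE INTO main.%s SELECT * FROM %s.main.%s;",
      tableName, alias, tableName
    ))
    DBI::dbExecute(con, sprintf("DETACH %s;", alias))
  }
  invisible(TRUE)
}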
Implementation
The chatbot suggests this implementation for inserts using just DuckDB/DBI:
#' Copy data from a DuckDB table into an existing Postgres table
#'
#' Uses DuckDB's native Postgres connector to stream data
#' directly from a DuckDB database file into an existing Postgres table,
#' without loading the data into R memory.
#'
#' @param duckdbPath Path to the DuckDB database file.
#' @param postgresConnectionString PostgreSQL connection string:
#' "postgresql://user:password@host:port/dbname"
#' @param duckdbTable Name of the table in DuckDB to copy from.
#' @param postgresSchema Schema in the Postgres database containing the target table.
#' @param postgresTable Name of the target table in Postgres.
#' @param verbose Logical; if TRUE, prints progress messages.
#'
#' @return Invisibly returns TRUE if copy completed successfully, otherwise errors.
#' @export
#'
#' @examples
#' \dontrun{
#' copyDuckdbToPostgres(
#' duckdbPath = "federated_results.duckdb",
#' postgresConnectionString = "postgresql://user:pass@localhost:5432/results",
#' duckdbTable = "target_comparator_nesting_cohort",
#' postgresSchema = "public",
#' postgresTable = "target_comparator_nesting_cohort"
#' )
#' }
copyDuckdbToPostgres <- function(duckdbPath,
postgresConnectionString,
duckdbTable,
postgresSchema,
postgresTable,
verbose = TRUE) {
if (verbose) message("Connecting to DuckDB: ", duckdbPath)
con <- DBI::dbConnect(duckdb::duckdb(), duckdbPath, read_only = FALSE)
on.exit({
try(DBI::dbDisconnect(con, shutdown = TRUE), silent = TRUE)
}, add = TRUE)
# Ensure Postgres extension is installed & loaded in DuckDB
DBI::dbExecute(con, "INSTALL postgres;")
DBI::dbExecute(con, "LOAD postgres;")
if (verbose) message("Attaching Postgres database via DuckDB...")
DBI::dbExecute(
con,
sprintf("ATTACH '%s' AS psql (TYPE postgres);", postgresConnectionString)
)
# Build the SQL for streaming copy
copySql <- sprintf(
"INSERT INTO psql.%s.%s SELECT * FROM main.%s;",
postgresSchema,
postgresTable,
duckdbTable
)
if (verbose) {
message("Copying data from DuckDB table '", duckdbTable,
"' to Postgres table '", postgresSchema, ".", postgresTable, "'...")
}
DBI::dbExecute(con, copySql)
if (verbose) message("✅ Copy complete.")
invisible(TRUE)
}
This is significantly simpler than our current insert approach.
For copying CSVs, the chatbot implements this:
#' Insert CSV data into an existing DuckDB table
#'
#' Reads one or more CSV files and appends their data into
#' pre-existing tables in a DuckDB database. The table names in DuckDB
#' must match the intended target for each CSV.
#'
#' @param duckdbPath Path to the DuckDB database file.
#' @param csvFiles Character vector of CSV file paths to import.
#' @param tableNames Character vector of target table names in DuckDB
#' (same length as csvFiles). By default, derived from CSV base filenames
#' without extension.
#' @param verbose Logical; if TRUE, prints progress messages.
#'
#' @return Invisibly returns TRUE if all loads succeed, otherwise errors.
#' @export
#'
#' @examples
#' \dontrun{
#' insertCsvToDuckdb(
#' duckdbPath = "staging.duckdb",
#' csvFiles = c("data/target_comparator_nesting_cohort.csv",
#' "data/cohort_method_result.csv")
#' )
#' }
insertCsvToDuckdb <- function(duckdbPath,
csvFiles,
tableNames = tools::file_path_sans_ext(basename(csvFiles)),
verbose = TRUE) {
if (length(csvFiles) != length(tableNames)) {
stop("csvFiles and tableNames must have the same length.")
}
# Connect to the DuckDB database file
if (verbose) message("Connecting to DuckDB: ", duckdbPath)
con <- DBI::dbConnect(duckdb::duckdb(), duckdbPath, read_only = FALSE)
on.exit({
try(DBI::dbDisconnect(con, shutdown = TRUE), silent = TRUE)
}, add = TRUE)
# Loop through and insert each CSV
for (i in seq_along(csvFiles)) {
csv_path <- csvFiles[i]
tbl <- tableNames[i]
if (verbose) message("Loading: ", csv_path, " → ", tbl)
# Ensure table exists in DuckDB
if (!(tbl %in% DBI::dbListTables(con))) {
stop("Table '", tbl, "' does not exist in DuckDB. Please create it before inserting.")
}
# Insert data directly from CSV without reading into R
insert_sql <- sprintf(
"INSERT INTO \"%s\" SELECT * FROM read_csv_auto('%s');",
tbl, normalizePath(csv_path, winslash = "/")
)
DBI::dbExecute(con, insert_sql)
if (verbose) message("Inserted data from ", csv_path, " into ", tbl)
}
if (verbose) message("✅ All CSV files loaded into DuckDB successfully.")
invisible(TRUE)
}
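A nice side effect, per the advantages above, is that schema and integrity problems surface during this staging step rather than during the Postgres load. A small illustrative sketch of how a site might surface such failures (file names reused from the example above):

# Sketch: a constraint or type violation in a site's CSVs fails the local
# DuckDB staging load, before anything is transmitted or sent to Postgres.
stagingOk <- tryCatch(
  insertCsvToDuckdb(
    duckdbPath = "staging.duckdb",
    csvFiles = c("data/target_comparator_nesting_cohort.csv",
                 "data/cohort_method_result.csv")
  ),
  error = function(e) {
    message("Staging load failed - fix the CSVs before sharing results: ",
            conditionMessage(e))
    FALSE
  }
)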
Similarly, a function to insert all the tables listed in the results specification from a given directory:
#' Insert CSV files from a directory into an existing DuckDB schema
#'
#' Uses a results model specification to locate matching CSV files in a directory
#' and append their contents into the corresponding DuckDB tables. The DuckDB
#' tables must already exist and have schemas matching the OHDSI specification.
#'
#' @param duckdbPath Path to the DuckDB database file.
#' @param resultsSpecDf A data frame of the results data model specification.
#' Must have column `tableName`.
#' @param csvDir Directory containing the CSV files. File names are expected to
#' match the `tableName` from the specification, with a `.csv` extension.
#' @param verbose Logical; If TRUE, prints progress messages.
#'
#' @return Invisibly returns TRUE on success.
#' @export
#'
#' @examples
#' \dontrun{
#' specDf <- read.csv("resultsDataModelSpecification.csv", stringsAsFactors = FALSE)
#' insertResultsCsvFromDirToDuckdb(
#' duckdbPath = "central_staging.duckdb",
#' resultsSpecDf = specDf,
#' csvDir = "unzipped_site_data"
#' )
#' }
insertResultsCsvFromDirToDuckdb <- function(duckdbPath,
resultsSpecDf,
csvDir,
verbose = TRUE) {
if (!requireNamespace("duckdb", quietly = TRUE)) {
stop("The 'duckdb' package is required but not installed.")
}
if (!requireNamespace("DBI", quietly = TRUE)) {
stop("The 'DBI' package is required but not installed.")
}
if (!"tableName" %in% colnames(resultsSpecDf)) {
stop("Specification data frame must contain a 'tableName' column.")
}
# Normalize directory path
csvDir <- normalizePath(csvDir, winslash = "/", mustWork = TRUE)
# Connect to DuckDB
if (verbose) message("Connecting to DuckDB: ", duckdbPath)
con <- DBI::dbConnect(duckdb::duckdb(), duckdbPath, read_only = FALSE)
on.exit({
try(DBI::dbDisconnect(con, shutdown = TRUE), silent = TRUE)
}, add = TRUE)
# For each table in spec
for (tbl in unique(resultsSpecDf$tableName)) {
csvPath <- file.path(csvDir, paste0(tbl, ".csv"))
if (!file.exists(csvPath)) {
if (verbose) message("Skipping table '", tbl, "' — no CSV found.")
next
}
if (!(tbl %in% DBI::dbListTables(con))) {
stop("Table '", tbl, "' does not exist in DuckDB. Please create it before inserting.")
}
if (verbose) message("Inserting ", csvPath, " into table '", tbl, "'.")
# Append data directly inside DuckDB — no R memory overhead
insertSql <- sprintf(
"INSERT INTO \"%s\" SELECT * FROM read_csv_auto('%s');",
tbl, normalizePath(csvPath, winslash = "/")
)
DBI::dbExecute(con, insertSql)
if (verbose) message("✅ Finished inserting into '", tbl, "'.")
}
if (verbose) message("All matching CSV files inserted.")
invisible(TRUE)
}
This removes all the headaches of type checking prior to inserting data, which eliminates a significant amount of code from this package.
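Putting the pieces together, the Scenario 1 path on the coordinator side could look roughly like the following. This is a sketch only; the file names, schema name, and connection string are placeholders taken from the examples above.

# Sketch of the Scenario 1 pipeline: unzipped site CSVs -> DuckDB staging
# schema (with constraints enforced) -> bulk copy into the Postgres results schema.
specDf <- read.csv("resultsDataModelSpecification.csv", stringsAsFactors = FALSE)
insertResultsCsvFromDirToDuckdb(
  duckdbPath = "central_staging.duckdb",
  resultsSpecDf = specDf,
  csvDir = "unzipped_site_data"
)
for (tbl in unique(specDf$tableName)) {
  copyDuckdbToPostgres(
    duckdbPath = "central_staging.duckdb",
    postgresConnectionString = "postgresql://user:pass@localhost:5432/results",
    duckdbTable = tbl,
    postgresSchema = "public",
    postgresTable = tbl
  )
}

Opening a connection and attaching Postgres once per table keeps the sketch simple; a production version would likely attach once and loop over the tables inside a single connection.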