From a3537fdba0c6a9847978addbdf7b8310ede4b64f Mon Sep 17 00:00:00 2001
From: tcezard
Date: Mon, 27 Jan 2025 14:33:07 +0000
Subject: [PATCH 1/4] Nextflow workflow to monitor duplicates in RSIDs

---
NOTE(review): this series was recovered from a copy in which all newlines and
indentation had been collapsed and every "<...>" span had been stripped.
 * Indentation is reconstructed (4 spaces); the "index" hashes below are the
   original ones and will not match the reconstructed blobs.
 * The author e-mail address on the "From:" lines appears to have been
   stripped as well and must be restored before running git am.
 * In merge_duplicates_and_notify, the lines from "NB_DUP=" to
   "cat > email <<- EOF" are reconstructed from the surviving fragments
   ("wc -l", "email <<- EOF", "fi"); confirm them against the author's tree.

 ...monitor_duplicate_clustering_accessions.nf | 127 ++++++++++++++++++
 1 file changed, 127 insertions(+)
 create mode 100644 accession-monitoring/monitor_duplicate_clustering_accessions.nf

diff --git a/accession-monitoring/monitor_duplicate_clustering_accessions.nf b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
new file mode 100644
index 00000000..03675da8
--- /dev/null
+++ b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
@@ -0,0 +1,127 @@
+#!/usr/bin/env nextflow
+
+nextflow.enable.dsl=2
+
+def helpMessage() {
+    log.info"""
+    Extract rsids from the mongo database and check if they have duplicates
+
+    Inputs:
+            --mongodb_uri           Address of the mongodb server where the data can be found.
+            --output_dir            Directory where the list of discovered duplicate will be provided
+            --assembly_accession    Assembly accession use to limit the rs ids tested
+            --chunk_size            number of rsids processed in each chunk during the duplicate detection stage
+            --email_recipient       email address that should be contacted if duplicate RS are detected
+    """
+}
+
+params.mongodb_uri = null
+params.chunk_size = 1000000
+params.help = null
+
+// Show help message
+if (params.help) exit 0, helpMessage()
+
+// Test input
+if (!params.mongodb_uri ) {
+    if (!params.mongodb_uri) log.warn('Provide the mongodb uri line using --mongodb_uri')
+    exit 1, helpMessage()
+}
+
+
+process export_mongo_cluster_accessions {
+    label 'med_time', 'default_mem'
+
+    output:
+    path "dbsnp_rsid_output_file", optional: true, emit: dbsnp_rs_report_filename
+    path "eva_rsid_output_file", optional: true, emit: eva_rs_report_filename
+
+    script:
+    """
+    mongoexport --uri $params.mongodb_uri --query '{"asm": "$params.assembly_accession"}' --collection dbsnpClusteredVariantEntity --type=csv --fields accession -o dbsnp_rsid_output_file --noHeaderLine 2>&1
+    mongoexport --uri $params.mongodb_uri --query '{"asm": "$params.assembly_accession"}' --collection clusteredVariantEntity --type=csv --fields accession -o eva_rsid_output_file --noHeaderLine 2>&1
+    """
+}
+
+
+process sort_unique_split_accessions {
+
+    label 'med_time', 'med_mem'
+
+    input:
+    path dbsnp_rsid
+    path eva_rsid
+
+    output:
+    path "accession_chunk-*", emit: accession_chunk
+
+    script:
+    """
+    set -o pipefail
+    cat $dbsnp_rsid $eva_rsid | sort -u -T . -S 2G | split -a 5 -d -l $params.chunk_size - accession_chunk-
+    """
+}
+
+
+
+process detect_duplicates_in_chunk {
+
+    label 'default_time', 'med_mem'
+
+    maxForks 10
+
+    input:
+    each path(accession_chunk)
+
+    output:
+    path "accession_chunk-*_duplicates", emit: duplicate_accession_chunk
+
+    script:
+    def duplicate_accession_chunk = accession_chunk + "_duplicates"
+    """
+    java -Xmx6G -jar $params.clustering_jar --spring.config.location=file:$params.clustering_properties \
+    --spring.batch.job.names=DUPLICATE_RS_ACC_QC_JOB --parameters.rsAccFile=$accession_chunk \
+    --parameters.duplicateRSAccFile=$duplicate_accession_chunk
+    """
+}
+
+
+process merge_duplicates_and_notify {
+
+    publishDir "$params.output_dir", overwrite: true, mode: "copy"
+
+    input:
+    path duplicate_accession_chunks
+
+    script:
+    """
+    TIMESTAMP=`date +\\%Y\\%m\\%d\\%H\\%M\\%S`
+    cat $duplicate_accession_chunks > rs_duplicate_accession_\$TIMESTAMP.out
+    NB_DUP=`wc -l < rs_duplicate_accession_\$TIMESTAMP.out`
+    # NOTE(review): condition reconstructed from the surviving 'fi' - confirm against the author's tree
+    if [ \$NB_DUP -gt 0 ]
+    then
+    cat > email <<- EOF
+    From: eva-noreply@ebi.ac.uk
+    To: $params.email_recipient
+    Subject: \$NB_DUP Duplicates RS accession detected \$TIMESTAMP
+    During the execution of monitor_duplicate_clustering_accessions.nf on \$TIMESTAMP,
+    \$NB_DUP were detected
+
+    Find the list of accession in
+    $params.output_dir/rs_duplicate_accession_\$TIMESTAMP.out
+    EOF
+    cat email | sendmail $params.email_recipient
+    fi
+    """
+
+}
+
+workflow {
+    main:
+    export_mongo_cluster_accessions()
+    sort_unique_split_accessions(export_mongo_cluster_accessions.out.dbsnp_rs_report_filename, export_mongo_cluster_accessions.out.eva_rs_report_filename)
+    detect_duplicates_in_chunk(sort_unique_split_accessions.out.accession_chunk)
+    merge_duplicates_and_notify(detect_duplicates_in_chunk.out.duplicate_accession_chunk.collect())
+}
+

From d2e0e33f35bfe1c4d9de5717afc30ac0c061ec45 Mon Sep 17 00:00:00 2001
From: tcezard
Date: Mon, 27 Jan 2025 15:05:47 +0000
Subject: [PATCH 2/4] output duplicate results

---
 .../monitor_duplicate_clustering_accessions.nf | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/accession-monitoring/monitor_duplicate_clustering_accessions.nf b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
index 03675da8..96d726a5 100644
--- a/accession-monitoring/monitor_duplicate_clustering_accessions.nf
+++ b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
@@ -93,6 +93,9 @@ process merge_duplicates_and_notify {
     input:
     path duplicate_accession_chunks
 
+    output:
+    path "rs_duplicate_accession_*", emit rs_duplicate_accession
+
     script:
     """
     TIMESTAMP=`date +\\%Y\\%m\\%d\\%H\\%M\\%S`

From e06286601b8bc369b29edd551568eca6d43f1e86 Mon Sep 17 00:00:00 2001
From: tcezard
Date: Tue, 28 Jan 2025 15:10:38 +0000
Subject: [PATCH 3/4] Make the query optional

---
NOTE(review): two changes relative to the patch as originally sent.
 * The input guard now also fires when --output_dir or --email_recipient is
   missing; previously the two new warnings sat inside
   "if (!params.mongodb_uri ) {" and could never run on their own.
 * "query" is declared with "def" so it is local to the process script
   section instead of an undeclared variable.

 .../monitor_duplicate_clustering_accessions.nf | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/accession-monitoring/monitor_duplicate_clustering_accessions.nf b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
index 96d726a5..9300c00c 100644
--- a/accession-monitoring/monitor_duplicate_clustering_accessions.nf
+++ b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
@@ -9,7 +9,7 @@ def helpMessage() {
     Inputs:
             --mongodb_uri           Address of the mongodb server where the data can be found.
             --output_dir            Directory where the list of discovered duplicate will be provided
-            --assembly_accession    Assembly accession use to limit the rs ids tested
+            --assembly_accession    Limit the search to rsids associated with specific assembly
             --chunk_size            number of rsids processed in each chunk during the duplicate detection stage
             --email_recipient       email address that should be contacted if duplicate RS are detected
     """
@@ -25,6 +25,8 @@ if (params.help) exit 0, helpMessage()
 // Test input
-if (!params.mongodb_uri ) {
+if (!params.mongodb_uri || !params.output_dir || !params.email_recipient) {
     if (!params.mongodb_uri) log.warn('Provide the mongodb uri line using --mongodb_uri')
+    if (!params.output_dir) log.warn('Provide the location for the output file containing the duplicates using --output_dir')
+    if (!params.email_recipient) log.warn('Provide the email address that should be contacted upon finding duplicates using --email_recipient')
     exit 1, helpMessage()
 }
 
@@ -37,9 +39,13 @@ process export_mongo_cluster_accessions {
     path "eva_rsid_output_file", optional: true, emit: eva_rs_report_filename
 
     script:
+    def query = ""
+    if (params.assembly_accession){
+        query = """--query '{"asm": "$params.assembly_accession"}'"""
+    }
     """
-    mongoexport --uri $params.mongodb_uri --query '{"asm": "$params.assembly_accession"}' --collection dbsnpClusteredVariantEntity --type=csv --fields accession -o dbsnp_rsid_output_file --noHeaderLine 2>&1
-    mongoexport --uri $params.mongodb_uri --query '{"asm": "$params.assembly_accession"}' --collection clusteredVariantEntity --type=csv --fields accession -o eva_rsid_output_file --noHeaderLine 2>&1
+    mongoexport --uri $params.mongodb_uri $query --collection dbsnpClusteredVariantEntity --type=csv --fields accession -o dbsnp_rsid_output_file --noHeaderLine 2>&1
+    mongoexport --uri $params.mongodb_uri $query --collection clusteredVariantEntity --type=csv --fields accession -o eva_rsid_output_file --noHeaderLine 2>&1
     """
 }
 

From 0f0bbd0ec23a51c8e11c575f0d7fa401a004b535 Mon Sep 17 00:00:00 2001
From: tcezard
Date: Tue, 28 Jan 2025 15:23:31 +0000
Subject: [PATCH 4/4] fix emit statement

---
 accession-monitoring/monitor_duplicate_clustering_accessions.nf | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/accession-monitoring/monitor_duplicate_clustering_accessions.nf b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
index 9300c00c..04515e61 100644
--- a/accession-monitoring/monitor_duplicate_clustering_accessions.nf
+++ b/accession-monitoring/monitor_duplicate_clustering_accessions.nf
@@ -100,7 +100,7 @@ process merge_duplicates_and_notify {
     path duplicate_accession_chunks
 
     output:
-    path "rs_duplicate_accession_*", emit rs_duplicate_accession
+    path "rs_duplicate_accession_*", emit: rs_duplicate_accession
 
     script:
     """