From 7d53e054ff914a50bf6b984b1db1fafc0b272166 Mon Sep 17 00:00:00 2001 From: Chaoran Chen Date: Tue, 5 Aug 2025 16:35:58 +0200 Subject: [PATCH 1/2] feat: adapt output format to be compatible with SILO 0.8.0 --- .../ingest/proc/ExportSiloNdjson.kt | 54 +++++++++++++++++++ .../ingest/proc/ExtractAddedOrChanged.kt | 7 +-- .../ingest/proc/ExtractUnchanged.kt | 7 +-- .../org/genspectrum/ingest/silo/SiloEntry.kt | 8 +++ .../ingest/workflow/SC2GisaidWorkflow.kt | 21 ++++++-- .../workflow/SC2NextstrainOpenWorkflow.kt | 13 ++++- 6 files changed, 98 insertions(+), 12 deletions(-) create mode 100644 src/main/kotlin/org/genspectrum/ingest/proc/ExportSiloNdjson.kt create mode 100644 src/main/kotlin/org/genspectrum/ingest/silo/SiloEntry.kt diff --git a/src/main/kotlin/org/genspectrum/ingest/proc/ExportSiloNdjson.kt b/src/main/kotlin/org/genspectrum/ingest/proc/ExportSiloNdjson.kt new file mode 100644 index 0000000..b8f761e --- /dev/null +++ b/src/main/kotlin/org/genspectrum/ingest/proc/ExportSiloNdjson.kt @@ -0,0 +1,54 @@ +package org.genspectrum.ingest.proc + +import org.genspectrum.ingest.entry.MutableEntry +import org.genspectrum.ingest.file.Compression +import org.genspectrum.ingest.file.File +import org.genspectrum.ingest.file.FileType +import org.genspectrum.ingest.silo.SiloEntry +import org.genspectrum.ingest.silo.SiloEntryAlignedSequence +import org.genspectrum.ingest.util.readFile +import org.genspectrum.ingest.util.readNdjson +import org.genspectrum.ingest.util.writeFile +import org.genspectrum.ingest.util.writeNdjson +import java.nio.file.Path + +fun exportSiloNdjson(inputMutableEntryFile: File, outputSiloNdjsonPath: Path): File { + require(inputMutableEntryFile.type == FileType.NDJSON) + val outputFile = File( + inputMutableEntryFile.name, + outputSiloNdjsonPath, + inputMutableEntryFile.sorted, + FileType.NDJSON, + Compression.ZSTD + ) + + val reader = readNdjson(readFile(inputMutableEntryFile.path)) + val writer = writeNdjson(writeFile(outputFile.path)) + for (entry in reader) { + val siloEntry = entry.metadata + + transformUnalignedSequences(entry.unalignedNucleotideSequences) + + transformAlignedSequences(entry.alignedNucleotideSequences, entry.nucleotideInsertions) + + transformAlignedSequences(entry.alignedAminoAcidSequences, entry.aminoAcidInsertions) + writer.write(siloEntry) + } + writer.close() + + return outputFile +} + +private fun transformUnalignedSequences(sequences: Map): Map { + return sequences.map { (segment, sequence) -> "unaligned_$segment" to sequence }.toMap() +} + +private fun transformAlignedSequences( + sequences: Map, + insertions: Map> +): Map { + val transformed = mutableMapOf() + for ((segment, sequence) in sequences) { + if (sequence != null) { + transformed[segment] = SiloEntryAlignedSequence(sequence, insertions[segment] ?: emptyList()) + } + } + return transformed +} diff --git a/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt b/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt index 73777d6..a29f36f 100644 --- a/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt +++ b/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt @@ -5,6 +5,7 @@ import org.genspectrum.ingest.entry.MutableEntry import org.genspectrum.ingest.file.Compression import org.genspectrum.ingest.file.File import org.genspectrum.ingest.file.FileType +import org.genspectrum.ingest.silo.SiloEntry import org.genspectrum.ingest.util.readFile import org.genspectrum.ingest.util.readNdjson import org.genspectrum.ingest.util.writeFile @@ -24,10 +25,10 @@ fun extractAddedOrChanged( .parseObject() val addedOrChanged = (changeComparison.added + changeComparison.changed).toSet() - val reader = readNdjson(readFile(inputFile.path)) - val writer = writeNdjson(writeFile(outputFile.path)) + val reader = readNdjson(readFile(inputFile.path)) + val writer = writeNdjson(writeFile(outputFile.path)) for (entry in reader) { - if (addedOrChanged.contains(entry.metadata[idColumn])) { + if (addedOrChanged.contains(entry[idColumn])) { writer.write(entry) } } diff --git a/src/main/kotlin/org/genspectrum/ingest/proc/ExtractUnchanged.kt b/src/main/kotlin/org/genspectrum/ingest/proc/ExtractUnchanged.kt index 5f0a120..2000c5e 100644 --- a/src/main/kotlin/org/genspectrum/ingest/proc/ExtractUnchanged.kt +++ b/src/main/kotlin/org/genspectrum/ingest/proc/ExtractUnchanged.kt @@ -5,6 +5,7 @@ import org.genspectrum.ingest.entry.MutableEntry import org.genspectrum.ingest.file.Compression import org.genspectrum.ingest.file.File import org.genspectrum.ingest.file.FileType +import org.genspectrum.ingest.silo.SiloEntry import org.genspectrum.ingest.util.readFile import org.genspectrum.ingest.util.readNdjson import org.genspectrum.ingest.util.writeFile @@ -24,10 +25,10 @@ fun extractUnchanged( .parseObject() val unchanged = changeComparison.unchanged.toSet() - val reader = readNdjson(readFile(inputFile.path)) - val writer = writeNdjson(writeFile(outputFile.path)) + val reader = readNdjson(readFile(inputFile.path)) + val writer = writeNdjson(writeFile(outputFile.path)) for (entry in reader) { - if (unchanged.contains(entry.metadata[idColumn])) { + if (unchanged.contains(entry[idColumn])) { writer.write(entry) } } diff --git a/src/main/kotlin/org/genspectrum/ingest/silo/SiloEntry.kt b/src/main/kotlin/org/genspectrum/ingest/silo/SiloEntry.kt new file mode 100644 index 0000000..9fa39d3 --- /dev/null +++ b/src/main/kotlin/org/genspectrum/ingest/silo/SiloEntry.kt @@ -0,0 +1,8 @@ +package org.genspectrum.ingest.silo + +typealias SiloEntry = Map + +data class SiloEntryAlignedSequence( + val sequence: String, + val insertions: List, +) diff --git a/src/main/kotlin/org/genspectrum/ingest/workflow/SC2GisaidWorkflow.kt b/src/main/kotlin/org/genspectrum/ingest/workflow/SC2GisaidWorkflow.kt index a22c0f0..35144b1 100644 --- a/src/main/kotlin/org/genspectrum/ingest/workflow/SC2GisaidWorkflow.kt +++ b/src/main/kotlin/org/genspectrum/ingest/workflow/SC2GisaidWorkflow.kt @@ -12,6 +12,7 @@ import org.genspectrum.ingest.file.FileType import org.genspectrum.ingest.proc.HashEntry import org.genspectrum.ingest.proc.compareHashes import org.genspectrum.ingest.proc.concatFiles +import org.genspectrum.ingest.proc.exportSiloNdjson import org.genspectrum.ingest.proc.extractAddedOrChanged import org.genspectrum.ingest.proc.extractUnchanged import org.genspectrum.ingest.proc.fastaToNdjson @@ -98,18 +99,24 @@ fun runSC2GisaidWorkflow( println("${LocalDateTime.now()}: Finished joinFiles") - val unchangedPath = workdir.resolve("08_unchanged") + val siloPath = workdir.resolve("08_silo_ndjson") + Files.createDirectories(siloPath) + val siloFile = exportSiloNdjsonFile(joinedFilePath, siloPath) + + println("${LocalDateTime.now()}: Finished exportSiloNdjsonFile") + + val unchangedPath = workdir.resolve("09_unchanged") Files.createDirectories(unchangedPath) val unchangedFilePath = extractUnchangedEntries(unchangedPath, previousProcessed, comparisonFilePath) println("${LocalDateTime.now()}: Finished extractUnchangedEntries") - val unchangedAndNewPath = workdir.resolve("09_unchanged_and_new") + val unchangedAndNewPath = workdir.resolve("10_unchanged_and_new") Files.createDirectories(unchangedAndNewPath) - val unchangedAndNewFilePath = mergeUnchangedAndNew( + val unchangedAndNewFile = mergeUnchangedAndNew( outputDirectory = unchangedAndNewPath, unchangedFilePath = unchangedFilePath, - joinedFilePath = joinedFilePath + joinedFilePath = siloFile ) val allPangoLineagesFile = mergePangoLineageFiles( outputDirectory = unchangedAndNewPath, @@ -123,7 +130,7 @@ fun runSC2GisaidWorkflow( Files.createDirectories(finalDestinationPath) val (finalHashesFile, finalProvisionFile) = moveFinalFiles( hashesFile = hashesFile, - provisionFile = unchangedAndNewFilePath, + provisionFile = unchangedAndNewFile, allPangoLineagesFile = allPangoLineagesFile, directoryPath = finalDestinationPath ) @@ -309,6 +316,10 @@ fun extractUnchangedEntries(unchangedPath: Path, previousProcessed: File, compar return extractUnchanged("gisaidEpiIsl", comparisonFilePath, previousProcessed, unchangedPath) } +private fun exportSiloNdjsonFile(provisionFile: File, siloNdjsonPath: Path): File { + return exportSiloNdjson(provisionFile, siloNdjsonPath) +} + fun mergeUnchangedAndNew(outputDirectory: Path, unchangedFilePath: File, joinedFilePath: File): File { val outputFile = File("merged", outputDirectory, false, FileType.NDJSON, Compression.ZSTD) concatFiles(arrayOf(unchangedFilePath.path, joinedFilePath.path), outputFile.path) diff --git a/src/main/kotlin/org/genspectrum/ingest/workflow/SC2NextstrainOpenWorkflow.kt b/src/main/kotlin/org/genspectrum/ingest/workflow/SC2NextstrainOpenWorkflow.kt index 425f983..2ee930a 100644 --- a/src/main/kotlin/org/genspectrum/ingest/workflow/SC2NextstrainOpenWorkflow.kt +++ b/src/main/kotlin/org/genspectrum/ingest/workflow/SC2NextstrainOpenWorkflow.kt @@ -4,6 +4,7 @@ import org.genspectrum.ingest.file.AllPangoLineagesFile import org.genspectrum.ingest.file.Compression import org.genspectrum.ingest.file.File import org.genspectrum.ingest.file.FileType +import org.genspectrum.ingest.proc.exportSiloNdjson import org.genspectrum.ingest.proc.fastaToNdjson import org.genspectrum.ingest.proc.joinSC2NextstrainOpenData import org.genspectrum.ingest.proc.renameFile @@ -44,10 +45,16 @@ fun runSC2NextstrainOpenWorkflow(workdir: Path) { println("${LocalDateTime.now()}: Finished joinFiles") + val siloPath = workdir.resolve("05_silo_ndjson") + Files.createDirectories(siloPath) + val siloFile = exportSiloNdjsonFile(joinedFile, siloPath) + + println("${LocalDateTime.now()}: Finished exportSiloNdjsonFile") + val finalDestinationPath = workdir.resolve("00_archive") Files.createDirectories(finalDestinationPath) val finalProvisionFile = moveFinalFiles( - provisionFile = joinedFile, + provisionFile = siloFile, allPangoLineagesFile = allPangoLineagesFile, directoryPath = finalDestinationPath ) @@ -171,6 +178,10 @@ private fun joinFiles( ) } +private fun exportSiloNdjsonFile(provisionFile: File, siloNdjsonPath: Path): File { + return exportSiloNdjson(provisionFile, siloNdjsonPath) +} + private fun moveFinalFiles(provisionFile: File, allPangoLineagesFile: AllPangoLineagesFile, directoryPath: Path): File { val zoneId = ZoneId.systemDefault() val newDataVersion = Instant.now().atZone(zoneId).toEpochSecond() From 618bf58e6a6385ce3a627e7de99e41785249a8ed Mon Sep 17 00:00:00 2001 From: Chaoran Chen Date: Thu, 14 Aug 2025 13:22:10 +0200 Subject: [PATCH 2/2] Hopefully fix gisaid ingest --- .../org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt b/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt index a29f36f..945069c 100644 --- a/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt +++ b/src/main/kotlin/org/genspectrum/ingest/proc/ExtractAddedOrChanged.kt @@ -25,10 +25,10 @@ fun extractAddedOrChanged( .parseObject() val addedOrChanged = (changeComparison.added + changeComparison.changed).toSet() - val reader = readNdjson(readFile(inputFile.path)) - val writer = writeNdjson(writeFile(outputFile.path)) + val reader = readNdjson(readFile(inputFile.path)) + val writer = writeNdjson(writeFile(outputFile.path)) for (entry in reader) { - if (addedOrChanged.contains(entry[idColumn])) { + if (addedOrChanged.contains(entry.metadata[idColumn])) { writer.write(entry) } }