Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/main/kotlin/org/genspectrum/ingest/proc/ExportSiloNdjson.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.genspectrum.ingest.proc

import org.genspectrum.ingest.entry.MutableEntry
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
import org.genspectrum.ingest.silo.SiloEntry
import org.genspectrum.ingest.silo.SiloEntryAlignedSequence
import org.genspectrum.ingest.util.readFile
import org.genspectrum.ingest.util.readNdjson
import org.genspectrum.ingest.util.writeFile
import org.genspectrum.ingest.util.writeNdjson
import java.nio.file.Path

fun exportSiloNdjson(inputMutableEntryFile: File, outputSiloNdjsonPath: Path): File {
require(inputMutableEntryFile.type == FileType.NDJSON)
val outputFile = File(
inputMutableEntryFile.name,
outputSiloNdjsonPath,
inputMutableEntryFile.sorted,
FileType.NDJSON,
Compression.ZSTD
)

val reader = readNdjson<MutableEntry>(readFile(inputMutableEntryFile.path))
val writer = writeNdjson<SiloEntry>(writeFile(outputFile.path))
for (entry in reader) {
val siloEntry = entry.metadata +
transformUnalignedSequences(entry.unalignedNucleotideSequences) +
transformAlignedSequences(entry.alignedNucleotideSequences, entry.nucleotideInsertions) +
transformAlignedSequences(entry.alignedAminoAcidSequences, entry.aminoAcidInsertions)
writer.write(siloEntry)
}
writer.close()

return outputFile
}

private fun transformUnalignedSequences(sequences: Map<String, String?>): Map<String, String?> {
return sequences.map { (segment, sequence) -> "unaligned_$segment" to sequence }.toMap()
}

private fun transformAlignedSequences(
sequences: Map<String, String?>,
insertions: Map<String, List<String>>
): Map<String, SiloEntryAlignedSequence> {
val transformed = mutableMapOf<String, SiloEntryAlignedSequence>()
for ((segment, sequence) in sequences) {
if (sequence != null) {
transformed[segment] = SiloEntryAlignedSequence(sequence, insertions[segment] ?: emptyList())
}
}
return transformed
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.genspectrum.ingest.entry.MutableEntry
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
import org.genspectrum.ingest.silo.SiloEntry
import org.genspectrum.ingest.util.readFile
import org.genspectrum.ingest.util.readNdjson
import org.genspectrum.ingest.util.writeFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.genspectrum.ingest.entry.MutableEntry
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
import org.genspectrum.ingest.silo.SiloEntry
import org.genspectrum.ingest.util.readFile
import org.genspectrum.ingest.util.readNdjson
import org.genspectrum.ingest.util.writeFile
Expand All @@ -24,10 +25,10 @@ fun extractUnchanged(
.parseObject<ComparisonResult>()
val unchanged = changeComparison.unchanged.toSet()

val reader = readNdjson<MutableEntry>(readFile(inputFile.path))
val writer = writeNdjson<MutableEntry>(writeFile(outputFile.path))
val reader = readNdjson<SiloEntry>(readFile(inputFile.path))
val writer = writeNdjson<SiloEntry>(writeFile(outputFile.path))
for (entry in reader) {
if (unchanged.contains(entry.metadata[idColumn])) {
if (unchanged.contains(entry[idColumn])) {
writer.write(entry)
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/kotlin/org/genspectrum/ingest/silo/SiloEntry.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.genspectrum.ingest.silo

typealias SiloEntry = Map<String, Any?>

data class SiloEntryAlignedSequence(
val sequence: String,
val insertions: List<String>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.genspectrum.ingest.file.FileType
import org.genspectrum.ingest.proc.HashEntry
import org.genspectrum.ingest.proc.compareHashes
import org.genspectrum.ingest.proc.concatFiles
import org.genspectrum.ingest.proc.exportSiloNdjson
import org.genspectrum.ingest.proc.extractAddedOrChanged
import org.genspectrum.ingest.proc.extractUnchanged
import org.genspectrum.ingest.proc.fastaToNdjson
Expand Down Expand Up @@ -98,18 +99,24 @@ fun runSC2GisaidWorkflow(

println("${LocalDateTime.now()}: Finished joinFiles")

val unchangedPath = workdir.resolve("08_unchanged")
val siloPath = workdir.resolve("08_silo_ndjson")
Files.createDirectories(siloPath)
val siloFile = exportSiloNdjsonFile(joinedFilePath, siloPath)

println("${LocalDateTime.now()}: Finished exportSiloNdjsonFile")

val unchangedPath = workdir.resolve("09_unchanged")
Files.createDirectories(unchangedPath)
val unchangedFilePath = extractUnchangedEntries(unchangedPath, previousProcessed, comparisonFilePath)

println("${LocalDateTime.now()}: Finished extractUnchangedEntries")

val unchangedAndNewPath = workdir.resolve("09_unchanged_and_new")
val unchangedAndNewPath = workdir.resolve("10_unchanged_and_new")
Files.createDirectories(unchangedAndNewPath)
val unchangedAndNewFilePath = mergeUnchangedAndNew(
val unchangedAndNewFile = mergeUnchangedAndNew(
outputDirectory = unchangedAndNewPath,
unchangedFilePath = unchangedFilePath,
joinedFilePath = joinedFilePath
joinedFilePath = siloFile
)
val allPangoLineagesFile = mergePangoLineageFiles(
outputDirectory = unchangedAndNewPath,
Expand All @@ -123,7 +130,7 @@ fun runSC2GisaidWorkflow(
Files.createDirectories(finalDestinationPath)
val (finalHashesFile, finalProvisionFile) = moveFinalFiles(
hashesFile = hashesFile,
provisionFile = unchangedAndNewFilePath,
provisionFile = unchangedAndNewFile,
allPangoLineagesFile = allPangoLineagesFile,
directoryPath = finalDestinationPath
)
Expand Down Expand Up @@ -309,6 +316,10 @@ fun extractUnchangedEntries(unchangedPath: Path, previousProcessed: File, compar
return extractUnchanged("gisaidEpiIsl", comparisonFilePath, previousProcessed, unchangedPath)
}

private fun exportSiloNdjsonFile(provisionFile: File, siloNdjsonPath: Path): File {
return exportSiloNdjson(provisionFile, siloNdjsonPath)
}

fun mergeUnchangedAndNew(outputDirectory: Path, unchangedFilePath: File, joinedFilePath: File): File {
val outputFile = File("merged", outputDirectory, false, FileType.NDJSON, Compression.ZSTD)
concatFiles(arrayOf(unchangedFilePath.path, joinedFilePath.path), outputFile.path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.genspectrum.ingest.file.AllPangoLineagesFile
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
import org.genspectrum.ingest.proc.exportSiloNdjson
import org.genspectrum.ingest.proc.fastaToNdjson
import org.genspectrum.ingest.proc.joinSC2NextstrainOpenData
import org.genspectrum.ingest.proc.renameFile
Expand Down Expand Up @@ -44,10 +45,16 @@ fun runSC2NextstrainOpenWorkflow(workdir: Path) {

println("${LocalDateTime.now()}: Finished joinFiles")

val siloPath = workdir.resolve("05_silo_ndjson")
Files.createDirectories(siloPath)
val siloFile = exportSiloNdjsonFile(joinedFile, siloPath)

println("${LocalDateTime.now()}: Finished exportSiloNdjsonFile")

val finalDestinationPath = workdir.resolve("00_archive")
Files.createDirectories(finalDestinationPath)
val finalProvisionFile = moveFinalFiles(
provisionFile = joinedFile,
provisionFile = siloFile,
allPangoLineagesFile = allPangoLineagesFile,
directoryPath = finalDestinationPath
)
Expand Down Expand Up @@ -171,6 +178,10 @@ private fun joinFiles(
)
}

private fun exportSiloNdjsonFile(provisionFile: File, siloNdjsonPath: Path): File {
return exportSiloNdjson(provisionFile, siloNdjsonPath)
}

private fun moveFinalFiles(provisionFile: File, allPangoLineagesFile: AllPangoLineagesFile, directoryPath: Path): File {
val zoneId = ZoneId.systemDefault()
val newDataVersion = Instant.now().atZone(zoneId).toEpochSecond()
Expand Down