From 9170850c380c33b8f86c1bf5adcba32e4b21f75d Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Mon, 9 Feb 2026 10:59:08 +0100 Subject: [PATCH 01/10] Sync metadata --- dataflow_transfer/dataflow_transfer.py | 5 ++ dataflow_transfer/run_classes/generic_runs.py | 82 +++++++++++++++---- dataflow_transfer/utils/filesystem.py | 6 +- 3 files changed, 75 insertions(+), 18 deletions(-) diff --git a/dataflow_transfer/dataflow_transfer.py b/dataflow_transfer/dataflow_transfer.py index ebbae9c..575c05f 100644 --- a/dataflow_transfer/dataflow_transfer.py +++ b/dataflow_transfer/dataflow_transfer.py @@ -34,6 +34,11 @@ def process_run(run_dir, sequencer, config): run.start_transfer(final=False) return + ## Sequencing finished. Copy metadata in the background if not already done. + if run.has_status("sequencing_finished"): + if not run.metadata_synced: + run.sync_metadata() + ## Sequencing finished but transfer not complete. Start final transfer. if not run.final_sync_successful: if run.has_status("sequencing_finished"): diff --git a/dataflow_transfer/run_classes/generic_runs.py b/dataflow_transfer/run_classes/generic_runs.py index d76d0b4..1442fa4 100644 --- a/dataflow_transfer/run_classes/generic_runs.py +++ b/dataflow_transfer/run_classes/generic_runs.py @@ -21,6 +21,14 @@ def __init__(self, run_dir, configuration): ) self.final_file = "" self.transfer_details = self.configuration.get("transfer_details", {}) + self.metadata_rsync_exitcode_file = os.path.join( + self.run_dir, ".metadata_rsync_exitcode" + ) + self.metadata_destination = os.path.join( + self.configuration.get("metadata_archive"), + getattr(self, "run_type", None), + self.run_id, + ) self.final_rsync_exitcode_file = os.path.join( self.run_dir, ".final_rsync_exitcode" ) @@ -42,36 +50,80 @@ def sequencing_ongoing(self): return False return True - def generate_rsync_command(self, is_final_sync=False): - """Generate an rsync command string.""" - destination = ( - self.transfer_details.get("user") - + "@" - + self.transfer_details.get("host") - + ":" - + self.miarka_destination + @property + def metadata_synced(self): + """Check if the metadata rsync was successful by reading the exit code file.""" + return fs.check_exit_status(self.metadata_rsync_exitcode_file) + + def sync_metadata(self): + """Start background rsync transfer for metadata files.""" + # make metadata destination path if it doesn't exist + if not os.path.exists(self.metadata_destination): + os.makedirs(self.metadata_destination) + metadata_rsync_command = self.generate_rsync_command( + remote=False, with_exit_code_file=True ) + + if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination): + logger.info( + f"Metadata rsync is already running for {self.run_dir} to destination {self.metadata_destination}. Skipping background metadata sync initiation." + ) + return + try: + fs.submit_background_process(metadata_rsync_command) + logger.info( + f"{self.run_id}: Started metadata rsync to {self.metadata_destination}" + + f" with the following command: '{metadata_rsync_command}'" + ) + except Exception as e: + logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}") + raise e + + def generate_rsync_command(self, remote=False, with_exit_code_file=False): + """Generate an rsync command string.""" + if remote: + destination = ( + self.transfer_details.get("user") + + "@" + + self.transfer_details.get("host") + + ":" + + self.miarka_destination + ) + log_file_option = "--log-file=" + os.path.join( + self.run_dir, "rsync_remote_log.txt" + ) + rsync_options = self.sequencer_config.get("remote_rsync_options", []) + exit_code_file = self.final_rsync_exitcode_file + else: + destination = self.metadata_destination + log_file_option = "--log-file=" + os.path.join( + self.run_dir, "rsync_metadata_log.txt" + ) + rsync_options = self.sequencer_config.get("metadata_rsync_options", []) + exit_code_file = self.metadata_rsync_exitcode_file run_one_bin = self.configuration.get("run_one_path", "run-one") command = [ run_one_bin, "rsync", "-au", - "--log-file=" + os.path.join(self.run_dir, "rsync_remote_log.txt"), - *(self.sequencer_config.get("rsync_options", [])), + log_file_option, + *(rsync_options), self.run_dir, destination, ] command_str = " ".join(command) - if is_final_sync: - command_str += f"; echo $? > {self.final_rsync_exitcode_file}" + if with_exit_code_file: + command_str += f"; echo $? > {exit_code_file}" return command_str def start_transfer(self, final=False): """Start background rsync transfer to storage.""" - transfer_command = self.generate_rsync_command(is_final_sync=final) - if fs.rsync_is_running(src=self.run_dir): + transfer_command = self.generate_rsync_command( + remote=True, with_exit_code_file=final + ) + if fs.rsync_is_running(src=self.run_dir, dst=self.miarka_destination): logger.info( - f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation." + f"Rsync is already running for {self.run_dir} to destination {self.miarka_destination}. Skipping background transfer initiation." ) return try: diff --git a/dataflow_transfer/utils/filesystem.py b/dataflow_transfer/utils/filesystem.py index a02aadb..016159e 100644 --- a/dataflow_transfer/utils/filesystem.py +++ b/dataflow_transfer/utils/filesystem.py @@ -28,9 +28,9 @@ def find_runs(base_dir, ignore_folders=[]): return runs -def rsync_is_running(src): - """Check if rsync is already running for given src.""" - pattern = f"rsync.*{src}" +def rsync_is_running(src, dst): + """Check if rsync is already running for given src and destination.""" + pattern = f"rsync.*{src}.*{dst}" try: subprocess.check_output(["pgrep", "-f", pattern]) return True From b205f2e045fa1c2610758a5ff1296903fd7a4662 Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Mon, 9 Feb 2026 14:34:30 +0100 Subject: [PATCH 02/10] Correct rsync command --- dataflow_transfer/run_classes/generic_runs.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dataflow_transfer/run_classes/generic_runs.py b/dataflow_transfer/run_classes/generic_runs.py index 1442fa4..d11e94b 100644 --- a/dataflow_transfer/run_classes/generic_runs.py +++ b/dataflow_transfer/run_classes/generic_runs.py @@ -57,9 +57,6 @@ def metadata_synced(self): def sync_metadata(self): """Start background rsync transfer for metadata files.""" - # make metadata destination path if it doesn't exist - if not os.path.exists(self.metadata_destination): - os.makedirs(self.metadata_destination) metadata_rsync_command = self.generate_rsync_command( remote=False, with_exit_code_file=True ) @@ -108,8 +105,8 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): "-au", log_file_option, *(rsync_options), - self.run_dir, - destination, + self.run_dir + "/", + destination + "/", ] command_str = " ".join(command) if with_exit_code_file: From 3b8820a54c63be01cfa1327f51b4b94d97e3f42b Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Mon, 9 Feb 2026 14:49:01 +0100 Subject: [PATCH 03/10] set appropriate paths for remote and local rsync --- dataflow_transfer/run_classes/generic_runs.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dataflow_transfer/run_classes/generic_runs.py b/dataflow_transfer/run_classes/generic_runs.py index d11e94b..60d44f4 100644 --- a/dataflow_transfer/run_classes/generic_runs.py +++ b/dataflow_transfer/run_classes/generic_runs.py @@ -79,6 +79,7 @@ def sync_metadata(self): def generate_rsync_command(self, remote=False, with_exit_code_file=False): """Generate an rsync command string.""" if remote: + source = self.run_dir destination = ( self.transfer_details.get("user") + "@" @@ -92,7 +93,8 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): rsync_options = self.sequencer_config.get("remote_rsync_options", []) exit_code_file = self.final_rsync_exitcode_file else: - destination = self.metadata_destination + source = self.run_dir + "/" + destination = self.metadata_destination + "/" log_file_option = "--log-file=" + os.path.join( self.run_dir, "rsync_metadata_log.txt" ) @@ -105,8 +107,8 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): "-au", log_file_option, *(rsync_options), - self.run_dir + "/", - destination + "/", + source, + destination, ] command_str = " ".join(command) if with_exit_code_file: From 20da27761951e302cd95079fe95b4042073ea53d Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Mon, 9 Feb 2026 15:44:03 +0100 Subject: [PATCH 04/10] Fix/add tests --- dataflow_transfer/tests/test_filesystem.py | 4 +- dataflow_transfer/tests/test_run_classes.py | 127 +++++++++++++++++--- 2 files changed, 110 insertions(+), 21 deletions(-) diff --git a/dataflow_transfer/tests/test_filesystem.py b/dataflow_transfer/tests/test_filesystem.py index 2812b82..463e16e 100644 --- a/dataflow_transfer/tests/test_filesystem.py +++ b/dataflow_transfer/tests/test_filesystem.py @@ -69,12 +69,12 @@ class TestRsyncIsRunning: @patch("subprocess.check_output") def test_rsync_running(self, mock_check_output): mock_check_output.return_value = b"12345" - assert rsync_is_running("/some/path") is True + assert rsync_is_running("/some/path", "/dst/path") is True @patch("subprocess.check_output") def test_rsync_not_running(self, mock_check_output): mock_check_output.side_effect = CalledProcessError(1, "pgrep") - assert rsync_is_running("/some/path") is False + assert rsync_is_running("/some/path", "/dst/path") is False class TestSubmitBackgroundProcess: diff --git a/dataflow_transfer/tests/test_run_classes.py b/dataflow_transfer/tests/test_run_classes.py index 09457fe..72d72cb 100644 --- a/dataflow_transfer/tests/test_run_classes.py +++ b/dataflow_transfer/tests/test_run_classes.py @@ -12,6 +12,7 @@ def novaseqxplus_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -23,7 +24,12 @@ def novaseqxplus_testobj(tmp_path): "miarka_destination": "/data/NovaSeqXPlus", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -38,6 +44,7 @@ def nextseq_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -49,7 +56,12 @@ def nextseq_testobj(tmp_path): "miarka_destination": "/data/NextSeq", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -64,6 +76,7 @@ def miseqseq_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -75,7 +88,12 @@ def miseqseq_testobj(tmp_path): "miarka_destination": "/data/MiSeq", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -90,6 +108,7 @@ def miseqseqi100_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -101,7 +120,12 @@ def miseqseqi100_testobj(tmp_path): "miarka_destination": "/data/MiSeqi100", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -166,27 +190,32 @@ def test_sequencing_ongoing(run_fixture, request): @pytest.mark.parametrize( - "run_fixture, final_sync", + "run_fixture, remote, with_exit_code_file", [ - ("novaseqxplus_testobj", False), - ("novaseqxplus_testobj", True), - ("nextseq_testobj", False), - ("nextseq_testobj", True), - ("miseqseq_testobj", False), - ("miseqseq_testobj", True), - ("miseqseqi100_testobj", False), - ("miseqseqi100_testobj", True), + ("novaseqxplus_testobj", False, True), + ("novaseqxplus_testobj", True, False), + ("novaseqxplus_testobj", True, True), + ("nextseq_testobj", False, True), + ("nextseq_testobj", True, False), + ("nextseq_testobj", True, True), + ("miseqseq_testobj", False, True), + ("miseqseq_testobj", True, False), + ("miseqseq_testobj", True, True), + ("miseqseqi100_testobj", False, True), + ("miseqseqi100_testobj", True, False), + ("miseqseqi100_testobj", True, True), ], ) -def test_generate_rsync_command(run_fixture, final_sync, request): +def test_generate_rsync_command(run_fixture, remote, with_exit_code_file, request): run_obj = request.getfixturevalue(run_fixture) - rsync_command = run_obj.generate_rsync_command(is_final_sync=final_sync) + rsync_command = run_obj.generate_rsync_command( + remote=remote, with_exit_code_file=with_exit_code_file + ) assert "run-one rsync" in rsync_command assert "--log-file=" in rsync_command - assert "--chmod=Dg+s,g+rw" in rsync_command assert run_obj.run_dir in rsync_command - if final_sync: - assert f"; echo $? > {run_obj.final_rsync_exitcode_file}" in rsync_command + if with_exit_code_file: + assert "; echo $? >" in rsync_command @pytest.mark.parametrize( @@ -213,7 +242,7 @@ def test_generate_rsync_command(run_fixture, final_sync, request): def test_start_transfer(run_fixture, rsync_running, final, request, monkeypatch): run_obj = request.getfixturevalue(run_fixture) - def mock_rsync_is_running(src): + def mock_rsync_is_running(src, dst): return rsync_running def mock_submit_background_process(command_str): @@ -270,6 +299,66 @@ def test_final_sync_successful(run_fixture, sync_successful, request): assert run_obj.final_sync_successful == sync_successful +@pytest.mark.parametrize( + "run_fixture, sync_successful", + [ + ("novaseqxplus_testobj", True), + ("novaseqxplus_testobj", False), + ("nextseq_testobj", True), + ("nextseq_testobj", False), + ("miseqseq_testobj", True), + ("miseqseq_testobj", False), + ("miseqseqi100_testobj", True), + ("miseqseqi100_testobj", False), + ], +) +def test_metadata_synced(run_fixture, sync_successful, request): + run_obj = request.getfixturevalue(run_fixture) + if sync_successful: + # Create the final rsync exit code file with a success code + with open(run_obj.metadata_rsync_exitcode_file, "w") as f: + f.write("0") + else: + # Create the final rsync exit code file with a failure code + with open(run_obj.metadata_rsync_exitcode_file, "w") as f: + f.write("1") + assert run_obj.metadata_synced == sync_successful + + +@pytest.mark.parametrize( + "run_fixture", + [ + "novaseqxplus_testobj", + "nextseq_testobj", + "miseqseq_testobj", + "miseqseqi100_testobj", + ], +) +def test_sync_metadata(run_fixture, request, monkeypatch): + run_obj = request.getfixturevalue(run_fixture) + + def mock_generate_rsync_command(remote, with_exit_code_file): + return "rsync command" + + def mock_rsync_is_running(src, dst): + return False + + def mock_submit_background_process(command_str): + mock_submit_background_process.called = True + mock_submit_background_process.command_str = command_str + + monkeypatch.setattr(run_obj, "generate_rsync_command", mock_generate_rsync_command) + monkeypatch.setattr(generic_runs.fs, "rsync_is_running", mock_rsync_is_running) + monkeypatch.setattr( + generic_runs.fs, "submit_background_process", mock_submit_background_process + ) + + run_obj.sync_metadata() + + assert hasattr(mock_submit_background_process, "called") + assert mock_submit_background_process.command_str == "rsync command" + + @pytest.mark.parametrize( "run_fixture, status_to_check, expected_result", [ From b83542d3a481d91d1c3c6370a5d58c36345b99cd Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Mon, 9 Feb 2026 15:53:28 +0100 Subject: [PATCH 05/10] Versioning and documentation --- README.md | 14 ++++++++++---- pyproject.toml | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index dee265c..b0d03e0 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,8 @@ log: run_one_path: /usr/bin/run-one +metadata_archive: /path/to/metadata/archive + transfer_details: user: username host: remote.host.com @@ -102,8 +104,11 @@ sequencers: - RunParameters.xml ignore_folders: - nosync - rsync_options: + remote_rsync_options: - --chmod=Dg+s,g+rw + metadata_rsync_options: + - "--exclude='*'" + - "--include=InterOp" # ... additional sequencer configurations ``` @@ -113,7 +118,7 @@ sequencers: 2. **Validation**: Confirms run ID matches expected format for the sequencer type 3. **Transfer Phases**: - **Sequencing Phase**: Starts continuous background rsync transfer while sequencing is ongoing (when the final sequencing file doesn't exist). Uploads status and metadata files (specified for each sequencer type in the config with `metadata_for_statusdb`) to database. - - **Final Transfer**: After sequencing completes (final sequencing file appears), initiates final rsync transfer and captures exit code. + - **Final Transfer**: After sequencing completes (final sequencing file appears), syncs specified metadata file to archive location, initiates final rsync transfer and captures exit codes. - **Completion**: Updates database when transfer was successful. ### Status Tracking @@ -145,14 +150,15 @@ Run status is tracked in CouchDB with events including: - Final completion is indicated by the presence of a sequencer-specific final file (e.g., `RTAComplete.txt` for Illumina) - Remote storage is accessible via rsync over SSH - CouchDB is accessible and the database exists -- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates +- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates and sync to metadata archive location ### Status Files The logic of the script relies on the following status files: - `run.final_file` - The final file written by each sequencing machine. Used to indicate when the sequencing has completed. -- `final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob. +- `.final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob. +- `.metadata_rsync_exitcode` - Used to indicate when rsync of metadata to the metadata archive is done, so that the rsync can be run in the background. This is useful when there are I/O issue with the disks. ## Development diff --git a/pyproject.toml b/pyproject.toml index e8b5d7b..34d25f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ ignore = [ [project] name = "dataflow_transfer" -version = "1.0.5" +version = "1.1.0" description = "Script for transferring sequencing data from sequencers to storage" authors = [ { name = "Sara Sjunnebo", email = "sara.sjunnebo@scilifelab.se" }, From e59d7739dbe934dace7f534eb5c54dafc8bedad2 Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Mon, 9 Feb 2026 16:01:41 +0100 Subject: [PATCH 06/10] Add `--exclude='*'` to last option for metadata rsync instead of in config --- README.md | 1 - dataflow_transfer/run_classes/generic_runs.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b0d03e0..0a56e72 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,6 @@ sequencers: remote_rsync_options: - --chmod=Dg+s,g+rw metadata_rsync_options: - - "--exclude='*'" - "--include=InterOp" # ... additional sequencer configurations ``` diff --git a/dataflow_transfer/run_classes/generic_runs.py b/dataflow_transfer/run_classes/generic_runs.py index 60d44f4..bedb688 100644 --- a/dataflow_transfer/run_classes/generic_runs.py +++ b/dataflow_transfer/run_classes/generic_runs.py @@ -107,6 +107,7 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): "-au", log_file_option, *(rsync_options), + "--exclude='*'" if not remote else "", source, destination, ] From 80bed1f9c1ff09e8a4a848ddf5f41ebcecb286fd Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Tue, 10 Feb 2026 09:58:53 +0100 Subject: [PATCH 07/10] Change miarka_destination to remote_destination --- README.md | 2 +- dataflow_transfer/run_classes/generic_runs.py | 12 ++++++------ dataflow_transfer/tests/test_run_classes.py | 8 ++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 0a56e72..56af976 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,7 @@ statusdb: sequencers: NovaSeqXPlus: sequencing_path: /sequencing/NovaSeqXPlus - miarka_destination: /Illumina/NovaSeqXPlus + remote_destination: /Illumina/NovaSeqXPlus metadata_for_statusdb: - RunInfo.xml - RunParameters.xml diff --git a/dataflow_transfer/run_classes/generic_runs.py b/dataflow_transfer/run_classes/generic_runs.py index bedb688..2704746 100644 --- a/dataflow_transfer/run_classes/generic_runs.py +++ b/dataflow_transfer/run_classes/generic_runs.py @@ -32,7 +32,7 @@ def __init__(self, run_dir, configuration): self.final_rsync_exitcode_file = os.path.join( self.run_dir, ".final_rsync_exitcode" ) - self.miarka_destination = self.sequencer_config.get("miarka_destination") + self.remote_destination = self.sequencer_config.get("remote_destination") self.db = StatusdbSession(self.configuration.get("statusdb")) def confirm_run_type(self): @@ -85,7 +85,7 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): + "@" + self.transfer_details.get("host") + ":" - + self.miarka_destination + + self.remote_destination ) log_file_option = "--log-file=" + os.path.join( self.run_dir, "rsync_remote_log.txt" @@ -121,15 +121,15 @@ def start_transfer(self, final=False): transfer_command = self.generate_rsync_command( remote=True, with_exit_code_file=final ) - if fs.rsync_is_running(src=self.run_dir, dst=self.miarka_destination): + if fs.rsync_is_running(src=self.run_dir, dst=self.remote_destination): logger.info( - f"Rsync is already running for {self.run_dir} to destination {self.miarka_destination}. Skipping background transfer initiation." + f"Rsync is already running for {self.run_dir} to destination {self.remote_destination}. Skipping background transfer initiation." ) return try: fs.submit_background_process(transfer_command) logger.info( - f"{self.run_id}: Started rsync to {self.miarka_destination}" + f"{self.run_id}: Started rsync to {self.remote_destination}" + f" with the following command: '{transfer_command}'" ) except Exception as e: @@ -137,7 +137,7 @@ def start_transfer(self, final=False): raise e rsync_info = { "command": transfer_command, - "destination_path": self.miarka_destination, + "destination_path": self.remote_destination, } if final: self.update_statusdb( diff --git a/dataflow_transfer/tests/test_run_classes.py b/dataflow_transfer/tests/test_run_classes.py index 72d72cb..a01143e 100644 --- a/dataflow_transfer/tests/test_run_classes.py +++ b/dataflow_transfer/tests/test_run_classes.py @@ -21,7 +21,7 @@ def novaseqxplus_testobj(tmp_path): }, "sequencers": { "NovaSeqXPlus": { - "miarka_destination": "/data/NovaSeqXPlus", + "remote_destination": "/data/NovaSeqXPlus", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], "remote_rsync_options": ["--chmod=Dg+s,g+rw"], @@ -53,7 +53,7 @@ def nextseq_testobj(tmp_path): }, "sequencers": { "NextSeq": { - "miarka_destination": "/data/NextSeq", + "remote_destination": "/data/NextSeq", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], "remote_rsync_options": ["--chmod=Dg+s,g+rw"], @@ -85,7 +85,7 @@ def miseqseq_testobj(tmp_path): }, "sequencers": { "MiSeq": { - "miarka_destination": "/data/MiSeq", + "remote_destination": "/data/MiSeq", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], "remote_rsync_options": ["--chmod=Dg+s,g+rw"], @@ -117,7 +117,7 @@ def miseqseqi100_testobj(tmp_path): }, "sequencers": { "MiSeqi100": { - "miarka_destination": "/data/MiSeqi100", + "remote_destination": "/data/MiSeqi100", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], "remote_rsync_options": ["--chmod=Dg+s,g+rw"], From e25bd1539f95d210b84013f8d35a0f1abad8f015 Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Tue, 10 Feb 2026 10:18:08 +0100 Subject: [PATCH 08/10] Change remote to metadata_only --- dataflow_transfer/run_classes/generic_runs.py | 27 ++++++++++--------- dataflow_transfer/tests/test_run_classes.py | 18 +++++++------ 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/dataflow_transfer/run_classes/generic_runs.py b/dataflow_transfer/run_classes/generic_runs.py index 2704746..daf88dc 100644 --- a/dataflow_transfer/run_classes/generic_runs.py +++ b/dataflow_transfer/run_classes/generic_runs.py @@ -58,7 +58,7 @@ def metadata_synced(self): def sync_metadata(self): """Start background rsync transfer for metadata files.""" metadata_rsync_command = self.generate_rsync_command( - remote=False, with_exit_code_file=True + metadata_only=True, with_exit_code_file=True ) if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination): @@ -76,9 +76,17 @@ def sync_metadata(self): logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}") raise e - def generate_rsync_command(self, remote=False, with_exit_code_file=False): + def generate_rsync_command(self, metadata_only=False, with_exit_code_file=False): """Generate an rsync command string.""" - if remote: + if metadata_only: + source = self.run_dir + "/" + destination = self.metadata_destination + "/" + log_file_option = "--log-file=" + os.path.join( + self.run_dir, "rsync_metadata_log.txt" + ) + rsync_options = self.sequencer_config.get("metadata_rsync_options", []) + exit_code_file = self.metadata_rsync_exitcode_file + else: source = self.run_dir destination = ( self.transfer_details.get("user") @@ -92,14 +100,7 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): ) rsync_options = self.sequencer_config.get("remote_rsync_options", []) exit_code_file = self.final_rsync_exitcode_file - else: - source = self.run_dir + "/" - destination = self.metadata_destination + "/" - log_file_option = "--log-file=" + os.path.join( - self.run_dir, "rsync_metadata_log.txt" - ) - rsync_options = self.sequencer_config.get("metadata_rsync_options", []) - exit_code_file = self.metadata_rsync_exitcode_file + run_one_bin = self.configuration.get("run_one_path", "run-one") command = [ run_one_bin, @@ -107,7 +108,7 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): "-au", log_file_option, *(rsync_options), - "--exclude='*'" if not remote else "", + "--exclude='*'" if metadata_only else "", source, destination, ] @@ -119,7 +120,7 @@ def generate_rsync_command(self, remote=False, with_exit_code_file=False): def start_transfer(self, final=False): """Start background rsync transfer to storage.""" transfer_command = self.generate_rsync_command( - remote=True, with_exit_code_file=final + metadata_only=False, with_exit_code_file=final ) if fs.rsync_is_running(src=self.run_dir, dst=self.remote_destination): logger.info( diff --git a/dataflow_transfer/tests/test_run_classes.py b/dataflow_transfer/tests/test_run_classes.py index a01143e..58238cb 100644 --- a/dataflow_transfer/tests/test_run_classes.py +++ b/dataflow_transfer/tests/test_run_classes.py @@ -190,26 +190,28 @@ def test_sequencing_ongoing(run_fixture, request): @pytest.mark.parametrize( - "run_fixture, remote, with_exit_code_file", + "run_fixture, metadata_only, with_exit_code_file", [ ("novaseqxplus_testobj", False, True), - ("novaseqxplus_testobj", True, False), + ("novaseqxplus_testobj", False, False), ("novaseqxplus_testobj", True, True), ("nextseq_testobj", False, True), - ("nextseq_testobj", True, False), + ("nextseq_testobj", False, False), ("nextseq_testobj", True, True), ("miseqseq_testobj", False, True), - ("miseqseq_testobj", True, False), + ("miseqseq_testobj", False, False), ("miseqseq_testobj", True, True), ("miseqseqi100_testobj", False, True), - ("miseqseqi100_testobj", True, False), + ("miseqseqi100_testobj", False, False), ("miseqseqi100_testobj", True, True), ], ) -def test_generate_rsync_command(run_fixture, remote, with_exit_code_file, request): +def test_generate_rsync_command( + run_fixture, metadata_only, with_exit_code_file, request +): run_obj = request.getfixturevalue(run_fixture) rsync_command = run_obj.generate_rsync_command( - remote=remote, with_exit_code_file=with_exit_code_file + metadata_only=metadata_only, with_exit_code_file=with_exit_code_file ) assert "run-one rsync" in rsync_command assert "--log-file=" in rsync_command @@ -337,7 +339,7 @@ def test_metadata_synced(run_fixture, sync_successful, request): def test_sync_metadata(run_fixture, request, monkeypatch): run_obj = request.getfixturevalue(run_fixture) - def mock_generate_rsync_command(remote, with_exit_code_file): + def mock_generate_rsync_command(metadata_only, with_exit_code_file): return "rsync command" def mock_rsync_is_running(src, dst): From e59af6fcee889ebdb067036a418429f72c44ce19 Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Tue, 10 Feb 2026 12:05:37 +0100 Subject: [PATCH 09/10] Make sure metadata gets synced even if transfer to HPC is done --- dataflow_transfer/dataflow_transfer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dataflow_transfer/dataflow_transfer.py b/dataflow_transfer/dataflow_transfer.py index 575c05f..a1964bb 100644 --- a/dataflow_transfer/dataflow_transfer.py +++ b/dataflow_transfer/dataflow_transfer.py @@ -22,7 +22,11 @@ def process_run(run_dir, sequencer, config): run.confirm_run_type() ## Transfer already completed. Do nothing. - if run.final_sync_successful and run.has_status("transferred_to_hpc"): + if ( + run.final_sync_successful + and run.has_status("transferred_to_hpc") + and not run.metadata_synced #TODO: consider the consequences of this + ): # Check transfer success both in statusdb and via exit code file # To restart transfer, remove the exit code file logger.info(f"Transfer of {run_dir} is finished. No action needed.") @@ -52,8 +56,9 @@ def process_run(run_dir, sequencer, config): ## Final transfer completed successfully. Update statusdb. if run.final_sync_successful: - logger.info(f"Final transfer completed successfully for {run_dir}.") - run.update_statusdb(status="transferred_to_hpc") + if not run.has_status("transferred_to_hpc"): + logger.info(f"Final transfer completed successfully for {run_dir}.") + run.update_statusdb(status="transferred_to_hpc") return ## Unknown status of run. Log error and raise exception. From 22521e487951ad3ba6b743628449c8bbb4b9f70a Mon Sep 17 00:00:00 2001 From: ssjunnebo Date: Tue, 10 Feb 2026 12:05:51 +0100 Subject: [PATCH 10/10] remove todo --- dataflow_transfer/dataflow_transfer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow_transfer/dataflow_transfer.py b/dataflow_transfer/dataflow_transfer.py index a1964bb..aaecc14 100644 --- a/dataflow_transfer/dataflow_transfer.py +++ b/dataflow_transfer/dataflow_transfer.py @@ -25,7 +25,7 @@ def process_run(run_dir, sequencer, config): if ( run.final_sync_successful and run.has_status("transferred_to_hpc") - and not run.metadata_synced #TODO: consider the consequences of this + and not run.metadata_synced ): # Check transfer success both in statusdb and via exit code file # To restart transfer, remove the exit code file