diff --git a/README.md b/README.md index dee265c..56af976 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,8 @@ log: run_one_path: /usr/bin/run-one +metadata_archive: /path/to/metadata/archive + transfer_details: user: username host: remote.host.com @@ -96,14 +98,16 @@ statusdb: sequencers: NovaSeqXPlus: sequencing_path: /sequencing/NovaSeqXPlus - miarka_destination: /Illumina/NovaSeqXPlus + remote_destination: /Illumina/NovaSeqXPlus metadata_for_statusdb: - RunInfo.xml - RunParameters.xml ignore_folders: - nosync - rsync_options: + remote_rsync_options: - --chmod=Dg+s,g+rw + metadata_rsync_options: + - "--include=InterOp" # ... additional sequencer configurations ``` @@ -113,7 +117,7 @@ sequencers: 2. **Validation**: Confirms run ID matches expected format for the sequencer type 3. **Transfer Phases**: - **Sequencing Phase**: Starts continuous background rsync transfer while sequencing is ongoing (when the final sequencing file doesn't exist). Uploads status and metadata files (specified for each sequencer type in the config with `metadata_for_statusdb`) to database. - - **Final Transfer**: After sequencing completes (final sequencing file appears), initiates final rsync transfer and captures exit code. + - **Final Transfer**: After sequencing completes (final sequencing file appears), syncs the specified metadata files to the metadata archive location, initiates final rsync transfer and captures exit codes. - **Completion**: Updates database when transfer was successful. 
### Status Tracking @@ -145,14 +149,15 @@ Run status is tracked in CouchDB with events including: - Final completion is indicated by the presence of a sequencer-specific final file (e.g., `RTAComplete.txt` for Illumina) - Remote storage is accessible via rsync over SSH - CouchDB is accessible and the database exists -- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates +- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates and for syncing to the metadata archive location ### Status Files The logic of the script relies on the following status files: - `run.final_file` - The final file written by each sequencing machine. Used to indicate when the sequencing has completed. -- `final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob. +- `.final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob. +- `.metadata_rsync_exitcode` - Used to indicate when rsync of metadata to the metadata archive is done, so that the rsync can be run in the background. This is useful when there are I/O issues with the disks. ## Development diff --git a/dataflow_transfer/dataflow_transfer.py b/dataflow_transfer/dataflow_transfer.py index ebbae9c..aaecc14 100644 --- a/dataflow_transfer/dataflow_transfer.py +++ b/dataflow_transfer/dataflow_transfer.py @@ -22,7 +22,11 @@ def process_run(run_dir, sequencer, config): run.confirm_run_type() ## Transfer already completed. Do nothing. 
- if run.final_sync_successful and run.has_status("transferred_to_hpc"): + if ( + run.final_sync_successful + and run.has_status("transferred_to_hpc") + and run.metadata_synced + ): # Check transfer success both in statusdb and via exit code file # To restart transfer, remove the exit code file logger.info(f"Transfer of {run_dir} is finished. No action needed.") @@ -34,6 +38,11 @@ def process_run(run_dir, sequencer, config): run.start_transfer(final=False) return + ## Sequencing finished. Copy metadata in the background if not already done. + if run.has_status("sequencing_finished"): + if not run.metadata_synced: + run.sync_metadata() + ## Sequencing finished but transfer not complete. Start final transfer. if not run.final_sync_successful: if run.has_status("sequencing_finished"): @@ -47,8 +56,9 @@ def process_run(run_dir, sequencer, config): ## Final transfer completed successfully. Update statusdb. if run.final_sync_successful: - logger.info(f"Final transfer completed successfully for {run_dir}.") - run.update_statusdb(status="transferred_to_hpc") + if not run.has_status("transferred_to_hpc"): + logger.info(f"Final transfer completed successfully for {run_dir}.") + run.update_statusdb(status="transferred_to_hpc") return ## Unknown status of run. Log error and raise exception. 
diff --git a/dataflow_transfer/run_classes/generic_runs.py b/dataflow_transfer/run_classes/generic_runs.py index d76d0b4..daf88dc 100644 --- a/dataflow_transfer/run_classes/generic_runs.py +++ b/dataflow_transfer/run_classes/generic_runs.py @@ -21,10 +21,18 @@ def __init__(self, run_dir, configuration): ) self.final_file = "" self.transfer_details = self.configuration.get("transfer_details", {}) + self.metadata_rsync_exitcode_file = os.path.join( + self.run_dir, ".metadata_rsync_exitcode" + ) + self.metadata_destination = os.path.join( + self.configuration.get("metadata_archive"), + getattr(self, "run_type", None), + self.run_id, + ) self.final_rsync_exitcode_file = os.path.join( self.run_dir, ".final_rsync_exitcode" ) - self.miarka_destination = self.sequencer_config.get("miarka_destination") + self.remote_destination = self.sequencer_config.get("remote_destination") self.db = StatusdbSession(self.configuration.get("statusdb")) def confirm_run_type(self): @@ -42,42 +50,87 @@ def sequencing_ongoing(self): return False return True - def generate_rsync_command(self, is_final_sync=False): - """Generate an rsync command string.""" - destination = ( - self.transfer_details.get("user") - + "@" - + self.transfer_details.get("host") - + ":" - + self.miarka_destination + @property + def metadata_synced(self): + """Check if the metadata rsync was successful by reading the exit code file.""" + return fs.check_exit_status(self.metadata_rsync_exitcode_file) + + def sync_metadata(self): + """Start background rsync transfer for metadata files.""" + metadata_rsync_command = self.generate_rsync_command( + metadata_only=True, with_exit_code_file=True ) + + if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination): + logger.info( + f"Metadata rsync is already running for {self.run_dir} to destination {self.metadata_destination}. Skipping background metadata sync initiation." 
+ ) + return + try: + fs.submit_background_process(metadata_rsync_command) + logger.info( + f"{self.run_id}: Started metadata rsync to {self.metadata_destination}" + + f" with the following command: '{metadata_rsync_command}'" + ) + except Exception as e: + logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}") + raise e + + def generate_rsync_command(self, metadata_only=False, with_exit_code_file=False): + """Generate an rsync command string.""" + if metadata_only: + source = self.run_dir + "/" + destination = self.metadata_destination + "/" + log_file_option = "--log-file=" + os.path.join( + self.run_dir, "rsync_metadata_log.txt" + ) + rsync_options = self.sequencer_config.get("metadata_rsync_options", []) + exit_code_file = self.metadata_rsync_exitcode_file + else: + source = self.run_dir + destination = ( + self.transfer_details.get("user") + + "@" + + self.transfer_details.get("host") + + ":" + + self.remote_destination + ) + log_file_option = "--log-file=" + os.path.join( + self.run_dir, "rsync_remote_log.txt" + ) + rsync_options = self.sequencer_config.get("remote_rsync_options", []) + exit_code_file = self.final_rsync_exitcode_file + run_one_bin = self.configuration.get("run_one_path", "run-one") command = [ run_one_bin, "rsync", "-au", - "--log-file=" + os.path.join(self.run_dir, "rsync_remote_log.txt"), - *(self.sequencer_config.get("rsync_options", [])), - self.run_dir, + log_file_option, + *(rsync_options), + "--exclude='*'" if metadata_only else "", + source, destination, ] command_str = " ".join(command) - if is_final_sync: - command_str += f"; echo $? > {self.final_rsync_exitcode_file}" + if with_exit_code_file: + command_str += f"; echo $? 
> {exit_code_file}" return command_str def start_transfer(self, final=False): """Start background rsync transfer to storage.""" - transfer_command = self.generate_rsync_command(is_final_sync=final) - if fs.rsync_is_running(src=self.run_dir): + transfer_command = self.generate_rsync_command( + metadata_only=False, with_exit_code_file=final + ) + if fs.rsync_is_running(src=self.run_dir, dst=self.remote_destination): logger.info( - f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation." + f"Rsync is already running for {self.run_dir} to destination {self.remote_destination}. Skipping background transfer initiation." ) return try: fs.submit_background_process(transfer_command) logger.info( - f"{self.run_id}: Started rsync to {self.miarka_destination}" + f"{self.run_id}: Started rsync to {self.remote_destination}" + f" with the following command: '{transfer_command}'" ) except Exception as e: @@ -85,7 +138,7 @@ def start_transfer(self, final=False): raise e rsync_info = { "command": transfer_command, - "destination_path": self.miarka_destination, + "destination_path": self.remote_destination, } if final: self.update_statusdb( diff --git a/dataflow_transfer/tests/test_filesystem.py b/dataflow_transfer/tests/test_filesystem.py index 2812b82..463e16e 100644 --- a/dataflow_transfer/tests/test_filesystem.py +++ b/dataflow_transfer/tests/test_filesystem.py @@ -69,12 +69,12 @@ class TestRsyncIsRunning: @patch("subprocess.check_output") def test_rsync_running(self, mock_check_output): mock_check_output.return_value = b"12345" - assert rsync_is_running("/some/path") is True + assert rsync_is_running("/some/path", "/dst/path") is True @patch("subprocess.check_output") def test_rsync_not_running(self, mock_check_output): mock_check_output.side_effect = CalledProcessError(1, "pgrep") - assert rsync_is_running("/some/path") is False + assert rsync_is_running("/some/path", "/dst/path") is False class TestSubmitBackgroundProcess: diff --git 
a/dataflow_transfer/tests/test_run_classes.py b/dataflow_transfer/tests/test_run_classes.py index 09457fe..58238cb 100644 --- a/dataflow_transfer/tests/test_run_classes.py +++ b/dataflow_transfer/tests/test_run_classes.py @@ -12,6 +12,7 @@ def novaseqxplus_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -20,10 +21,15 @@ def novaseqxplus_testobj(tmp_path): }, "sequencers": { "NovaSeqXPlus": { - "miarka_destination": "/data/NovaSeqXPlus", + "remote_destination": "/data/NovaSeqXPlus", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -38,6 +44,7 @@ def nextseq_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -46,10 +53,15 @@ def nextseq_testobj(tmp_path): }, "sequencers": { "NextSeq": { - "miarka_destination": "/data/NextSeq", + "remote_destination": "/data/NextSeq", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -64,6 +76,7 @@ def miseqseq_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -72,10 +85,15 @@ def 
miseqseq_testobj(tmp_path): }, "sequencers": { "MiSeq": { - "miarka_destination": "/data/MiSeq", + "remote_destination": "/data/MiSeq", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -90,6 +108,7 @@ def miseqseqi100_testobj(tmp_path): config = { "log": {"file": "test.log"}, "transfer_details": {"user": "testuser", "host": "testhost"}, + "metadata_archive": "/data/metadata_archive", "statusdb": { "username": "dbuser", "password": "dbpass", @@ -98,10 +117,15 @@ def miseqseqi100_testobj(tmp_path): }, "sequencers": { "MiSeqi100": { - "miarka_destination": "/data/MiSeqi100", + "remote_destination": "/data/MiSeqi100", "metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"], "ignore_folders": ["nosync"], - "rsync_options": ["--chmod=Dg+s,g+rw"], + "remote_rsync_options": ["--chmod=Dg+s,g+rw"], + "metadata_rsync_options": [ + "--include=RunInfo.xml", + "--include=RunParameters.xml", + "--exclude=*", + ], } }, } @@ -166,27 +190,34 @@ def test_sequencing_ongoing(run_fixture, request): @pytest.mark.parametrize( - "run_fixture, final_sync", + "run_fixture, metadata_only, with_exit_code_file", [ - ("novaseqxplus_testobj", False), - ("novaseqxplus_testobj", True), - ("nextseq_testobj", False), - ("nextseq_testobj", True), - ("miseqseq_testobj", False), - ("miseqseq_testobj", True), - ("miseqseqi100_testobj", False), - ("miseqseqi100_testobj", True), + ("novaseqxplus_testobj", False, True), + ("novaseqxplus_testobj", False, False), + ("novaseqxplus_testobj", True, True), + ("nextseq_testobj", False, True), + ("nextseq_testobj", False, False), + ("nextseq_testobj", True, True), + ("miseqseq_testobj", False, True), + ("miseqseq_testobj", False, False), + ("miseqseq_testobj", True, True), + ("miseqseqi100_testobj", False, 
True), + ("miseqseqi100_testobj", False, False), + ("miseqseqi100_testobj", True, True), ], ) -def test_generate_rsync_command(run_fixture, final_sync, request): +def test_generate_rsync_command( + run_fixture, metadata_only, with_exit_code_file, request +): run_obj = request.getfixturevalue(run_fixture) - rsync_command = run_obj.generate_rsync_command(is_final_sync=final_sync) + rsync_command = run_obj.generate_rsync_command( + metadata_only=metadata_only, with_exit_code_file=with_exit_code_file + ) assert "run-one rsync" in rsync_command assert "--log-file=" in rsync_command - assert "--chmod=Dg+s,g+rw" in rsync_command assert run_obj.run_dir in rsync_command - if final_sync: - assert f"; echo $? > {run_obj.final_rsync_exitcode_file}" in rsync_command + if with_exit_code_file: + assert "; echo $? >" in rsync_command @pytest.mark.parametrize( @@ -213,7 +244,7 @@ def test_generate_rsync_command(run_fixture, final_sync, request): def test_start_transfer(run_fixture, rsync_running, final, request, monkeypatch): run_obj = request.getfixturevalue(run_fixture) - def mock_rsync_is_running(src): + def mock_rsync_is_running(src, dst): return rsync_running def mock_submit_background_process(command_str): @@ -270,6 +301,66 @@ def test_final_sync_successful(run_fixture, sync_successful, request): assert run_obj.final_sync_successful == sync_successful +@pytest.mark.parametrize( + "run_fixture, sync_successful", + [ + ("novaseqxplus_testobj", True), + ("novaseqxplus_testobj", False), + ("nextseq_testobj", True), + ("nextseq_testobj", False), + ("miseqseq_testobj", True), + ("miseqseq_testobj", False), + ("miseqseqi100_testobj", True), + ("miseqseqi100_testobj", False), + ], +) +def test_metadata_synced(run_fixture, sync_successful, request): + run_obj = request.getfixturevalue(run_fixture) + if sync_successful: + # Create the metadata rsync exit code file with a success code + with open(run_obj.metadata_rsync_exitcode_file, "w") as f: + f.write("0") + else: + # Create the metadata 
rsync exit code file with a failure code + with open(run_obj.metadata_rsync_exitcode_file, "w") as f: + f.write("1") + assert run_obj.metadata_synced == sync_successful + + +@pytest.mark.parametrize( + "run_fixture", + [ + "novaseqxplus_testobj", + "nextseq_testobj", + "miseqseq_testobj", + "miseqseqi100_testobj", + ], +) +def test_sync_metadata(run_fixture, request, monkeypatch): + run_obj = request.getfixturevalue(run_fixture) + + def mock_generate_rsync_command(metadata_only, with_exit_code_file): + return "rsync command" + + def mock_rsync_is_running(src, dst): + return False + + def mock_submit_background_process(command_str): + mock_submit_background_process.called = True + mock_submit_background_process.command_str = command_str + + monkeypatch.setattr(run_obj, "generate_rsync_command", mock_generate_rsync_command) + monkeypatch.setattr(generic_runs.fs, "rsync_is_running", mock_rsync_is_running) + monkeypatch.setattr( + generic_runs.fs, "submit_background_process", mock_submit_background_process + ) + + run_obj.sync_metadata() + + assert hasattr(mock_submit_background_process, "called") + assert mock_submit_background_process.command_str == "rsync command" + + @pytest.mark.parametrize( "run_fixture, status_to_check, expected_result", [ diff --git a/dataflow_transfer/utils/filesystem.py b/dataflow_transfer/utils/filesystem.py index a02aadb..016159e 100644 --- a/dataflow_transfer/utils/filesystem.py +++ b/dataflow_transfer/utils/filesystem.py @@ -28,9 +28,9 @@ def find_runs(base_dir, ignore_folders=[]): return runs -def rsync_is_running(src): - """Check if rsync is already running for given src.""" - pattern = f"rsync.*{src}" +def rsync_is_running(src, dst): + """Check if rsync is already running for given src and destination.""" + pattern = f"rsync.*{src}.*{dst}" try: subprocess.check_output(["pgrep", "-f", pattern]) return True diff --git a/pyproject.toml b/pyproject.toml index e8b5d7b..34d25f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 
@@ ignore = [ [project] name = "dataflow_transfer" -version = "1.0.5" +version = "1.1.0" description = "Script for transferring sequencing data from sequencers to storage" authors = [ { name = "Sara Sjunnebo", email = "sara.sjunnebo@scilifelab.se" },