Skip to content
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ log:

run_one_path: /usr/bin/run-one

metadata_archive: /path/to/metadata/archive

transfer_details:
user: username
host: remote.host.com
Expand All @@ -96,14 +98,16 @@ statusdb:
sequencers:
NovaSeqXPlus:
sequencing_path: /sequencing/NovaSeqXPlus
miarka_destination: /Illumina/NovaSeqXPlus
remote_destination: /Illumina/NovaSeqXPlus
metadata_for_statusdb:
- RunInfo.xml
- RunParameters.xml
ignore_folders:
- nosync
rsync_options:
remote_rsync_options:
- --chmod=Dg+s,g+rw
metadata_rsync_options:
- "--include=InterOp"
# ... additional sequencer configurations
```

Expand All @@ -113,7 +117,7 @@ sequencers:
2. **Validation**: Confirms run ID matches expected format for the sequencer type
3. **Transfer Phases**:
- **Sequencing Phase**: Starts continuous background rsync transfer while sequencing is ongoing (when the final sequencing file doesn't exist). Uploads status and metadata files (specified for each sequencer type in the config with `metadata_for_statusdb`) to database.
- **Final Transfer**: After sequencing completes (final sequencing file appears), initiates final rsync transfer and captures exit code.
- **Final Transfer**: After sequencing completes (final sequencing file appears), syncs specified metadata file to archive location, initiates final rsync transfer and captures exit codes.
- **Completion**: Updates database when transfer was successful.

### Status Tracking
Expand Down Expand Up @@ -145,14 +149,15 @@ Run status is tracked in CouchDB with events including:
- Final completion is indicated by the presence of a sequencer-specific final file (e.g., `RTAComplete.txt` for Illumina)
- Remote storage is accessible via rsync over SSH
- CouchDB is accessible and the database exists
- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates
- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates and sync to metadata archive location

### Status Files

The logic of the script relies on the following status files:

- `run.final_file` - The final file written by each sequencing machine. Used to indicate when the sequencing has completed.
- `final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob.
- `.final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob.
- `.metadata_rsync_exitcode` - Used to indicate when rsync of metadata to the metadata archive is done, so that the rsync can be run in the background. This is useful when there are I/O issue with the disks.

## Development

Expand Down
16 changes: 13 additions & 3 deletions dataflow_transfer/dataflow_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ def process_run(run_dir, sequencer, config):
run.confirm_run_type()

## Transfer already completed. Do nothing.
if run.final_sync_successful and run.has_status("transferred_to_hpc"):
if (
run.final_sync_successful
and run.has_status("transferred_to_hpc")
and not run.metadata_synced
):
# Check transfer success both in statusdb and via exit code file
# To restart transfer, remove the exit code file
logger.info(f"Transfer of {run_dir} is finished. No action needed.")
Expand All @@ -34,6 +38,11 @@ def process_run(run_dir, sequencer, config):
run.start_transfer(final=False)
return

## Sequencing finished. Copy metadata in the background if not already done.
if run.has_status("sequencing_finished"):
if not run.metadata_synced:
run.sync_metadata()

## Sequencing finished but transfer not complete. Start final transfer.
if not run.final_sync_successful:
if run.has_status("sequencing_finished"):
Expand All @@ -47,8 +56,9 @@ def process_run(run_dir, sequencer, config):

## Final transfer completed successfully. Update statusdb.
if run.final_sync_successful:
logger.info(f"Final transfer completed successfully for {run_dir}.")
run.update_statusdb(status="transferred_to_hpc")
if not run.has_status("transferred_to_hpc"):
logger.info(f"Final transfer completed successfully for {run_dir}.")
run.update_statusdb(status="transferred_to_hpc")
return

## Unknown status of run. Log error and raise exception.
Expand Down
91 changes: 72 additions & 19 deletions dataflow_transfer/run_classes/generic_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@ def __init__(self, run_dir, configuration):
)
self.final_file = ""
self.transfer_details = self.configuration.get("transfer_details", {})
self.metadata_rsync_exitcode_file = os.path.join(
self.run_dir, ".metadata_rsync_exitcode"
)
self.metadata_destination = os.path.join(
self.configuration.get("metadata_archive"),
getattr(self, "run_type", None),
self.run_id,
)
self.final_rsync_exitcode_file = os.path.join(
self.run_dir, ".final_rsync_exitcode"
)
self.miarka_destination = self.sequencer_config.get("miarka_destination")
self.remote_destination = self.sequencer_config.get("remote_destination")
self.db = StatusdbSession(self.configuration.get("statusdb"))

def confirm_run_type(self):
Expand All @@ -42,50 +50,95 @@ def sequencing_ongoing(self):
return False
return True

def generate_rsync_command(self, is_final_sync=False):
"""Generate an rsync command string."""
destination = (
self.transfer_details.get("user")
+ "@"
+ self.transfer_details.get("host")
+ ":"
+ self.miarka_destination
@property
def metadata_synced(self):
"""Check if the metadata rsync was successful by reading the exit code file."""
return fs.check_exit_status(self.metadata_rsync_exitcode_file)

def sync_metadata(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I missing something or Is this not called anywhere yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's called in dataflow_transfer.py, by process_run

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how I missed that. But anyhow, so the sync is triggered once the sequencing is finished but after that we don't really ensure that it finished properly right? Not sure how to change it, but we have had occassional errors on the rsync to the nas-ns. But maybe I misinterpret and its actually attempted again if failed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the exit code of the rsync is not 0 it will retry. The exception is if the first case, if run.final_sync_successful and run.has_status("transferred_to_hpc") is true, then it wouldn't get to the metadata sync step. Maybe I can add another check in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some additional logic to process_run so now it should be synced even if the sync to miarka is finished

"""Start background rsync transfer for metadata files."""
metadata_rsync_command = self.generate_rsync_command(
metadata_only=True, with_exit_code_file=True
)

if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination):
logger.info(
f"Metadata rsync is already running for {self.run_dir} to destination {self.metadata_destination}. Skipping background metadata sync initiation."
)
return
try:
fs.submit_background_process(metadata_rsync_command)
logger.info(
f"{self.run_id}: Started metadata rsync to {self.metadata_destination}"
+ f" with the following command: '{metadata_rsync_command}'"
)
except Exception as e:
logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}")
raise e

def generate_rsync_command(self, metadata_only=False, with_exit_code_file=False):
"""Generate an rsync command string."""
if metadata_only:
source = self.run_dir + "/"
destination = self.metadata_destination + "/"
log_file_option = "--log-file=" + os.path.join(
self.run_dir, "rsync_metadata_log.txt"
)
rsync_options = self.sequencer_config.get("metadata_rsync_options", [])
exit_code_file = self.metadata_rsync_exitcode_file
else:
source = self.run_dir
destination = (
self.transfer_details.get("user")
+ "@"
+ self.transfer_details.get("host")
+ ":"
+ self.remote_destination
)
log_file_option = "--log-file=" + os.path.join(
self.run_dir, "rsync_remote_log.txt"
)
rsync_options = self.sequencer_config.get("remote_rsync_options", [])
exit_code_file = self.final_rsync_exitcode_file

run_one_bin = self.configuration.get("run_one_path", "run-one")
command = [
run_one_bin,
"rsync",
"-au",
"--log-file=" + os.path.join(self.run_dir, "rsync_remote_log.txt"),
*(self.sequencer_config.get("rsync_options", [])),
self.run_dir,
log_file_option,
*(rsync_options),
"--exclude='*'" if metadata_only else "",
source,
destination,
]
command_str = " ".join(command)
if is_final_sync:
command_str += f"; echo $? > {self.final_rsync_exitcode_file}"
if with_exit_code_file:
command_str += f"; echo $? > {exit_code_file}"
return command_str

def start_transfer(self, final=False):
"""Start background rsync transfer to storage."""
transfer_command = self.generate_rsync_command(is_final_sync=final)
if fs.rsync_is_running(src=self.run_dir):
transfer_command = self.generate_rsync_command(
metadata_only=False, with_exit_code_file=final
)
if fs.rsync_is_running(src=self.run_dir, dst=self.remote_destination):
logger.info(
f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation."
f"Rsync is already running for {self.run_dir} to destination {self.remote_destination}. Skipping background transfer initiation."
)
return
try:
fs.submit_background_process(transfer_command)
logger.info(
f"{self.run_id}: Started rsync to {self.miarka_destination}"
f"{self.run_id}: Started rsync to {self.remote_destination}"
+ f" with the following command: '{transfer_command}'"
)
except Exception as e:
logger.error(f"Failed to start rsync for {self.run_id}: {e}")
raise e
rsync_info = {
"command": transfer_command,
"destination_path": self.miarka_destination,
"destination_path": self.remote_destination,
}
if final:
self.update_statusdb(
Expand Down
4 changes: 2 additions & 2 deletions dataflow_transfer/tests/test_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ class TestRsyncIsRunning:
@patch("subprocess.check_output")
def test_rsync_running(self, mock_check_output):
mock_check_output.return_value = b"12345"
assert rsync_is_running("/some/path") is True
assert rsync_is_running("/some/path", "/dst/path") is True

@patch("subprocess.check_output")
def test_rsync_not_running(self, mock_check_output):
mock_check_output.side_effect = CalledProcessError(1, "pgrep")
assert rsync_is_running("/some/path") is False
assert rsync_is_running("/some/path", "/dst/path") is False


class TestSubmitBackgroundProcess:
Expand Down
Loading