28 changes: 28 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,28 @@
name: API Tests

on:
push:
branches: [master]
pull_request:
branches: [master]

jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: pip install -e ".[test]"

- name: Run API tests
run: pytest tests/test_api.py -v
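
Reviewer note: the test file this job runs is not part of the diff. A minimal sketch of what tests/test_api.py might contain, assuming the package exposes a FastAPI/Starlette app (uvicorn is in the project's dependencies and httpx in the new test extra) — the import path and endpoint below are hypothetical, not confirmed by this PR:

    # Hedged sketch only: the app location and /health route are assumptions.
    import pytest
    from fastapi.testclient import TestClient

    from bioinformatics_tools.api import app  # hypothetical module path

    client = TestClient(app)

    @pytest.mark.api
    def test_health_returns_ok():
        response = client.get("/health")  # hypothetical endpoint
        assert response.status_code == 200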
4 changes: 2 additions & 2 deletions .gitignore
@@ -47,5 +47,5 @@ share/python-wheels/
MANIFEST

# Personal
**/tests/
tests/**
#**/tests/
#tests/**
227 changes: 59 additions & 168 deletions bioinformatics_tools/workflow_tools/workflow.py
@@ -7,27 +7,17 @@
from datetime import datetime
from pathlib import Path

from bioinformatics_tools.caragols import clix
from bioinformatics_tools.caragols.condo import CxNode
from bioinformatics_tools.file_classes.base_classes import command
from bioinformatics_tools.workflow_tools.bapptainer import (
CacheSifError, cache_sif_files, run_apptainer_container)
from bioinformatics_tools.workflow_tools.models import (ApptainerKey,
WorkflowKey)
CacheSifError, cache_sif_files)
from bioinformatics_tools.workflow_tools.models import WorkflowKey
from bioinformatics_tools.workflow_tools.output_cache import restore_all, store_all
from bioinformatics_tools.workflow_tools.programs import ProgramBase

LOGGER = logging.getLogger(__name__)
WORKFLOW_DIR = Path(__file__).parent


apptainer_keys: dict[str, ApptainerKey] = {
'prodigal': ApptainerKey(
executable='apptainer.lima',
sif_path='prodigal.sif',
commands=[]
)
}

workflow_keys: dict[str, WorkflowKey] = {
'example': WorkflowKey(
cmd_identifier='example',
@@ -51,8 +41,8 @@
}
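
Reviewer note: only cmd_identifier and sif_files are visible on WorkflowKey in this diff (via the registry above and cache_sif_files(selected_wf.sif_files)). A hedged sketch of what the model might look like — the snakefile field is an assumption:

    # Hedged sketch; the real model lives in workflow_tools/models.py (not shown).
    from dataclasses import dataclass, field
    from pathlib import Path

    @dataclass
    class WorkflowKey:
        cmd_identifier: str                                 # registry lookup name
        sif_files: list[str] = field(default_factory=list)  # containers to cache
        snakefile: Path | None = None                       # assumed: target for build_executable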


class WorkflowBase(clix.App):
'''Base class for all workflows. Allows us to have access to config, logging, and reporting
class WorkflowBase(ProgramBase):
'''Snakemake workflow execution. Inherits single-program commands from ProgramBase.
'''

def __init__(self, workflow_id=None):
@@ -105,7 +95,7 @@ def build_executable(self, key: WorkflowKey, config_dict: dict = None, mode='not
# core_command.extend(output_list)
return core_command

def _run_workflow(self, wf_command):
def _run_subprocess(self, wf_command):
'''essentially a wrapper for subprocess.run() to tightly control snakemake execution'''
LOGGER.debug('Received command and running: %s', wf_command)
try:
@@ -115,154 +105,83 @@ def _run_subprocess(self, wf_command):
self.failed(f'Critical ERROR during subprocess.run({wf_command}): {e}')
return 0
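
Reviewer note: the hunk above elides the middle of the renamed wrapper. A hedged reconstruction of the whole method; the subprocess.run flags and the success return value are assumptions:

    def _run_subprocess(self, wf_command):
        '''essentially a wrapper for subprocess.run() to tightly control snakemake execution'''
        LOGGER.debug('Received command and running: %s', wf_command)
        try:
            # check=True is an assumption; the diff hides the actual call
            result = subprocess.run(wf_command, check=True)
            return result.returncode
        except subprocess.CalledProcessError as e:
            self.failed(f'Critical ERROR during subprocess.run({wf_command}): {e}')
            return 0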

@command
def do_example(self):
'''example workflow to execute'''

LOGGER.info('Config:\n%s', self.conf.show())
def _run_pipeline(self, key_name: str, smk_config: dict, cache_map: dict = None, mode='dev'):
'''Shared pipeline execution: cache containers, restore outputs, run snakemake, store outputs.'''
selected_wf = workflow_keys.get(key_name)
if not selected_wf:
self.failed(f'No workflow key found for "{key_name}"')
return 1

# ----------------------- Step 0 - Get the WorkflowKey ----------------------- #
if not (selected_wf := workflow_keys.get('example')):
# Download / ensure .sif files are cached
try:
cache_sif_files(selected_wf.sif_files)
except CacheSifError as e:
LOGGER.critical('Error with cache_sif_files: %s', e)
self.failed(f'Error with cache_sif_files: {e}')
return 1

# -------------- Step 1 - Get the input file (eventually files) -------------- #
# Restore cached outputs from DB so snakemake skips completed rules
db_path = smk_config.get('margie_db')
input_file = smk_config.get('input_fasta')
if cache_map and db_path and input_file:
restored = restore_all(db_path, input_file, cache_map)
LOGGER.info('Cache restore results: %s', restored)

# Build and run snakemake
wf_command = self.build_executable(selected_wf, config_dict=smk_config, mode=mode)
LOGGER.info('Running snakemake command: %s', ' '.join(wf_command))
self._run_subprocess(wf_command)

# Store new outputs into DB cache
if cache_map and db_path and input_file:
store_all(db_path, input_file, cache_map)

self.succeeded(msg=f'Workflow "{key_name}" completed successfully')
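
Reviewer note: restore_all and store_all come from workflow_tools.output_cache, which is not part of this diff. An illustrative call sequence matching the call sites above — file names and the boolean return shape are invented for the example:

    # Illustrative only: paths and the return shape are assumptions.
    cache_map = {
        'prodigal': ['prodigal/sample-prodigal.tkn', 'prodigal/sample-prodigal.faa'],
        'pfam': ['pfam/sample-pfam.tkn'],
    }
    restored = restore_all('/path/to/margie.db', 'sample.fasta', cache_map)
    # restored might look like {'prodigal': True, 'pfam': False}
    # ... snakemake then runs, skipping rules whose outputs were restored ...
    store_all('/path/to/margie.db', 'sample.fasta', cache_map)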

@command
def do_example(self):
'''example workflow to execute'''
input_file = self.conf.get('input')
if not input_file:
LOGGER.error('No input file specified. Use: dane_wf example input: <file>')
self.failed('No input file specified')
return 1

# ------- Step 2 - Get the appropriate output file from the input file ------- #
# Derive output filename from input (e.g., file.fasta -> file-output.txt)
# Basically we need a way to trace input to final output
input_path = Path(input_file)
output_file = f"{input_path.stem}-output.txt"
LOGGER.info('Input file: %s', input_file)
LOGGER.info('Output file: %s', output_file)

# Log which snakemake executable will be used
try:
which_result = subprocess.run(['which', 'snakemake'], capture_output=True, text=True, check=True)
LOGGER.info('Using snakemake from: %s', which_result.stdout.strip())
except subprocess.CalledProcessError:
LOGGER.warning('Could not find snakemake executable in PATH')

# --- Step 3 - get program-specific params and send to snakemake as config --- #
prodigal_config = self.conf.get('prodigal')
threads = prodigal_config.get('threads')
#TODO: Is there a way to automatically get all config from prodigal, or do we control it here?

smk_config = {
'input_fasta': input_file,
'output_fasta': output_file,
'prodigal_threads': threads
'output_fasta': f"{input_path.stem}-output.txt",
'prodigal_threads': prodigal_config.get('threads'),
}

# -------- TODO: Step 3.5 - Download / ensure .sif file is downloaded -------- #
try:
cache_sif_files(selected_wf.sif_files)
except CacheSifError as e:
LOGGER.critical('Error with cache_sif_files: %s', e)
self.failed(f'Error with cache_sif_files: {e}')

# ----------------------- Step 4 - build the executable ---------------------- #
wf_command = self.build_executable(selected_wf, config_dict=smk_config)
LOGGER.info('Running snakemake command: %s', wf_command)
str_smk = ' '.join(wf_command)
LOGGER.info('String snakemake: %s', str_smk)
print(f'\n=== SNAKEMAKE COMMAND ===\n{str_smk}\n========================\n')

# ------ Step 5 Execute the actual workflow (happens within our UV env) ------ #
self._run_workflow(wf_command)
self.succeeded(msg="All good in the neighborhood (AppleBees TM)")
self._run_pipeline('example', smk_config)
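
Reviewer note: per the error message above, this command is invoked as dane_wf example input: <file> (e.g. dane_wf example input: sample.fasta, file name illustrative), with thread count and other prodigal settings read from the prodigal config group.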


def get_prg_args(self, config_group):
'''find relevant configuration settings to add to container run
'''
args_list = []

# Get the config node for this program
try:
prog_node: CxNode = self.conf.get(config_group)
except KeyError:
LOGGER.warning('No configuration found for %s', config_group)
return args_list

# If it's not a CxNode (e.g., it's a simple value), return empty
if not hasattr(prog_node, 'children'):
LOGGER.warning('%s is not a configuration group', config_group)
return args_list

# Iterate over the direct children of this config group
for key, value in prog_node.children.items():
# Skip nested CxNode objects (only process direct key-value pairs)
if not isinstance(value, type(prog_node)):
# Convert key to command-line flag format
flag = f'--{key}'
# Convert value to string
str_value = str(value)
# Add to args list
args_list.extend([flag, str_value])

LOGGER.debug('Generated args for %s: %s', config_group, args_list)
return args_list
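
Reviewer note: the flag-building logic being removed here is straightforward; a worked example with invented config values:

    # Illustrative input/output for get_prg_args (values invented):
    #   config:  prodigal: {threads: 4, mode: 'single'}
    #   call:    self.get_prg_args(config_group='prodigal')
    #   returns: ['--threads', '4', '--mode', 'single']
    # Nested CxNode children are skipped; only direct key-value pairs become flags.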

@command
def do_prodigal(self):
'''run prodigal'''
EXECUTABLE = 'prodigal'

if not (container := apptainer_keys.get('prodigal')):
self.failed('No known match for "prodigal"')
return

prg_args = self.get_prg_args(config_group='prodigal')
prg_args.insert(0, EXECUTABLE)
LOGGER.info('Program arguments: %s', prg_args)
exit_code = run_apptainer_container(container, prg_args)
self.succeeded(msg='Successfully ran prodigal!')

@command
def do_margie(self, mode='dev'):
'''run margie workflow'''

LOGGER.info('Config:\n%s', self.conf.show())

# ----------------------- Step 0 - Get the WorkflowKey ----------------------- #
if not (selected_wf := workflow_keys.get('margie')):
return 1

# -------------- Step 1 - Get the input file (eventually files) -------------- #
input_file = self.conf.get('input')
if not input_file:
LOGGER.error('No input file specified. Use: dane_wf example input: <file>')
LOGGER.error('No input file specified. Use: dane_wf margie input: <file>')
self.failed('No input file specified')
return 1

# ------- Step 2 - Get the appropriate output file from the input file ------- #
# Derive output filename from input (e.g., file.fasta -> file-output.txt)
# Basically we need a way to trace input to final output and use the stem as SAMPLE
input_path = Path(input_file)
out_prodigal = f"prodigal/{input_path.stem}-prodigal.tkn"
out_prodigal_faa = f"prodigal/{input_path.stem}-prodigal.faa"
out_prodigal_db = f"prodigal/{input_path.stem}-prodigal_db.tkn"
out_pfam = f"pfam/{input_path.stem}-pfam.tkn"
out_pfam_db = f"pfam/{input_path.stem}-pfam_db.tkn"
stem = Path(input_file).stem
prodigal_config = self.conf.get('prodigal')
margie_db = self.conf.get('margie_db', '/depot/lindems/data/margie/margie.db')

# Output paths
out_prodigal = f"prodigal/{stem}-prodigal.tkn"
out_prodigal_faa = f"prodigal/{stem}-prodigal.faa"
out_prodigal_db = f"prodigal/{stem}-prodigal_db.tkn"
out_pfam = f"pfam/{stem}-pfam.tkn"
out_pfam_db = f"pfam/{stem}-pfam_db.tkn"
out_cog_classify = "cog/cog_classify.tsv"
out_cog_count = "cog/cog_count.tsv"
out_cog_db = f"cog/{input_path.stem}-cog_db.tkn"
out_dbcan = f"{input_path.stem}-dbcan.tkn" # Not being used now
out_kofam = f"{input_path.stem}-kofam.tkn" # Not being used now
LOGGER.info('Input file: %s', input_file)
LOGGER.info('out_prodigal file: %s', out_prodigal)
LOGGER.info('out_dbcan file: %s', out_dbcan)
out_cog_db = f"cog/{stem}-cog_db.tkn"

# --- Step 3 - get program-specific params and send to snakemake as config --- #
prodigal_config = self.conf.get('prodigal')
threads = prodigal_config.get('threads')
margie_db = self.conf.get('margie_db', '/depot/lindems/data/margie/margie.db')
#TODO: way to automatically get all config from prodigal, or do we control this here?
# TODO: These can probably be default values specified in margie.smk
smk_config = {
'input_fasta': input_file,
'out_prodigal': out_prodigal,
@@ -273,24 +192,12 @@ def do_margie(self, mode='dev'):
'out_cog_classify': out_cog_classify,
'out_cog_count': out_cog_count,
'out_cog_db': out_cog_db,
'out_dbcan': out_dbcan,
'out_kofam': out_kofam,
'prodigal_threads': threads,
'out_dbcan': f"{stem}-dbcan.tkn",
'out_kofam': f"{stem}-kofam.tkn",
'prodigal_threads': prodigal_config.get('threads'),
'margie_db': margie_db,
}

# ------: Step 3.5 - Download / ensure .sif file is downloaded -------- #
# ~/.cache/bioinformatics-tools/prodigal.sif --> multiple for some snakemake pipelines
try:
cache_sif_files(selected_wf.sif_files)
except CacheSifError as e:
LOGGER.critical('Error with cache_sif_files: %s', e)
self.failed(f'Error with cache_sif_files: {e}')

# ----------- Step 3.6 - Restore cached outputs from the DB ----------- #
# This is super important and revives the working directory with cache
# so snakemake can use its default DAG logic from the database. e.g. - places previously generated output
# files into the working directory so snakemake knows which rules to skip
cache_map = {
'prodigal': [out_prodigal, out_prodigal_faa],
'prodigal_db': [out_prodigal_db],
@@ -299,21 +206,5 @@ def do_margie(self, mode='dev'):
'cog': [out_cog_classify, out_cog_count],
'cog_db': [out_cog_db],
}
restored = restore_all(margie_db, input_file, cache_map)
LOGGER.info('Cache restore results: %s', restored)

# ----------------------- Step 4 - build the executable ---------------------- #
wf_command = self.build_executable(selected_wf, config_dict=smk_config, mode=mode)

LOGGER.info('Running snakemake command: %s', wf_command)
str_smk = ' '.join(wf_command)
LOGGER.info('\n=== SNAKEMAKE COMMAND ===\n%s\n========================\n', str_smk)

# ------ Step 5 Execute the actual workflow (happens within our UV env) ------ #
self._run_workflow(wf_command)

# ---- Step 5.5 - Store new outputs into DB cache ---- #
store_all(margie_db, input_file, cache_map)

# ------------------- Step 6 - Provide output status report ------------------ #
self.succeeded(msg="All good in the neighborhood (AppleBees TM)")
self._run_pipeline('margie', smk_config, cache_map, mode=mode)
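
Reviewer note: as with the example workflow, invocation is dane_wf margie input: <file> per the error message; margie_db falls back to /depot/lindems/data/margie/margie.db when not configured, and the input stem names every per-sample output.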
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -43,6 +43,12 @@ dependencies = [
"uvicorn"
]

[project.optional-dependencies]
test = [
"pytest>=7.0",
"httpx>=0.23.0",
]
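
Reviewer note: this extras group is what the CI step pip install -e ".[test]" resolves; the httpx pin suggests the API tests drive HTTP endpoints through a test client, though the tests themselves are outside this diff.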

[project.urls]
Homepage = "https://github.com/Diet-Microbiome-Interactions-Lab/GeneralTools"

7 changes: 7 additions & 0 deletions tests/conftest.py
@@ -78,6 +78,9 @@ def pytest_configure(config):
config.addinivalue_line(
"markers", "cli: marks tests that test CLI functionality"
)
config.addinivalue_line(
"markers", "api: marks tests that test API endpoints"
)
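
Reviewer note: registering the marker lets the API suite be selected or excluded on its own, e.g. pytest -m api or pytest -m "not api".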


# Pytest collection configuration
@@ -87,6 +90,10 @@ def pytest_collection_modifyitems(config, items):
# Mark CLI tests
if "test_cli" in item.nodeid or "cli" in item.name.lower():
item.add_marker(pytest.mark.cli)

# Mark API tests
if "test_api" in item.nodeid:
item.add_marker(pytest.mark.api)

# Mark integration tests (tests that use subprocess or external files)
if any(keyword in item.nodeid for keyword in ["subprocess", "real_test", "example_"]):