From 7a668da82a93096b7b314946ee6274196972ed6c Mon Sep 17 00:00:00 2001
From: Mingxun Wang
Date: Thu, 2 Sep 2021 09:49:20 -0700
Subject: [PATCH 1/2] adding refactor to try to scale up

---
 dash/requirements.txt     |  2 +-
 dash/run_worker.sh        |  2 +-
 dash/tasks.py             | 34 +++++++++++++++++++++++++++++++++-
 massql/msql_engine.py     | 12 ++++++++----
 massql/msql_engine_ray.py | 11 +++++++++--
 requirements.txt          |  2 +-
 tests/test_query.py       | 30 +++++++++++++++++++-----------
 7 files changed, 72 insertions(+), 21 deletions(-)

diff --git a/dash/requirements.txt b/dash/requirements.txt
index 1a093e0..b87a813 100644
--- a/dash/requirements.txt
+++ b/dash/requirements.txt
@@ -41,7 +41,7 @@ dash_daq
 Flask-Caching
 lark-parser
 pyarrow
-ray==1.3.0
+ray==1.6.0
 celery==5.0.5
 click==7.1.1
 py_expression_eval
diff --git a/dash/run_worker.sh b/dash/run_worker.sh
index bcfe8cd..49db696 100755
--- a/dash/run_worker.sh
+++ b/dash/run_worker.sh
@@ -1,3 +1,3 @@
 #!/bin/bash

-celery -A tasks worker -l info -c 4 -Q worker --max-tasks-per-child 10 --loglevel INFO
+celery -A tasks worker -l info -c 1 -Q worker --max-tasks-per-child 10 --loglevel INFO
diff --git a/dash/tasks.py b/dash/tasks.py
index 9463dfb..db01587 100644
--- a/dash/tasks.py
+++ b/dash/tasks.py
@@ -2,6 +2,9 @@ import glob
 import sys
 import os
+from tempfile import NamedTemporaryFile
+import subprocess
+import pandas as pd
 import requests
 import requests_cache
 requests_cache.install_cache('temp/demo_cache')
@@ -21,7 +24,13 @@ def task_computeheartbeat():

 @celery_instance.task(time_limit=120)
 def task_executequery(query, filename):
-
+    if "X" in query:
+        return _query_cmd(query, filename)
+    else:
+        # Doing the query via API
+        return _query_api(query, filename)
+
+def _query_api(query, filename):
     parse_results = msql_parser.parse_msql(query)

     results_df = msql_engine.process_query(query, filename, parallel=False)
@@ -34,6 +43,29 @@

     return all_results

+def _query_cmd(query, filename):
+    # we want to query via commandline so that we can use the parallel features
+
+    f = NamedTemporaryFile(delete=False, suffix='.tsv')
+    temp_filename = f.name
+
+    cmd_list = ["python", "./massql/msql_cmd.py", filename, query, "--output_file", temp_filename, "--parallel_query", "YES"]
+    subprocess.run(cmd_list)
+
+    results_df = pd.read_csv(temp_filename, sep="\t")
+
+    os.unlink(f.name)
+
+    all_results = results_df.to_dict(orient="records")
+
+    try:
+        all_results = _enrich_results(all_results)
+    except:
+        pass
+
+    return all_results
+
+
 def _get_gnps_spectruminfo(spectrumid):
     url = "https://gnps-external.ucsd.edu/gnpsspectrum?SpectrumID={}".format(spectrumid)
diff --git a/massql/msql_engine.py b/massql/msql_engine.py
index e775124..0d4d6b3 100644
--- a/massql/msql_engine.py
+++ b/massql/msql_engine.py
@@ -21,10 +21,14 @@ def DEBUG_MSG(msg):
     print(msg, file=sys.stderr, flush=True)

-def init_ray():
+def init_ray(temp_dir=None):
     import ray
     if not ray.is_initialized():
-        ray.init(ignore_reinit_error=True, object_store_memory=8000000000, num_cpus=8)
+        if temp_dir is None:
+            ray.init(ignore_reinit_error=True, object_store_memory=500000000, num_cpus=8)
+        else:
+            ray.init(ignore_reinit_error=True, object_store_memory=500000000, num_cpus=8, _temp_dir=temp_dir)
+

 def _get_ppm_tolerance(qualifiers):
@@ -329,11 +333,11 @@ def _evalute_variable_query(parsed_dict, input_filename, cache=True, parallel=Fa
     execute_serial = True
     if parallel:
         import ray
-        import msql_engine_ray
+        from massql import msql_engine_ray
         if ray.is_initialized():
             # TODO: Divide up the parallel thing
-            chunk_size = 100
+            chunk_size = 1000
             concrete_query_lists = [all_concrete_queries[i:i + chunk_size] for i in range(0, len(all_concrete_queries), chunk_size)]
             futures = [msql_engine_ray._executeconditions_query_ray.remote(concrete_query_list, input_filename, ms1_input_df=ms1_df, ms2_input_df=ms2_df, cache=cache) for concrete_query_list in concrete_query_lists]
             all_ray_results = ray.get(futures)
diff --git a/massql/msql_engine_ray.py b/massql/msql_engine_ray.py
index 3404f3b..a572bfd 100644
--- a/massql/msql_engine_ray.py
+++ b/massql/msql_engine_ray.py
@@ -1,5 +1,6 @@
 from massql.msql_engine import _executeconditions_query, _executecollate_query
 import ray
+from tqdm import tqdm

 @ray.remote
 def _executeconditions_query_ray(parsed_dict_list, input_filename, ms1_input_df=None, ms2_input_df=None, cache=True):
@@ -19,10 +20,16 @@ def _executeconditions_query_ray(parsed_dict_list, input_filename, ms1_input_df=

     collated_list = []

-    for parsed_dict in parsed_dict_list:
-        ms1_df, ms2_df = _executeconditions_query(parsed_dict, input_filename, ms1_input_df=ms1_input_df, ms2_input_df=ms2_input_df, cache=cache)
+    # Copying DataFrames
+    ms1_input_copy_df = ms1_input_df.copy()
+    ms2_input_copy_df = ms2_input_df.copy()
+
+    for parsed_dict in tqdm(parsed_dict_list):
+        ms1_df, ms2_df = _executeconditions_query(parsed_dict, input_filename, ms1_input_df=ms1_input_copy_df, ms2_input_df=ms2_input_copy_df, cache=cache)
         collated_df = _executecollate_query(parsed_dict, ms1_df, ms2_df)
         collated_list.append(collated_df)

+    print("FINISH RAY QUERY")
+
     return collated_list
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 5fe7d29..415c395 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,7 +2,7 @@ pymzml
 lark-parser
 pandas
 pyarrow
-ray
+ray==1.6.0
 tqdm
 py_expression_eval
 matchms
diff --git a/tests/test_query.py b/tests/test_query.py
index 11d8d23..bedb690 100644
--- a/tests/test_query.py
+++ b/tests/test_query.py
@@ -204,25 +204,35 @@ def test_ms1_iron():
     assert(1223 in list(results_df["scan"]))
     assert(len(results_df) == 15)

-@pytest.mark.skip(reason="parallel not really supported anymore")
+@pytest.mark.skip(reason="Parallel testing not automated")
 def test_ms1_iron_parallel():
-    msql_engine.init_ray()
+    msql_engine.init_ray(temp_dir=os.path.abspath("/media/temp_ray"))
+
+    # query = "QUERY scaninfo(MS1DATA) \
+    #     WHERE \
+    #     RTMIN=3.03 \
+    #     AND RTMAX=3.05 \
+    #     AND MS1MZ=X-2:INTENSITYMATCH=Y*0.063:INTENSITYMATCHPERCENT=25 \
+    #     AND MS1MZ=X:INTENSITYMATCH=Y:INTENSITYMATCHREFERENCE \
+    #     FILTER \
+    #     MS1MZ=X"

     query = "QUERY scaninfo(MS1DATA) \
             WHERE \
-            RTMIN=3.03 \
-            AND RTMAX=3.05 \
+            RTMIN=3 \
+            AND RTMAX=5 \
             AND MS1MZ=X-2:INTENSITYMATCH=Y*0.063:INTENSITYMATCHPERCENT=25 \
             AND MS1MZ=X:INTENSITYMATCH=Y:INTENSITYMATCHREFERENCE \
             FILTER \
             MS1MZ=X"
+
     parse_obj = msql_parser.parse_msql(query)
     print(parse_obj)
     print(json.dumps(parse_obj, indent=4))

-    results_df = msql_engine.process_query(query, "tests/data/JB_182_2_fe.mzML")
+    results_df = msql_engine.process_query(query, "tests/data/JB_182_2_fe.mzML", parallel=False)

     print(results_df)

     assert(1223 in list(results_df["scan"]))
-    assert(len(results_df) == 15)
+    assert(len(results_df) == 2569)

 def test_ms1_iron_X_changes_intensity():
     query = "QUERY scaninfo(MS2DATA) WHERE \
@@ -565,9 +575,7 @@ def test_ms2_mobility_variable2():



-def main():
-    #msql_engine.init_ray()
-
+def main():
     #test_noquery()
     #test_simple_ms2_twoqualifier()
     #test_simple_ms2_twoconditions()
@@ -588,7 +596,7 @@
     #test_min_intensity()
#test_min_intensitypercent() #test_ms1_iron() - #test_ms1_iron_parallel() + test_ms1_iron_parallel() #test_polarity() #test_scan_range() #test_charge_filter() @@ -628,7 +636,7 @@ def main(): #test_quad_brominated() #test_ms2_mobility() #test_ms2_mobility_variable() - test_ms2_mobility_variable2() + #test_ms2_mobility_variable2() if __name__ == "__main__": main() From 8c1d19f9a0d5d900ebf312564fe7a6b250a60aff Mon Sep 17 00:00:00 2001 From: Mingxun Wang Date: Thu, 2 Sep 2021 09:54:04 -0700 Subject: [PATCH 2/2] updating --- dash/docker-compose.yml | 10 ++++++++++ dash/run_worker.sh | 2 +- dash/tasks.py | 3 +++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/dash/docker-compose.yml b/dash/docker-compose.yml index 3295dd7..2d409b1 100644 --- a/dash/docker-compose.yml +++ b/dash/docker-compose.yml @@ -53,14 +53,24 @@ services: msql-rabbitmq: container_name: msql-rabbitmq image: rabbitmq + restart: unless-stopped networks: - default + deploy: + resources: + limits: + memory: 8000M msql-redis: container_name: msql-redis image: redis + restart: unless-stopped networks: - default + deploy: + resources: + limits: + memory: 8000M networks: nginx-net: diff --git a/dash/run_worker.sh b/dash/run_worker.sh index 49db696..bcfe8cd 100755 --- a/dash/run_worker.sh +++ b/dash/run_worker.sh @@ -1,3 +1,3 @@ #!/bin/bash -celery -A tasks worker -l info -c 1 -Q worker --max-tasks-per-child 10 --loglevel INFO +celery -A tasks worker -l info -c 4 -Q worker --max-tasks-per-child 10 --loglevel INFO diff --git a/dash/tasks.py b/dash/tasks.py index db01587..9485c43 100644 --- a/dash/tasks.py +++ b/dash/tasks.py @@ -24,6 +24,9 @@ def task_computeheartbeat(): @celery_instance.task(time_limit=120) def task_executequery(query, filename): + # Doing the query via API + return _query_api(query, filename) + if "X" in query: return _query_cmd(query, filename) else:
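
Reviewer note: below is a minimal, self-contained sketch of the chunked Ray fan-out that patch 1 switches _evalute_variable_query to. The run_chunk task and the toy query strings are placeholders (not MassQL code) standing in for msql_engine_ray._executeconditions_query_ray and the concrete query dicts; only the chunking and fan-out logic mirrors the patch.

import ray

@ray.remote
def run_chunk(chunk):
    # Stand-in for msql_engine_ray._executeconditions_query_ray: each Ray task
    # receives a whole chunk of concrete queries and returns one list of
    # results, so task-scheduling overhead is paid per chunk, not per query.
    return [q.upper() for q in chunk]

if __name__ == "__main__":
    # Mirrors the smaller object store init_ray() sets up in this patch.
    ray.init(ignore_reinit_error=True, object_store_memory=500000000)

    all_concrete_queries = ["query {}".format(i) for i in range(2500)]

    # Same chunking arithmetic as the patch with its chunk_size of 1000:
    # 2500 queries become chunks of 1000, 1000, and 500.
    chunk_size = 1000
    chunks = [all_concrete_queries[i:i + chunk_size]
              for i in range(0, len(all_concrete_queries), chunk_size)]

    # One future per chunk; ray.get blocks until every chunk has finished,
    # then the per-chunk result lists are flattened back into one list.
    futures = [run_chunk.remote(chunk) for chunk in chunks]
    results = [r for chunk_results in ray.get(futures) for r in chunk_results]

    assert len(results) == len(all_concrete_queries)
    ray.shutdown()

Batching per chunk rather than issuing one remote call per concrete query keeps the number of Ray tasks, and the serialized copies of the MS1/MS2 DataFrames shipped with them, proportional to the chunk count instead of the query count, which is presumably what lets the much smaller object store configured in this patch suffice.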