10 changes: 10 additions & 0 deletions dash/docker-compose.yml
@@ -53,14 +53,24 @@ services:
  msql-rabbitmq:
    container_name: msql-rabbitmq
    image: rabbitmq
    restart: unless-stopped
    networks:
      - default
    deploy:
      resources:
        limits:
          memory: 8000M

  msql-redis:
    container_name: msql-redis
    image: redis
    restart: unless-stopped
    networks:
      - default
    deploy:
      resources:
        limits:
          memory: 8000M

networks:
  nginx-net:
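The new deploy.resources.limits entries cap each broker container at roughly 8 GB of memory. A minimal sketch for confirming the cap was actually applied once the stack is up; the container names come from the compose file, but the docker inspect check itself is illustrative and assumes the Compose version in use honors deploy limits outside swarm mode:

import json
import subprocess

def container_memory_limit(name):
    # Ask Docker for the memory cap (in bytes) applied to a running container.
    inspect_json = subprocess.run(
        ["docker", "inspect", name],
        capture_output=True, text=True, check=True,
    ).stdout
    return json.loads(inspect_json)[0]["HostConfig"]["Memory"]

for service in ("msql-rabbitmq", "msql-redis"):
    # Expect a value around 8 GB when the limit is honored; 0 means "unlimited".
    print(service, container_memory_limit(service))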
2 changes: 1 addition & 1 deletion dash/requirements.txt
@@ -41,7 +41,7 @@ dash_daq
Flask-Caching
lark-parser
pyarrow
ray==1.3.0
ray==1.6.0
celery==5.0.5
click==7.1.1
py_expression_eval
37 changes: 36 additions & 1 deletion dash/tasks.py
@@ -2,6 +2,9 @@
import glob
import sys
import os
from tempfile import NamedTemporaryFile
import subprocess
import pandas as pd
import requests
import requests_cache
requests_cache.install_cache('temp/demo_cache')
@@ -21,7 +24,16 @@ def task_computeheartbeat():

@celery_instance.task(time_limit=120)
def task_executequery(query, filename):

    # Doing the query via API
    return _query_api(query, filename)

    # NOTE: the command-line branch below is unreachable while the early return above is in place
    if "X" in query:
        return _query_cmd(query, filename)
    else:
        # Doing the query via API
        return _query_api(query, filename)

def _query_api(query, filename):
    parse_results = msql_parser.parse_msql(query)
    results_df = msql_engine.process_query(query, filename, parallel=False)

@@ -34,6 +46,29 @@ def task_executequery(query, filename):

    return all_results

def _query_cmd(query, filename):
    # Query via the command line so that the parallel features can be used

    f = NamedTemporaryFile(delete=False, suffix='.tsv')
    temp_filename = f.name

    cmd_list = ["python", "./massql/msql_cmd.py", filename, query, "--output_file", temp_filename, "--parallel_query", "YES"]
    subprocess.run(cmd_list)

    results_df = pd.read_csv(temp_filename, sep="\t")

    os.unlink(f.name)

    all_results = results_df.to_dict(orient="records")

    try:
        all_results = _enrich_results(all_results)
    except Exception:
        # Enrichment is best-effort; return the raw results if it fails
        pass

    return all_results



def _get_gnps_spectruminfo(spectrumid):
    url = "https://gnps-external.ucsd.edu/gnpsspectrum?SpectrumID={}".format(spectrumid)
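Taken together, task_executequery is set up to route between _query_api (an in-process engine call) and _query_cmd (which shells out to msql_cmd.py so the parallel path can be used and reads the results back from a temporary TSV), although the early return currently keeps all traffic on the API path. A minimal sketch of how a caller might invoke the task through Celery; the import path and input filename are placeholders, and the timeout simply mirrors the task's 120 s limit:

from tasks import task_executequery

query = "QUERY scaninfo(MS2DATA)"
filename = "data/input.mzML"  # placeholder path, not part of this diff

# Enqueue the task on the Celery broker and block until the worker returns.
async_result = task_executequery.delay(query, filename)
records = async_result.get(timeout=120)  # list of dicts, one per matching scan
print(len(records), "scans matched")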
12 changes: 8 additions & 4 deletions massql/msql_engine.py
@@ -21,10 +21,14 @@ def DEBUG_MSG(msg):

    print(msg, file=sys.stderr, flush=True)

def init_ray():
def init_ray(temp_dir=None):
    import ray
    if not ray.is_initialized():
        ray.init(ignore_reinit_error=True, object_store_memory=8000000000, num_cpus=8)
        if temp_dir is None:
            ray.init(ignore_reinit_error=True, object_store_memory=500000000, num_cpus=8)
        else:
            ray.init(ignore_reinit_error=True, object_store_memory=500000000, num_cpus=8, _temp_dir=temp_dir)



def _get_ppm_tolerance(qualifiers):
@@ -329,11 +333,11 @@ def _evalute_variable_query(parsed_dict, input_filename, cache=True, parallel=Fa
    execute_serial = True
    if parallel:
        import ray
        import msql_engine_ray
        from massql import msql_engine_ray

        if ray.is_initialized():
            # TODO: Divide up the parallel thing
            chunk_size = 100
            chunk_size = 1000
            concrete_query_lists = [all_concrete_queries[i:i + chunk_size] for i in range(0, len(all_concrete_queries), chunk_size)]
            futures = [msql_engine_ray._executeconditions_query_ray.remote(concrete_query_list, input_filename, ms1_input_df=ms1_df, ms2_input_df=ms2_df, cache=cache) for concrete_query_list in concrete_query_lists]
            all_ray_results = ray.get(futures)
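With this change, the parallel branch batches the concrete queries into chunks of 1000 and submits one Ray task per chunk, and init_ray() can point Ray's temp directory somewhere with more space. A standalone sketch of the chunk-and-gather pattern, with a toy workload standing in for the real query executor; the memory and CPU numbers are illustrative only:

import ray

ray.init(ignore_reinit_error=True, object_store_memory=500_000_000, num_cpus=8)

@ray.remote
def process_chunk(chunk):
    # Stand-in for _executeconditions_query_ray: one Ray task handles a whole batch.
    return [item * 2 for item in chunk]

items = list(range(10_000))  # stand-in for all_concrete_queries
chunk_size = 1000
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

# Submit one future per chunk, then gather and flatten the per-chunk results.
futures = [process_chunk.remote(chunk) for chunk in chunks]
results = [r for chunk_results in ray.get(futures) for r in chunk_results]
print(len(results))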
11 changes: 9 additions & 2 deletions massql/msql_engine_ray.py
@@ -1,5 +1,6 @@
from massql.msql_engine import _executeconditions_query, _executecollate_query
import ray
from tqdm import tqdm

@ray.remote
def _executeconditions_query_ray(parsed_dict_list, input_filename, ms1_input_df=None, ms2_input_df=None, cache=True):
@@ -19,10 +20,16 @@ def _executeconditions_query_ray(parsed_dict_list, input_filename, ms1_input_df=

    collated_list = []

    for parsed_dict in parsed_dict_list:
        ms1_df, ms2_df = _executeconditions_query(parsed_dict, input_filename, ms1_input_df=ms1_input_df, ms2_input_df=ms2_input_df, cache=cache)
    # Copying the shared input DataFrames once per worker before the loop
    ms1_input_copy_df = ms1_input_df.copy()
    ms2_input_copy_df = ms2_input_df.copy()

    for parsed_dict in tqdm(parsed_dict_list):
        ms1_df, ms2_df = _executeconditions_query(parsed_dict, input_filename, ms1_input_df=ms1_input_copy_df, ms2_input_df=ms2_input_copy_df, cache=cache)

        collated_df = _executecollate_query(parsed_dict, ms1_df, ms2_df)
        collated_list.append(collated_df)

    print("FINISH RAY QUERY")

    return collated_list
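Inside each Ray worker the shared MS1/MS2 DataFrames are now copied once before iterating over the batch; objects handed to a remote task through Ray's object store can be backed by read-only buffers, so working on a private copy avoids errors from modifying shared inputs in place. A minimal sketch of the copy-once-per-worker pattern; the column names, dictionary keys, and filtering are made up for illustration:

import pandas as pd

def run_batch(parsed_dict_list, ms1_input_df, ms2_input_df):
    # Copy the shared inputs once per worker; the copies are safe to work on.
    ms1_df_local = ms1_input_df.copy()
    ms2_df_local = ms2_input_df.copy()

    collated = []
    for parsed_dict in parsed_dict_list:
        # Placeholder filter standing in for _executeconditions_query.
        subset = ms1_df_local[ms1_df_local["rt"].between(parsed_dict["rtmin"], parsed_dict["rtmax"])]
        collated.append(subset)
    return collated

# Toy usage with made-up data
ms1 = pd.DataFrame({"rt": [3.0, 3.5, 4.0, 6.0], "i": [10, 20, 30, 40]})
print(run_batch([{"rtmin": 3, "rtmax": 5}], ms1, ms1.copy()))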
2 changes: 1 addition & 1 deletion requirements.txt
@@ -2,7 +2,7 @@ pymzml
lark-parser
pandas
pyarrow
ray
ray==1.6.0
tqdm
py_expression_eval
matchms
30 changes: 19 additions & 11 deletions tests/test_query.py
@@ -204,25 +204,35 @@ def test_ms1_iron():
    assert(1223 in list(results_df["scan"]))
    assert(len(results_df) == 15)

@pytest.mark.skip(reason="parallel not really supported anymore")
@pytest.mark.skip(reason="Parallel testing not automated")
def test_ms1_iron_parallel():
    msql_engine.init_ray()
    msql_engine.init_ray(temp_dir=os.path.abspath("/media/temp_ray"))

    # query = "QUERY scaninfo(MS1DATA) \
    # WHERE \
    # RTMIN=3.03 \
    # AND RTMAX=3.05 \
    # AND MS1MZ=X-2:INTENSITYMATCH=Y*0.063:INTENSITYMATCHPERCENT=25 \
    # AND MS1MZ=X:INTENSITYMATCH=Y:INTENSITYMATCHREFERENCE \
    # FILTER \
    # MS1MZ=X"

    query = "QUERY scaninfo(MS1DATA) \
        WHERE \
        RTMIN=3.03 \
        AND RTMAX=3.05 \
        RTMIN=3 \
        AND RTMAX=5 \
        AND MS1MZ=X-2:INTENSITYMATCH=Y*0.063:INTENSITYMATCHPERCENT=25 \
        AND MS1MZ=X:INTENSITYMATCH=Y:INTENSITYMATCHREFERENCE \
        FILTER \
        MS1MZ=X"

    parse_obj = msql_parser.parse_msql(query)
    print(parse_obj)
    print(json.dumps(parse_obj, indent=4))
    results_df = msql_engine.process_query(query, "tests/data/JB_182_2_fe.mzML")
    results_df = msql_engine.process_query(query, "tests/data/JB_182_2_fe.mzML", parallel=False)
    print(results_df)
    assert(1223 in list(results_df["scan"]))
    assert(len(results_df) == 15)
    assert(len(results_df) == 2569)

def test_ms1_iron_X_changes_intensity():
    query = "QUERY scaninfo(MS2DATA) WHERE \
@@ -565,9 +575,7 @@ def test_ms2_mobility_variable2():



def main():
    #msql_engine.init_ray()

def main():
    #test_noquery()
    #test_simple_ms2_twoqualifier()
    #test_simple_ms2_twoconditions()
@@ -588,7 +596,7 @@ def main():
    #test_min_intensity()
    #test_min_intensitypercent()
    #test_ms1_iron()
    #test_ms1_iron_parallel()
    test_ms1_iron_parallel()
    #test_polarity()
    #test_scan_range()
    #test_charge_filter()
@@ -628,7 +636,7 @@ def main():
    #test_quad_brominated()
    #test_ms2_mobility()
    #test_ms2_mobility_variable()
    test_ms2_mobility_variable2()
    #test_ms2_mobility_variable2()

if __name__ == "__main__":
    main()