diff --git a/LIMS2DB/classes.py b/LIMS2DB/classes.py
index eff2c50..2b27d1c 100644
--- a/LIMS2DB/classes.py
+++ b/LIMS2DB/classes.py
@@ -1,5 +1,4 @@
import copy
-import http.client as http_client
import re
from datetime import datetime
@@ -14,6 +13,7 @@
ReagentType,
Researcher,
)
+from ibm_cloud_sdk_core.api_exception import ApiException
from requests import get as rget
from sqlalchemy import text
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
@@ -23,195 +23,6 @@
from LIMS2DB.utils import send_mail
-class Workset:
- def __init__(self, lims, crawler, log):
- self.log = log
- self.name = set()
- self.lims = lims
- self.obj = {}
- # get the identifier
- outs = crawler.starting_proc.all_outputs()
- for out in outs:
- if out.type == "Analyte" and len(out.samples) == 1:
- try:
- self.name.add(out.location[0].name)
- except:
- self.log.warn(f"no name found for workset {out.id}")
-
- try:
- self.obj["name"] = self.name.pop()
- except:
- self.log.error(f"No name found for current workset {crawler.starting_proc.id}, might be an ongoing step.")
- raise NameError
- self.obj["technician"] = crawler.starting_proc.technician.initials
- self.obj["id"] = crawler.starting_proc.id
- self.obj["date_run"] = crawler.starting_proc.date_run
- # only get the latest aggregate qc date
- latest_date = 0
- for agr in crawler.libaggre:
- if agr.date_run > latest_date:
- latest_date = agr.date_run
- if not latest_date:
- latest_date = None
- self.obj["last_aggregate"] = latest_date
- pjs = {}
- for p in crawler.projects:
- pjs[p.id] = {}
- pjs[p.id]["name"] = p.name
- try:
- pjs[p.id]["library"] = p.udf["Library construction method"]
- except KeyError:
- pjs[p.id]["library"] = None
- try:
- pjs[p.id]["application"] = p.udf["Application"]
- except KeyError:
- pjs[p.id]["application"] = None
- try:
- pjs[p.id]["sequencing_setup"] = f"{p.udf['Sequencing platform']} {p.udf['Sequencing setup']}"
- except KeyError:
- pjs[p.id]["sequencing_setup"] = None
-
- pjs[p.id]["samples"] = {}
- for sample in crawler.samples:
- if sample.project == p:
- pjs[p.id]["samples"][sample.name] = {}
- pjs[p.id]["samples"][sample.name]["library"] = {}
- pjs[p.id]["samples"][sample.name]["sequencing"] = {}
- try:
- pjs[p.id]["samples"][sample.name]["customer_name"] = sample.udf["Customer Name"]
- except KeyError:
- pjs[p.id]["samples"][sample.name]["customer_name"] = None
-
- pjs[p.id]["samples"][sample.name]["rec_ctrl"] = {}
- for i in crawler.starting_proc.all_inputs():
- if sample in i.samples:
- pjs[p.id]["samples"][sample.name]["rec_ctrl"]["status"] = i.qc_flag
-
- for output in crawler.starting_proc.all_outputs():
- if output.type == "Analyte" and sample in output.samples:
- pjs[p.id]["samples"][sample.name]["location"] = output.location[1]
-
- for lib in sorted(crawler.libaggre, key=lambda l: l.date_run, reverse=True):
- for inp in lib.all_inputs():
- if sample in inp.samples:
- onelib = {}
- onelib["status"] = inp.qc_flag
- onelib["art"] = inp.id
- onelib["date"] = lib.date_run
- onelib["name"] = lib.protocol_name
- onelib["id"] = lib.id
- if "Concentration" in inp.udf and "Conc. Units" in inp.udf:
- onelib["concentration"] = "{} {}".format(
- round(inp.udf["Concentration"], 2),
- inp.udf["Conc. Units"],
- )
- if "Molar Conc. (nM)" in inp.udf:
- onelib["concentration"] = f"{round(inp.udf['Molar Conc. (nM)'], 2)} nM"
- if "Size (bp)" in inp.udf:
- onelib["size"] = round(inp.udf["Size (bp)"], 2)
- if "NeoPrep Machine QC" in inp.udf and onelib["status"] == "UNKNOWN":
- onelib["status"] = inp.udf["NeoPrep Machine QC"]
-
- pjs[p.id]["samples"][sample.name]["library"][lib.id] = onelib
- if "library_status" not in pjs[p.id]["samples"][sample.name]:
- pjs[p.id]["samples"][sample.name]["library_status"] = inp.qc_flag
-
- for seq in sorted(crawler.seq, key=lambda s: s.date_run, reverse=True):
- for inp in seq.all_inputs():
- if sample in inp.samples:
- pjs[p.id]["samples"][sample.name]["sequencing"][seq.id] = {}
- pjs[p.id]["samples"][sample.name]["sequencing"][seq.id]["status"] = inp.qc_flag
- pjs[p.id]["samples"][sample.name]["sequencing"][seq.id]["date"] = seq.date_run
- if "sequencing_status" not in pjs[p.id]["samples"][sample.name]:
- pjs[p.id]["samples"][sample.name]["sequencing_status"] = inp.qc_flag
-
- self.obj["projects"] = pjs
-
-
-class LimsCrawler:
- def __init__(self, lims, starting_proc=None, starting_inputs=None):
- self.lims = lims
- self.starting_proc = starting_proc
- self.samples = set()
- self.projects = set()
- self.finlibinitqc = set()
- self.initqc = set()
- self.initaggr = set()
- self.pooling = set()
- self.preprepstart = set()
- self.prepstart = set()
- self.prepend = set()
- self.libval = set()
- self.finliblibval = set()
- self.libaggre = set()
- self.dilstart = set()
- self.seq = set()
- self.demux = set()
- self.caliper = set()
- self.projsum = set()
- self.inputs = set()
- if starting_proc:
- for i in starting_proc.all_inputs():
- if i.type == "Analyte":
- self.samples.update(i.samples)
- self.inputs.add(i)
- if starting_inputs:
- for i in starting_inputs:
- if i.type == "Analyte":
- self.samples.update(i.samples)
- self.inputs.add(i)
- for sample in self.samples:
- if sample.project:
- self.projects.add(sample.project)
-
- def crawl(self, starting_step=None):
- nextsteps = set()
- if not starting_step:
- if not self.starting_proc:
- for i in self.inputs:
- if i.type == "Analyte" and (self.samples.intersection(i.samples)):
- nextsteps.update(self.lims.get_processes(inputartifactlimsid=i.id))
- else:
- starting_step = self.starting_proc
- if starting_step:
- for o in starting_step.all_outputs():
- if o.type == "Analyte" and (self.samples.intersection(o.samples)):
- nextsteps.update(self.lims.get_processes(inputartifactlimsid=o.id))
- for step in nextsteps:
- if step.type.name in list(pc_cg.PREPREPSTART.values()):
- self.preprepstart.add(step)
- elif step.type.name in list(pc_cg.PREPSTART.values()):
- self.prepstart.add(step)
- elif step.type.name in list(pc_cg.PREPEND.values()):
- self.prepend.add(step)
- elif step.type.name in list(pc_cg.LIBVAL.values()):
- self.libval.add(step)
- elif step.type.name in list(pc_cg.AGRLIBVAL.values()):
- self.libaggre.add(step)
- elif step.type.name in list(pc_cg.SEQUENCING.values()):
- self.seq.add(step)
- elif step.type.name in list(pc_cg.DEMULTIPLEX.values()):
- self.demux.add(step)
- elif step.type.name in list(pc_cg.INITALQCFINISHEDLIB.values()):
- self.finlibinitqc.add(step)
- elif step.type.name in list(pc_cg.INITALQC.values()):
- self.initqc.add(step)
- elif step.type.name in list(pc_cg.AGRINITQC.values()):
- self.initaggr.add(step)
- elif step.type.name in list(pc_cg.POOLING.values()):
- self.pooling.add(step)
- elif step.type.name in list(pc_cg.DILSTART.values()):
- self.dilstart.add(step)
- elif step.type.name in list(pc_cg.SUMMARY.values()):
- self.projsum.add(step)
- elif step.type.name in list(pc_cg.CALIPER.values()):
- self.caliper.add(step)
-
- # if the step has analytes as outputs
- if [x for x in step.all_outputs() if x.type == "Analyte"]:
- self.crawl(starting_step=step)
-
-
class Workset_SQL:
def __init__(self, session, log, step):
self.log = log
@@ -452,14 +263,19 @@ def save(self, update_modification_time=True):
doc = None
# When running for a single project, sometimes the connection is lost so retry
try:
- self.couch["projects"]
- except http_client.BadStatusLine:
+ self.couch.get_server_information().get_result()
+ except ApiException:
self.log.warning(f"Access to couch failed before trying to save new doc for project {self.pid}")
pass
- db = self.couch["projects"]
- view = db.view("project/project_id")
- for row in view[self.pid]:
- doc = db.get(row.id)
+ result = self.couch.post_view(
+ db="projects",
+ ddoc="project",
+ view="project_id",
+ key=self.pid,
+ include_docs=True,
+ ).get_result()["rows"]
+ if result:
+ doc = result[0]["doc"]
if doc:
fields_saved = [
"_id",
@@ -502,7 +318,11 @@ def save(self, update_modification_time=True):
self.obj["order_details"] = doc["order_details"]
self.log.info(f"Trying to save new doc for project {self.pid}")
- db.save(self.obj)
+ self.couch.put_document(
+ db="projects",
+ doc_id=self.obj["_id"],
+ document=self.obj,
+ ).get_result()
if self.obj.get("details", {}).get("type", "") == "Application":
lib_method_text = f"Library method: {self.obj['details'].get('library_construction_method', 'N/A')}"
application = self.obj.get("details", {}).get("application", "")
@@ -514,12 +334,12 @@ def save(self, update_modification_time=True):
if diffs["key details contract_received"][1] == "missing":
old_contract_received = diffs["key details contract_received"][0]
msg = f"Contract received on {old_contract_received} deleted for applications project "
- msg += f'{self.obj["project_id"]}, {self.obj["project_name"]}[{lib_method_text}]\
+ msg += f'{self.obj["project_id"]}, {self.obj["project_name"]} [{lib_method_text}]\
{single_cell_text if is_single_cell else ""}.'
else:
contract_received = diffs["key details contract_received"][1]
msg = "Contract received for applications project "
- msg += f'{self.obj["project_id"]}, {self.obj["project_name"]}[{lib_method_text}]\
+ msg += f'{self.obj["project_id"]}, {self.obj["project_name"]} [{lib_method_text}]\
{single_cell_text if is_single_cell else ""} on {contract_received}.'
if is_single_cell:
@@ -537,7 +357,10 @@ def save(self, update_modification_time=True):
self.obj["creation_time"] = datetime.now().isoformat()
self.obj["modification_time"] = self.obj["creation_time"]
self.log.info(f"Trying to save new doc for project {self.pid}")
- db.save(self.obj)
+ self.couch.post_document(
+ db="projects",
+ document=self.obj,
+ ).get_result()
if self.obj.get("details", {}).get("type", "") == "Application":
genstat_url = f"{self.genstat_proj_url}{self.obj['project_id']}"
lib_method_text = f"Library method: {self.obj['details'].get('library_construction_method', 'N/A')}"
diff --git a/LIMS2DB/diff.py b/LIMS2DB/diff.py
index 2c29fc6..9735c71 100644
--- a/LIMS2DB/diff.py
+++ b/LIMS2DB/diff.py
@@ -1,37 +1,38 @@
-import http.client as http_client
-
from genologics_sql.utils import get_configuration, get_session
+from ibm_cloud_sdk_core.api_exception import ApiException
from LIMS2DB.utils import setupLog
-def diff_project_objects(pj_id, couch, proj_db, logfile, oconf):
+def diff_project_objects(pj_id, couch, logfile, oconf):
# Import is put here to defer circular imports
from LIMS2DB.classes import ProjectSQL
log = setupLog(f"diff - {pj_id}", logfile)
- view = proj_db.view("projects/lims_followed")
-
def fetch_project(pj_id):
- try:
- old_project_couchid = view[pj_id].rows[0].value
- except (KeyError, IndexError):
- log.error(f"No such project {pj_id}")
+ result = couch.post_view(
+ db="projects",
+ ddoc="projects",
+ view="lims_followed",
+ key=pj_id,
+ include_docs=True,
+ ).get_result()["rows"]
+ if not result:
+ log.error(f"No project found in couch for {pj_id}")
return None
- return old_project_couchid
+ return result[0]["doc"]
try:
- old_project_couchid = fetch_project(pj_id)
- except http_client.BadStatusLine:
- log.error("BadStatusLine received after large project")
+ old_project = fetch_project(pj_id)
+ except ApiException:
+ log.error("Connection issues after large project")
# Retry
- old_project_couchid = fetch_project(pj_id)
+ old_project = fetch_project(pj_id)
- if old_project_couchid is None:
+ if old_project is None:
return None
- old_project = proj_db.get(old_project_couchid)
old_project.pop("_id", None)
old_project.pop("_rev", None)
old_project.pop("modification_time", None)
diff --git a/LIMS2DB/flowcell_sql.py b/LIMS2DB/flowcell_sql.py
index 9d4c7b9..fe7c5eb 100644
--- a/LIMS2DB/flowcell_sql.py
+++ b/LIMS2DB/flowcell_sql.py
@@ -76,11 +76,10 @@ def upload_to_couch(couch, runid, lims_data, pro):
elif pc_cg.SEQUENCING.get(str(pro.typeid), "") in ["AVITI Run v1.0"]:
dbname = "element_runs"
- db = couch[dbname]
- view = db.view("info/id")
doc = None
- for row in view[runid]:
- doc = db.get(row.value)
+ result = couch.post_view(db=dbname, ddoc="info", view="id", key=runid, include_docs=True).get_result()["rows"]
+ if result:
+ doc = result[0]["doc"]
if doc:
running_notes = {}
@@ -89,4 +88,7 @@ def upload_to_couch(couch, runid, lims_data, pro):
doc["lims_data"] = lims_data
if running_notes:
doc["lims_data"]["container_running_notes"] = running_notes
- db.save(doc)
+ couch.post_document(
+ db=dbname,
+ document=doc,
+ ).get_result()
diff --git a/LIMS2DB/parallel.py b/LIMS2DB/parallel.py
index 3c6977d..19edd99 100644
--- a/LIMS2DB/parallel.py
+++ b/LIMS2DB/parallel.py
@@ -1,128 +1,15 @@
import logging
-import logging.handlers
import multiprocessing as mp
import queue as Queue
import genologics_sql.tables as gt
-import statusdb.db as sdb
import yaml
-from genologics.config import BASEURI, PASSWORD, USERNAME
-from genologics.entities import Process
-from genologics.lims import Lims
from genologics_sql.utils import get_session
import LIMS2DB.classes as lclasses
import LIMS2DB.utils as lutils
-def processWSUL(options, queue, logqueue):
- mycouch = sdb.Couch()
- mycouch.set_db("worksets")
- mycouch.connect()
- view = mycouch.db.view("worksets/name")
- mylims = Lims(BASEURI, USERNAME, PASSWORD)
- work = True
- procName = mp.current_process().name
- proclog = logging.getLogger(procName)
- proclog.setLevel(level=logging.INFO)
- mfh = QueueHandler(logqueue)
- mft = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
- mfh.setFormatter(mft)
- proclog.addHandler(mfh)
-
- while work:
- # grabs project from queue
- try:
- ws_id = queue.get(block=True, timeout=3)
- proclog.info(f"Starting work on {ws_id}")
- except Queue.Empty:
- work = False
- proclog.info("exiting gracefully")
- break
- else:
- wsp = Process(mylims, id=ws_id)
- if not wsp.date_run:
- continue
- lc = lclasses.LimsCrawler(mylims, wsp)
- lc.crawl()
- try:
- ws = lclasses.Workset(mylims, lc, proclog)
- except NameError:
- continue
-
- # If there is already a workset with that name in the DB
- if len(view[ws.obj["name"]].rows) == 1:
- remote_doc = view[ws.obj["name"]].rows[0].value
- # remove id and rev for comparison
- doc_id = remote_doc.pop("_id")
- doc_rev = remote_doc.pop("_rev")
- if remote_doc != ws.obj:
- # if they are different, though they have the same name, upload the new one
- ws.obj = lutils.merge(ws.obj, remote_doc)
- ws.obj["_id"] = doc_id
- ws.obj["_rev"] = doc_rev
- mycouch.db[doc_id] = ws.obj
- proclog.info(f"updating {ws.obj['name']}")
- else:
- proclog.info(f"not modifying {ws.obj['name']}")
- elif len(view[ws.obj["name"]].rows) == 0:
- # it is a new doc, upload it
- mycouch.save(ws.obj)
- proclog.info(f"saving {ws.obj['name']}")
- else:
- proclog.warn(f"more than one row with name {ws.obj['name']} found")
- # signals to queue job is done
- queue.task_done()
-
-
-def masterProcess(options, wslist, mainlims, logger):
- worksetQueue = mp.JoinableQueue()
- logQueue = mp.Queue()
- childs = []
- procs_nb = 1
- # Initial step : order worksets by date:
- logger.info("ordering the workset list")
- orderedwslist = sorted(wslist, key=lambda x: x.date_run)
- logger.info("done ordering the workset list")
- if len(wslist) < options.procs:
- procs_nb = len(wslist)
- else:
- procs_nb = options.procs
-
- # spawn a pool of processes, and pass them queue instance
- for i in range(procs_nb):
- p = mp.Process(target=processWSUL, args=(options, worksetQueue, logQueue))
- p.start()
- childs.append(p)
- # populate queue with data
- # CHEATING
- if options.queue:
- worksetQueue.put(options.queue)
- orderedwslist = []
- for ws in orderedwslist:
- worksetQueue.put(ws.id)
-
- # wait on the queue until everything has been processed
- notDone = True
- while notDone:
- try:
- log = logQueue.get(False)
- logger.handle(log)
- except Queue.Empty:
- if not stillRunning(childs):
- notDone = False
- break
-
-
-def stillRunning(processList):
- ret = False
- for p in processList:
- if p.is_alive():
- ret = True
-
- return ret
-
-
def masterProcessSQL(args, wslist, logger):
worksetQueue = mp.JoinableQueue()
logQueue = mp.Queue()
@@ -149,7 +36,7 @@ def masterProcessSQL(args, wslist, logger):
log = logQueue.get(False)
logger.handle(log)
except Queue.Empty:
- if not stillRunning(childs):
+ if not lutils.stillRunning(childs):
notDone = False
break
@@ -159,12 +46,11 @@ def processWSULSQL(args, queue, logqueue):
session = get_session()
with open(args.conf) as conf_file:
conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
- couch = lutils.setupServer(conf)
- db = couch["worksets"]
+ couch = lutils.load_couch_server(conf)
procName = mp.current_process().name
proclog = logging.getLogger(procName)
proclog.setLevel(level=logging.INFO)
- mfh = QueueHandler(logqueue)
+ mfh = lutils.QueueHandler(logqueue)
mft = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
mfh.setFormatter(mft)
proclog.addHandler(mfh)
@@ -182,83 +68,36 @@ def processWSULSQL(args, queue, logqueue):
step = session.query(gt.Process).filter(gt.Process.processid == int(ws_id)).one()
ws = lclasses.Workset_SQL(session, proclog, step)
doc = {}
- for row in db.view("worksets/lims_id")[ws.obj["id"]]:
- doc = db.get(row.id)
+ result = couch.post_view(
+ db="worksets",
+ ddoc="worksets",
+ view="lims_id",
+ key=ws.obj["id"],
+ include_docs=True,
+ ).get_result()["rows"]
+ if result:
+ doc = result[0]["doc"]
if doc:
final_doc = lutils.merge(ws.obj, doc)
else:
final_doc = ws.obj
# clean possible name duplicates
- for row in db.view("worksets/name")[ws.obj["name"]]:
- doc = db.get(row.id)
+ result = couch.post_view(
+ db="worksets",
+ ddoc="worksets",
+ view="name",
+ key=ws.obj["name"],
+ include_docs=True,
+ ).get_result()["rows"]
+ if result:
+ doc = result[0]["doc"]
if doc["id"] != ws.obj["id"]:
proclog.warning(f"Duplicate name {doc['name']} for worksets {doc['id']} and {final_doc['id']}")
- db.delete(doc)
- db.save(final_doc)
+ couch.delete_document(db="worksets", doc_id=doc["_id"], rev=doc["_rev"]).get_result()
+ # upload the document
+ couch.post_document(
+ db="worksets",
+ document=final_doc,
+ ).get_result()
proclog.info(f"updating {ws.obj['name']}")
queue.task_done()
-
-
-class QueueHandler(logging.Handler):
- """
- This handler sends events to a queue. Typically, it would be used together
- with a multiprocessing Queue to centralise logging to file in one process
- (in a multi-process application), so as to avoid file write contention
- between processes.
-
- This code is new in Python 3.2, but this class can be copy pasted into
- user code for use with earlier Python versions.
- """
-
- def __init__(self, queue):
- """
- Initialise an instance, using the passed queue.
- """
- logging.Handler.__init__(self)
- self.queue = queue
-
- def enqueue(self, record):
- """
- Enqueue a record.
-
- The base implementation uses put_nowait. You may want to override
- this method if you want to use blocking, timeouts or custom queue
- implementations.
- """
- self.queue.put_nowait(record)
-
- def prepare(self, record):
- """
- Prepares a record for queuing. The object returned by this method is
- enqueued.
-
- The base implementation formats the record to merge the message
- and arguments, and removes unpickleable items from the record
- in-place.
-
- You might want to override this method if you want to convert
- the record to a dict or JSON string, or send a modified copy
- of the record while leaving the original intact.
- """
- # The format operation gets traceback text into record.exc_text
- # (if there's exception data), and also puts the message into
- # record.message. We can then use this to replace the original
- # msg + args, as these might be unpickleable. We also zap the
- # exc_info attribute, as it's no longer needed and, if not None,
- # will typically not be pickleable.
- self.format(record)
- record.msg = record.message
- record.args = None
- record.exc_info = None
- return record
-
- def emit(self, record):
- """
- Emit a record.
-
- Writes the LogRecord to the queue, preparing it for pickling first.
- """
- try:
- self.enqueue(self.prepare(record))
- except Exception:
- self.handleError(record)
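
With the duplicated helpers removed, parallel.py now imports `QueueHandler` and `stillRunning` from `LIMS2DB.utils`. For context, the sketch below shows how those pieces fit together in the worker/master pattern used by `processWSULSQL` and `masterProcessSQL`: each worker attaches a `QueueHandler` to its own logger, and the parent process drains the log queue until every child has exited. The `worker`/`master` functions are simplified stand-ins for illustration, not code from this change.

```python
import logging
import multiprocessing as mp
import queue as Queue

import LIMS2DB.utils as lutils


def worker(work_queue, log_queue):
    # Each worker logs into the shared multiprocessing queue via QueueHandler
    proclog = logging.getLogger(mp.current_process().name)
    proclog.setLevel(logging.INFO)
    handler = lutils.QueueHandler(log_queue)
    handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
    proclog.addHandler(handler)
    while True:
        try:
            item = work_queue.get(block=True, timeout=3)
        except Queue.Empty:
            break
        proclog.info(f"processing {item}")
        work_queue.task_done()


def master(items, logger, procs=4):
    work_queue = mp.JoinableQueue()
    log_queue = mp.Queue()
    children = [mp.Process(target=worker, args=(work_queue, log_queue)) for _ in range(min(procs, len(items)))]
    for p in children:
        p.start()
    for item in items:
        work_queue.put(item)
    # Drain worker log records in the parent until all children have exited
    while True:
        try:
            logger.handle(log_queue.get(False))
        except Queue.Empty:
            if not lutils.stillRunning(children):
                break
```
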
diff --git a/LIMS2DB/utils.py b/LIMS2DB/utils.py
index 1f3b223..c956505 100644
--- a/LIMS2DB/utils.py
+++ b/LIMS2DB/utils.py
@@ -3,7 +3,7 @@
import smtplib
from email.mime.text import MIMEText
-import couchdb
+from ibmcloudant import CouchDbSessionAuthenticator, cloudant_v1
# merges d2 in d1, keeps values from d1
@@ -42,10 +42,18 @@ def formatStack(stack):
return "\n".join(formatted_error)
-def setupServer(conf):
+def load_couch_server(conf):
+ """Loads the CouchDB server instance from the configuration.
+ :param dict conf: Configuration dictionary containing statusdb settings
+ :return: CouchDB server instance
+ """
db_conf = conf["statusdb"]
- url = f"https://{db_conf['username']}:{db_conf['password']}@{db_conf['url']}"
- return couchdb.Server(url)
+ couchdb = cloudant_v1.CloudantV1(authenticator=CouchDbSessionAuthenticator(db_conf["username"], db_conf["password"]))
+ url = db_conf["url"]
+ if not url.startswith("https://"):
+ url = f"https://{url}"
+ couchdb.set_service_url(url)
+ return couchdb
def send_mail(subject, content, receiver):
@@ -64,3 +72,77 @@ def send_mail(subject, content, receiver):
s = smtplib.SMTP("localhost")
s.sendmail("LIMS2DB", [receiver], msg.as_string())
s.quit()
+
+
+def stillRunning(processList):
+ ret = False
+ for p in processList:
+ if p.is_alive():
+ ret = True
+
+ return ret
+
+
+class QueueHandler(logging.Handler):
+ """
+ This handler sends events to a queue. Typically, it would be used together
+ with a multiprocessing Queue to centralise logging to file in one process
+ (in a multi-process application), so as to avoid file write contention
+ between processes.
+
+ This code is new in Python 3.2, but this class can be copy pasted into
+ user code for use with earlier Python versions.
+ """
+
+ def __init__(self, queue):
+ """
+ Initialise an instance, using the passed queue.
+ """
+ logging.Handler.__init__(self)
+ self.queue = queue
+
+ def enqueue(self, record):
+ """
+ Enqueue a record.
+
+ The base implementation uses put_nowait. You may want to override
+ this method if you want to use blocking, timeouts or custom queue
+ implementations.
+ """
+ self.queue.put_nowait(record)
+
+ def prepare(self, record):
+ """
+ Prepares a record for queuing. The object returned by this method is
+ enqueued.
+
+ The base implementation formats the record to merge the message
+ and arguments, and removes unpickleable items from the record
+ in-place.
+
+ You might want to override this method if you want to convert
+ the record to a dict or JSON string, or send a modified copy
+ of the record while leaving the original intact.
+ """
+ # The format operation gets traceback text into record.exc_text
+ # (if there's exception data), and also puts the message into
+ # record.message. We can then use this to replace the original
+ # msg + args, as these might be unpickleable. We also zap the
+ # exc_info attribute, as it's no longer needed and, if not None,
+ # will typically not be pickleable.
+ self.format(record)
+ record.msg = record.message
+ record.args = None
+ record.exc_info = None
+ return record
+
+ def emit(self, record):
+ """
+ Emit a record.
+
+ Writes the LogRecord to the queue, preparing it for pickling first.
+ """
+ try:
+ self.enqueue(self.prepare(record))
+ except Exception:
+ self.handleError(record)
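
`load_couch_server` replaces `setupServer` (and the variant previously imported from `statusdb.db.utils`) and reads the same `statusdb` block from the YAML configuration. A minimal usage sketch, with placeholder values for the URL and credentials:

```python
import yaml

from LIMS2DB.utils import load_couch_server

# Placeholder configuration; the url/username/password keys mirror what
# load_couch_server reads from conf["statusdb"].
CONF_YAML = """
statusdb:
  url: status-db.example.com:5984
  username: someuser
  password: somepassword
"""

couch = load_couch_server(yaml.safe_load(CONF_YAML))
# https:// is prepended automatically when the configured url has no scheme.
# The session authenticator logs in lazily; this call forces a round trip.
print(couch.get_server_information().get_result())
```
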
diff --git a/README.md b/README.md
index 6f1d864..ecacfeb 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ conda create -n lims2db_dev python=2.7
conda activate lims2db_dev
```
-LIMS2DB is highly dependent on the [statusdb](https://github.com/SciLifeLab/statusdb), [genologics](https://github.com/scilifelab/genologics), [genologics_sql](https://github.com/scilifelab/genologics_sql) packages. The two latter are available on pypi and can be installed with pip. However, it might still be a good idea to install all three of these manually to be sure you get the latest version:
+LIMS2DB is highly dependent on the [genologics](https://github.com/scilifelab/genologics) and [genologics_sql](https://github.com/scilifelab/genologics_sql) packages. Both are available on PyPI and can be installed with pip. However, it might still be a good idea to install them manually to be sure you get the latest versions:
```
git clone repo
diff --git a/VERSIONLOG.md b/VERSIONLOG.md
index a0b285a..1293b2b 100644
--- a/VERSIONLOG.md
+++ b/VERSIONLOG.md
@@ -1,5 +1,9 @@
# LIMS2DB Version Log
+## 20250815.1
+
+Replace python-couchdb calls with the IBM Cloudant SDK (ibmcloudant); remove the statusdb package dependency
+
## 20250422.1
Skip over QC results of all pools for Library validation QC, not just TruSeq small RNA
diff --git a/docs/conf.py b/docs/conf.py
index 871891b..ddda42d 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -21,14 +21,10 @@
MOCK_MODULES = [
- "couchdb",
"genologics",
"genologics.lims",
"genologics.entities",
"genologics.lims_utils",
- "statusdb",
- "statusdb.db",
- "statusdb.db.utils",
]
for mod_name in MOCK_MODULES:
sys.modules[mod_name] = mock.Mock()
diff --git a/requirements.txt b/requirements.txt
index f90b5ec..6abc575 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,9 @@
-couchdb
genologics @ git+https://github.com/SciLifeLab/genologics.git
genologics_sql @ git+https://github.com/NationalGenomicsInfrastructure/genologics_sql.git
+ibmcloudant
markdown
pyyaml
requests
setuptools
six
-sqlalchemy
-statusdb @ git+https://github.com/NationalGenomicsInfrastructure/statusdb.git
\ No newline at end of file
+sqlalchemy
\ No newline at end of file
diff --git a/scripts/bioinfo_project_status_update.py b/scripts/bioinfo_project_status_update.py
index 570d414..783a8cb 100644
--- a/scripts/bioinfo_project_status_update.py
+++ b/scripts/bioinfo_project_status_update.py
@@ -17,30 +17,41 @@ def main(args):
lims = Lims(BASEURI, USERNAME, PASSWORD)
with open(args.conf) as conf_file:
conf = yaml.safe_load(conf_file)
- bioinfodb = lutils.setupServer(conf)["bioinfo_analysis"]
- open_projects = bioinfodb.view("latest_data/sample_id_open")
+ couch = lutils.load_couch_server(conf)
+ open_projects = couch.post_view(
+ db="bioinfo_analysis",
+ ddoc="latest_data",
+ view="sample_id_open",
+ ).get_result()["rows"]
- for row in open_projects.rows:
- project_id = row.key[0]
- sample_id = row.key[3]
+ for row in open_projects:
+ project_id = row["key"][0]
+ sample_id = row["key"][3]
close_date = None
try:
close_date = Project(lims=lims, id=project_id).close_date
except HTTPError as e:
if "404: Project not found" in str(e):
- log.error("Project " + project_id + " not found in LIMS")
+ log.error(f"Project {project_id} not found in LIMS")
continue
if close_date is not None:
try:
- doc = bioinfodb.get(row.id)
+ doc = couch.get_document(
+ db="bioinfo_analysis",
+ doc_id=row["id"],
+ ).get_result()
except Exception as e:
- log.error(e + "in Project " + project_id + " Sample " + sample_id + " while accessing doc from statusdb")
+ log.error(f"{e} in Project {project_id} Sample {sample_id} while accessing doc from statusdb")
doc["project_closed"] = True
try:
- bioinfodb.save(doc)
- log.info("Updated Project " + project_id + " Sample " + sample_id)
+ couch.put_document(
+ db="bioinfo_analysis",
+ document=doc,
+ doc_id=row["id"],
+ ).get_result()
+ log.info(f"Updated Project {project_id} Sample {sample_id}")
except Exception as e:
- log.error(e + "in Project " + project_id + " Sample " + sample_id + " while saving to statusdb")
+ log.error(f"{e} in Project {project_id} Sample {sample_id} while saving to statusdb")
if __name__ == "__main__":
diff --git a/scripts/escalation_running_notes.py b/scripts/escalation_running_notes.py
index 8d17c5c..3a92974 100644
--- a/scripts/escalation_running_notes.py
+++ b/scripts/escalation_running_notes.py
@@ -12,18 +12,20 @@
import genologics_sql.tables as tbls
import markdown
+import yaml
from genologics_sql.utils import get_session
from sqlalchemy import text
from sqlalchemy.orm import aliased
-from statusdb.db.utils import load_couch_server
-from LIMS2DB.utils import send_mail
+from LIMS2DB.utils import load_couch_server, send_mail
def main(args):
session = get_session()
- couch = load_couch_server(args.conf)
- db = couch["running_notes"]
+ with open(args.conf) as conf_file:
+ conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
+ couch = load_couch_server(conf)
+ db = "running_notes"
def get_researcher(userid):
query = "select rs.* from principals pr \
@@ -72,7 +74,7 @@ def make_esc_running_note(
def update_note_db(note):
updated = False
- note_existing = db.get(note["_id"])
+ note_existing = couch.get_document(db=db, doc_id=note["_id"]).get_result()
if "_rev" in note.keys():
del note["_rev"]
@@ -82,10 +84,10 @@ def update_note_db(note):
del dict_note["_rev"]
if not dict_note == note:
note_existing.update(note)
- db.save(note_existing)
+ couch.put_document(db=db, doc_id=note_existing["_id"], document=note_existing).get_result()
updated = True
else:
- db.save(note)
+ couch.post_document(db=db, document=note).get_result()
updated = True
return updated
diff --git a/scripts/flowcell_sql_upload.py b/scripts/flowcell_sql_upload.py
index 6d6dd9d..e907734 100644
--- a/scripts/flowcell_sql_upload.py
+++ b/scripts/flowcell_sql_upload.py
@@ -22,7 +22,7 @@
get_sequencing_steps,
upload_to_couch,
)
-from LIMS2DB.utils import setupServer
+from LIMS2DB.utils import load_couch_server
def main(args):
@@ -41,7 +41,7 @@ def main(args):
with open(args.conf) as conf_file:
conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
- couch = setupServer(conf)
+ couch = load_couch_server(conf)
interval = f"{args.hours} hours"
# list the right sequencing steps
diff --git a/scripts/mail_resp.py b/scripts/mail_resp.py
index 0af1b9f..407efc0 100644
--- a/scripts/mail_resp.py
+++ b/scripts/mail_resp.py
@@ -4,9 +4,11 @@
from datetime import date, timedelta
from email.mime.text import MIMEText
+import yaml
from genologics.config import BASEURI, PASSWORD, USERNAME
from genologics.lims import Lims
-from statusdb.db.utils import load_couch_server
+
+from LIMS2DB.utils import load_couch_server
def main(args):
@@ -15,8 +17,9 @@ def main(args):
sixMonthsAgo = date.today() - timedelta(weeks=26)
yesterday = date.today() - timedelta(days=1)
pjs = lims.get_projects(open_date=sixMonthsAgo.strftime("%Y-%m-%d"))
- statusdb = load_couch_server(args.conf)
- proj_id_view = statusdb["projects"].view("project/project_id")
+ with open(args.conf) as conf_file:
+ conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
+ statusdb = load_couch_server(conf)
operator = "par.lundin@scilifelab.se"
summary = {}
@@ -149,7 +152,13 @@ def get_email(fullname):
)
if completed: # If we actually have stuff to mail
- doc = statusdb["projects"].get(proj_id_view[p.id].rows[0].value)
+ doc = statusdb.post_view(
+ db="projects",
+ ddoc="project",
+ view="project_id",
+ key=p.id,
+ include_docs=True,
+ ).get_result()["rows"][0]["doc"]
if "project_coordinator" in doc["details"]:
pc = doc["details"]["project_coordinator"]
summary[pc] = completed
diff --git a/scripts/project_summary_upload_LIMS.py b/scripts/project_summary_upload_LIMS.py
index 79d620f..9a76db6 100755
--- a/scripts/project_summary_upload_LIMS.py
+++ b/scripts/project_summary_upload_LIMS.py
@@ -21,16 +21,17 @@
from genologics_sql.queries import get_last_modified_projectids
from genologics_sql.tables import Project as DBProject
from genologics_sql.utils import get_configuration, get_session
-from statusdb.db.utils import load_couch_server
from LIMS2DB.classes import ProjectSQL
-from LIMS2DB.utils import formatStack
+from LIMS2DB.utils import QueueHandler, formatStack, load_couch_server, stillRunning
def main(options):
conf = options.conf
output_f = options.output_f
- couch = load_couch_server(conf)
+ with open(conf) as conf_file:
+ couch_conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
+ couch = load_couch_server(couch_conf)
mainlims = Lims(BASEURI, USERNAME, PASSWORD)
lims_db = get_session()
@@ -47,7 +48,7 @@ def main(options):
with open(options.oconf) as ocf:
oconf = yaml.load(ocf, Loader=yaml.SafeLoader)["order_portal"]
except Exception as e:
- mainlog.warn(f"Loading orderportal config {options.oconf} failed due to {e}, so order information for project will not be updated")
+ mainlog.warning(f"Loading orderportal config {options.oconf} failed due to {e}, so order information for project will not be updated")
if options.project_name:
host = get_configuration()["url"]
@@ -97,7 +98,9 @@ def create_projects_list(options, db_session, lims, log):
def processPSUL(options, queue, logqueue, oconf=None):
- couch = load_couch_server(options.conf)
+ with open(options.conf) as conf_file:
+ couch_conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
+ couch = load_couch_server(couch_conf)
db_session = get_session()
work = True
procName = mp.current_process().name
@@ -190,80 +193,6 @@ def masterProcess(options, projectList, mainlims, logger, oconf=None):
break
-def stillRunning(processList):
- ret = False
- for p in processList:
- if p.is_alive():
- ret = True
-
- return ret
-
-
-class QueueHandler(logging.Handler):
- """
- This handler sends events to a queue. Typically, it would be used together
- with a multiprocessing Queue to centralise logging to file in one process
- (in a multi-process application), so as to avoid file write contention
- between processes.
-
- This code is new in Python 3.2, but this class can be copy pasted into
- user code for use with earlier Python versions.
- """
-
- def __init__(self, queue):
- """
- Initialise an instance, using the passed queue.
- """
- logging.Handler.__init__(self)
- self.queue = queue
-
- def enqueue(self, record):
- """
- Enqueue a record.
-
- The base implementation uses put_nowait. You may want to override
- this method if you want to use blocking, timeouts or custom queue
- implementations.
- """
- self.queue.put_nowait(record)
-
- def prepare(self, record):
- """
- Prepares a record for queuing. The object returned by this method is
- enqueued.
-
- The base implementation formats the record to merge the message
- and arguments, and removes unpickleable items from the record
- in-place.
-
- You might want to override this method if you want to convert
- the record to a dict or JSON string, or send a modified copy
- of the record while leaving the original intact.
- """
- # The format operation gets traceback text into record.exc_text
- # (if there's exception data), and also puts the message into
- # record.message. We can then use this to replace the original
- # msg + args, as these might be unpickleable. We also zap the
- # exc_info attribute, as it's no longer needed and, if not None,
- # will typically not be pickleable.
- self.format(record)
- record.msg = record.message
- record.args = None
- record.exc_info = None
- return record
-
- def emit(self, record):
- """
- Emit a record.
-
- Writes the LogRecord to the queue, preparing it for pickling first.
- """
- try:
- self.enqueue(self.prepare(record))
- except Exception:
- self.handleError(record)
-
-
if __name__ == "__main__":
usage = "Usage: python project_summary_upload_LIMS.py [options]"
parser = ArgumentParser(usage=usage)
diff --git a/scripts/run_diff_with_DB.py b/scripts/run_diff_with_DB.py
index 5cd4714..310e1ca 100644
--- a/scripts/run_diff_with_DB.py
+++ b/scripts/run_diff_with_DB.py
@@ -4,9 +4,9 @@
import random
import yaml
-from statusdb.db.utils import load_couch_server
import LIMS2DB.diff as df
+from LIMS2DB.utils import load_couch_server
def write_results_to_file(diffs, args):
@@ -19,34 +19,44 @@ def write_results_to_file(diffs, args):
def main(args):
- couch = load_couch_server(args.conf)
- proj_db = couch["projects"]
+ with open(args.conf) as conf_file:
+ couch_conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
+ couch = load_couch_server(couch_conf)
with open(args.oconf) as ocf:
oconf = yaml.load(ocf, Loader=yaml.SafeLoader)["order_portal"]
diffs = {}
if args.pj_id:
- diffs[args.pj_id] = df.diff_project_objects(args.pj_id, couch, proj_db, args.log, oconf)
+ diffs[args.pj_id] = df.diff_project_objects(args.pj_id, couch, args.log, oconf)
elif args.random:
random.seed()
closed_ids = []
- proj_db = couch["projects"]
- view = proj_db.view("project/summary")
- for row in view[["closed", ""] : ["closed", "ZZZZZZZZ"]]:
- if row.value.get("open_date", "0") > "2014-06-01":
- closed_ids.append(row.key[1])
+ rows = couch.post_view(
+ db="projects",
+ ddoc="project",
+ view="summary",
+ startkey=["closed", ""],
+ endkey=["closed", "ZZZZZZZZ"],
+ ).get_result()["rows"]
+ for row in rows:
+ if row["value"].get("open_date", "0") > "2014-06-01":
+ closed_ids.append(row["key"][1])
nb = int(len(closed_ids) / 10)
picked_ids = random.sample(closed_ids, nb)
for one_id in picked_ids:
- diffs[one_id] = df.diff_project_objects(one_id, couch, proj_db, args.log, oconf)
+ diffs[one_id] = df.diff_project_objects(one_id, couch, args.log, oconf)
else:
- view = proj_db.view("project/project_id")
+ view = couch.post_view(
+ db="projects",
+ ddoc="project",
+ view="project_id",
+ ).get_result()["rows"]
for row in view:
- proj_diff = df.diff_project_objects(row.key, couch, proj_db, args.log, oconf)
+ proj_diff = df.diff_project_objects(row["key"], couch, args.log, oconf)
if proj_diff is not None:
- diffs[row.key] = proj_diff
+ diffs[row["key"]] = proj_diff
write_results_to_file(diffs, args)
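
Note the keyed range query in the `--random` branch: with the Cloudant Python SDK the range bounds are passed as `start_key`/`end_key`, as in the hunk above. A self-contained sketch of the same query, assuming a config file at a placeholder path:

```python
import yaml

from LIMS2DB.utils import load_couch_server

with open("statusdb.yaml") as conf_file:  # placeholder config path
    couch = load_couch_server(yaml.safe_load(conf_file))

rows = couch.post_view(
    db="projects",
    ddoc="project",
    view="summary",
    start_key=["closed", ""],
    end_key=["closed", "ZZZZZZZZ"],
).get_result()["rows"]
# Same filter as run_diff_with_DB.py: closed projects opened after 2014-06-01
closed_ids = [row["key"][1] for row in rows if row["value"].get("open_date", "0") > "2014-06-01"]
```
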
diff --git a/scripts/update_project_min_reads.py b/scripts/update_project_min_reads.py
index 11b2249..3d4cc87 100644
--- a/scripts/update_project_min_reads.py
+++ b/scripts/update_project_min_reads.py
@@ -8,7 +8,7 @@
from genologics_sql.queries import get_last_modified_projectids
from genologics_sql.utils import get_session
-from LIMS2DB.utils import setupServer
+from LIMS2DB.utils import load_couch_server
def main(args):
@@ -16,15 +16,15 @@ def main(args):
lims = Lims(BASEURI, USERNAME, PASSWORD)
with open(args.conf) as cf:
db_conf = yaml.load(cf, Loader=yaml.SafeLoader)
- couch = setupServer(db_conf)
- db = couch["expected_yields"]
+ couch = load_couch_server(db_conf)
postgres_string = f"{args.hours} hours"
project_ids = get_last_modified_projectids(lims_db, postgres_string)
min_yields = {}
- for row in db.view("yields/min_yield"):
- db_key = " ".join(x if x else "" for x in row.key).strip()
- min_yields[db_key] = row.value
+ rows = couch.post_view(db="expected_yields", ddoc="yields", view="min_yield").get_result()["rows"]
+ for row in rows:
+ db_key = " ".join(x if x else "" for x in row["key"]).strip()
+ min_yields[db_key] = row["value"]
for project in [Project(lims, id=x) for x in project_ids]:
samples_count = 0
diff --git a/scripts/workset_upload_sql.py b/scripts/workset_upload_sql.py
index f344fe2..773a07a 100644
--- a/scripts/workset_upload_sql.py
+++ b/scripts/workset_upload_sql.py
@@ -20,15 +20,18 @@ def main(args):
ws = lclasses.Workset_SQL(session, log, step)
with open(args.conf) as conf_file:
conf = yaml.load(conf_file, Loader=yaml.SafeLoader)
- couch = lutils.setupServer(conf)
- db = couch["worksets"]
+ couch = lutils.load_couch_server(conf)
doc = {}
- for row in db.view("worksets/lims_id")[ws.obj["id"]]:
- doc = db.get(row.id)
+ result = couch.post_view(db="worksets", ddoc="worksets", view="lims_id", key=ws.obj["id"], include_docs=True).get_result()["rows"]
+ if result:
+ doc = result[0]["doc"]
final_doc = lutils.merge(ws.obj, doc)
- db.save(final_doc)
+ couch.post_document(
+ db="worksets",
+ document=final_doc,
+ ).get_result()
elif args.recent:
recent_processes = get_last_modified_processes(