From 7a8e6f6f446d71f8fd4f17de48994c0b6bee72ee Mon Sep 17 00:00:00 2001
From: Christophe Eymard
Date: Mon, 2 Apr 2012 13:32:20 +0200
Subject: [PATCH] maxconnections now limits the number of concurrent
 connections to mongodb instead of raising an exception when the limit is
 reached.

---
 asyncmongo/cursor.py |  221 +++++++++++++++++++++++--------------------
 asyncmongo/pool.py   |   72 ++++++++------
 2 files changed, 159 insertions(+), 134 deletions(-)

diff --git a/asyncmongo/cursor.py b/asyncmongo/cursor.py
index 83cbbb5..c3b7865 100644
--- a/asyncmongo/cursor.py
+++ b/asyncmongo/cursor.py
@@ -1,5 +1,5 @@
 #!/bin/env python
-# 
+#
 # Copyright 2010 bit.ly
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -36,16 +36,16 @@ def __init__(self, dbname, collection, pool):
         assert isinstance(dbname, (str, unicode))
         assert isinstance(collection, (str, unicode))
         assert isinstance(pool, object)
-        
+
         self.__dbname = dbname
         self.__collection = collection
         self.__pool = pool
         self.__slave_okay = False
-        
+
     @property
     def full_collection_name(self):
         return u'%s.%s' % (self.__dbname, self.__collection)
-    
+
     def drop(self, *args, **kwargs):
         raise NotImplemented("patches accepted")
 
@@ -56,7 +56,7 @@ def save(self, doc, **kwargs):
     def insert(self, doc_or_docs,
                manipulate=True, safe=True, check_keys=True, callback=None, **kwargs):
         """Insert a document(s) into this collection.
-        
+
         If `manipulate` is set, the document(s) are manipulated using
         any :class:`~pymongo.son_manipulator.SONManipulator` instances
         that have been added to this
@@ -64,17 +64,17 @@ def insert(self, doc_or_docs,
         the inserted document or a list of ``"_id"`` values of the
         inserted documents.  If the document(s) does not already
         contain an ``"_id"`` one will be added.
-        
+
         If `safe` is ``True`` then the insert will be checked for
         errors, raising :class:`~pymongo.errors.OperationFailure` if
         one occurred. Safe inserts wait for a response from the
         database, while normal inserts do not.
-        
+
         Any additional keyword arguments imply ``safe=True``, and
         will be used as options for the resultant `getLastError`
         command. For example, to wait for replication to 3 nodes, pass
         ``w=3``.
-        
+
         :Parameters:
           - `doc_or_docs`: a document or list of documents to be
             inserted
@@ -87,77 +87,79 @@ def insert(self, doc_or_docs,
          - `**kwargs` (optional): any additional arguments imply
            ``safe=True``, and will be used as options for the
            `getLastError` command
-        
+
         .. mongodoc:: insert
         """
         if not isinstance(safe, bool):
             raise TypeError("safe must be an instance of bool")
-        
+
         docs = doc_or_docs
         # return_one = False
         if isinstance(docs, dict):
             # return_one = True
             docs = [docs]
-        
+
         # if manipulate:
         #     docs = [self.__database._fix_incoming(doc, self) for doc in docs]
-        
+
         self.__limit = None
         if kwargs:
             safe = True
-        
+
         if safe and not callable(callback):
             raise TypeError("callback must be callable")
         if not safe and callback is not None:
             raise TypeError("callback can not be used with safe=False")
-        
+
         if callback:
             callback = functools.partial(self._handle_response, orig_callback=callback)
-        connection = self.__pool.connection()
-        try:
-            connection.send_message(
-                message.insert(self.full_collection_name, docs,
-                    check_keys, safe, kwargs), callback=callback)
-        except:
-            connection.close()
-            raise
-        
+        def _handle_connection(connection):
+            try:
+                connection.send_message(
+                    message.insert(self.full_collection_name, docs,
+                        check_keys, safe, kwargs), callback=callback)
+            except:
+                connection.close()
+                raise
+        self.__pool.connection(_handle_connection)
+
     def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs):
         if not isinstance(safe, bool):
             raise TypeError("safe must be an instance of bool")
-        
+
         if spec_or_id is None:
             spec_or_id = {}
         if not isinstance(spec_or_id, dict):
             spec_or_id = {"_id": spec_or_id}
-        
+
         self.__limit = None
         if kwargs:
             safe = True
-        
+
         if safe and not callable(callback):
             raise TypeError("callback must be callable")
         if not safe and callback is not None:
             raise TypeError("callback can not be used with safe=False")
-        
+
         if callback:
             callback = functools.partial(self._handle_response, orig_callback=callback)
-        connection = self.__pool.connection()
-        try:
-            connection.send_message(
-                message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
-                callback=callback)
-        except:
-            connection.close()
-            raise
+        def _handle_connection(connection):
+            try:
+                connection.send_message(
+                    message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
+                    callback=callback)
+            except:
+                connection.close()
+                raise
+        self.__pool.connection(_handle_connection)
+
-        
     def update(self, spec, document, upsert=False, manipulate=False,
                safe=True, multi=False, callback=None, **kwargs):
         """Update a document(s) in this collection.
-        
+
         Raises :class:`TypeError` if either `spec` or `document` is
         not an instance of ``dict`` or `upsert` is not an instance of
         ``bool``. If `safe` is ``True`` then the update will be
@@ -166,14 +168,14 @@ def update(self, spec, document, upsert=False, manipulate=False,
         occurred. Safe updates require a response from the database,
         while normal updates do not - thus, setting `safe` to ``True``
         will negatively impact performance.
-        
+
         There are many useful `update modifiers`_ which can be used
         when performing updates. For example, here we use the
         ``"$set"`` modifier to modify some fields in a matching
         document:
-        
+
         .. doctest::
-        
+
           >>> db.test.insert({"x": "y", "a": "b"})
           ObjectId('...')
          >>> list(db.test.find())
@@ -181,15 +183,15 @@ def update(self, spec, document, upsert=False, manipulate=False,
          >>> db.test.update({"x": "y"}, {"$set": {"a": "c"}})
          >>> list(db.test.find())
          [{u'a': u'c', u'x': u'y', u'_id': ObjectId('...')}]
-        
+
         If `safe` is ``True`` returns the response to the *lastError*
         command. Otherwise, returns ``None``.
-        
+
         # Any additional keyword arguments imply ``safe=True``, and will
         # be used as options for the resultant `getLastError`
         # command. For example, to wait for replication to 3 nodes, pass
         # ``w=3``.
-        
+
         :Parameters:
          - `spec`: a ``dict`` or :class:`~bson.son.SON` instance
            specifying elements which must be present for a document
@@ -214,9 +216,9 @@ def update(self, spec, document, upsert=False, manipulate=False,
          - `**kwargs` (optional): any additional arguments imply
            ``safe=True``, and will be used as options for the
            `getLastError` command
-        
+
         .. _update modifiers: http://www.mongodb.org/display/DOCS/Updating
-        
+
         .. mongodoc:: update
         """
         if not isinstance(spec, dict):
@@ -230,32 +232,33 @@ def update(self, spec, document, upsert=False, manipulate=False,
         # TODO: apply SON manipulators
         # if upsert and manipulate:
         #     document = self.__database._fix_incoming(document, self)
-        
+
         if kwargs:
             safe = True
-        
+
         if safe and not callable(callback):
             raise TypeError("callback must be callable")
         if not safe and callback is not None:
             raise TypeError("callback can not be used with safe=False")
-        
+
         if callback:
             callback = functools.partial(self._handle_response, orig_callback=callback)
         self.__limit = None
-        connection = self.__pool.connection()
-        try:
-            connection.send_message(
-                message.update(self.full_collection_name, upsert, multi,
-                    spec, document, safe, kwargs), callback=callback)
-        except:
-            connection.close()
-            raise
-        
-        
+        def _handle_connection(connection):
+            try:
+                connection.send_message(
+                    message.update(self.full_collection_name, upsert, multi,
+                        spec, document, safe, kwargs), callback=callback)
+            except:
+                connection.close()
+                raise
+        self.__pool.connection(_handle_connection)
+
+
     def find_one(self, spec_or_id, **kwargs):
         """Get a single document from the database.
-        
+
         All arguments to :meth:`find` are also valid arguments for
         :meth:`find_one`, although any `limit` argument will be
         ignored. Returns a single document, or ``None`` if no matching
@@ -265,29 +268,29 @@ def find_one(self, spec_or_id, **kwargs):
             spec_or_id = {"_id": spec_or_id}
         kwargs['limit'] = -1
         self.find(spec_or_id, **kwargs)
-    
+
     def find(self, spec=None, fields=None, skip=0, limit=0,
             timeout=True, snapshot=False, tailable=False, sort=None,
             max_scan=None, slave_okay=False,
             _must_use_master=False, _is_command=False,
             hint=None, debug=False, callback=None):
         """Query the database.
-        
+
         The `spec` argument is a prototype document that all results
         must match. For example:
-        
+
         >>> db.test.find({"hello": "world"}, callback=...)
-        
+
         only matches documents that have a key "hello" with value
         "world".  Matches can have other keys *in addition* to
         "hello". The `fields` argument is used to specify a subset of
         fields that should be included in the result documents. By
         limiting results to a certain subset of fields you can cut
         down on network traffic and decoding time.
-        
+
         Raises :class:`TypeError` if any of the arguments are of
         improper type.
-        
+
         :Parameters:
          - `spec` (optional): a SON object specifying elements which
            must be present for a document to be included in the
@@ -325,13 +328,13 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
            examined when performing the query
          - `slave_okay` (optional): is it okay to connect directly
            to and perform queries on a slave instance
-        
+
         .. mongodoc:: find
         """
-        
+
         if spec is None:
             spec = {}
-        
+
         if not isinstance(spec, dict):
             raise TypeError("spec must be an instance of dict")
         if not isinstance(skip, int):
@@ -346,19 +349,19 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
             raise TypeError("tailable must be an instance of bool")
         if not callable(callback):
             raise TypeError("callback must be callable")
-        
+
         if fields is not None:
             if not fields:
                 fields = {"_id": 1}
             if not isinstance(fields, dict):
                 fields = helpers._fields_list_to_dict(fields)
-        
+
         self.__spec = spec
         self.__fields = fields
         self.__skip = skip
         self.__limit = limit
         self.__batch_size = 0
-        
+
         self.__timeout = timeout
         self.__tailable = tailable
         self.__snapshot = snapshot
@@ -372,48 +375,56 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
         self.__tz_aware = False #collection.database.connection.tz_aware
         self.__must_use_master = _must_use_master
         self.__is_command = _is_command
-        
-        
-        connection = self.__pool.connection()
-        try:
-            if self.__debug:
-                logging.debug('QUERY_SPEC: %r' % self.__query_spec())
-            
-            connection.send_message(
-                message.query(self.__query_options(),
-                    self.full_collection_name,
-                    self.__skip,
-                    self.__limit,
-                    self.__query_spec(),
-                    self.__fields),
-                callback=functools.partial(self._handle_response, orig_callback=callback))
-        except Exception, e:
-            logging.error('Error sending query %s' % e)
-            connection.close()
-            raise
-        
+
+        def _handle_connection(connection):
+            try:
+                if self.__debug:
+                    logging.debug('QUERY_SPEC: %r' % self.__query_spec())
+
+                connection.send_message(
+                    message.query(self.__query_options(),
+                        self.full_collection_name,
+                        self.__skip,
+                        self.__limit,
+                        self.__query_spec(),
+                        self.__fields),
+                    callback=functools.partial(self._handle_response, orig_callback=callback))
+            except Exception, e:
+                logging.error('Error sending query %s' % e)
+                connection.close()
+                raise
+        self.__pool.connection(_handle_connection)
+
     def _handle_response(self, result, error=None, orig_callback=None):
-        if result and result.get('cursor_id'):
-            connection = self.__pool.connection()
+        def _handle_finish(_res=None, _err=None):
+            if error:
+                logging.error('%s %s' % (self.full_collection_name , error))
+                orig_callback(None, error=error)
+            else:
+                if self.__limit == -1 and len(result['data']) == 1:
+                    # handle the find_one() call
+                    orig_callback(result['data'][0], error=None)
+                else:
+                    orig_callback(result['data'], error=None)
+
+        def _close_cursor(connection):
             try:
                 connection.send_message(
                     message.kill_cursors([result['cursor_id']]),
                     callback=None)
+                _handle_finish()
             except Exception, e:
                 logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e))
                 connection.close()
                 raise
-        
-        if error:
-            logging.error('%s %s' % (self.full_collection_name , error))
-            orig_callback(None, error=error)
+
+        if result and result.get('cursor_id'):
+            self.__pool.connection(_close_cursor)
         else:
-            if self.__limit == -1 and len(result['data']) == 1:
-                # handle the find_one() call
-                orig_callback(result['data'][0], error=None)
-            else:
-                orig_callback(result['data'], error=None)
+            _handle_finish()
+
+
-    
     def __query_options(self):
         """Get the query options string to use for this query."""
         options = 0
@@ -424,7 +435,7 @@ def __query_options(self):
         if not self.__timeout:
             options |= _QUERY_OPTIONS["no_timeout"]
         return options
-    
+
     def __query_spec(self):
         """Get the spec to use for a query."""
         spec = self.__spec
@@ -441,5 +452,5 @@ def __query_spec(self):
         if self.__max_scan:
             spec["$maxScan"] = self.__max_scan
         return spec
-    
-    
+
+
diff --git a/asyncmongo/pool.py b/asyncmongo/pool.py
index 97d6411..ee7b8fd 100644
--- a/asyncmongo/pool.py
+++ b/asyncmongo/pool.py
@@ -1,5 +1,5 @@
 #!/bin/env python
-# 
+#
 # Copyright 2010 bit.ly
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -15,6 +15,7 @@
 # under the License.
 
 from threading import Condition
+from collections import deque
 import logging
 from errors import TooManyConnections, ProgrammingError
 from connection import Connection
@@ -35,7 +36,7 @@ def get_connection_pool(self, pool_id, *args, **kwargs):
         if pool_id not in self._pools:
             self._pools[pool_id] = ConnectionPool(*args, **kwargs)
         return self._pools[pool_id]
-    
+
     @classmethod
     def close_idle_connections(self, pool_id=None):
         """close idle connections to mongo"""
@@ -54,7 +55,7 @@ def close_idle_connections(self, pool_id=None):
 
 class ConnectionPool(object):
     """Connection Pool to a single mongo instance.
-    
+
     :Parameters:
      - `mincached` (optional): minimum connections to open on instantiation. 0 to open connections on first use
      - `maxcached` (optional): maximum inactive cached connections for this pool. 0 for unlimited
@@ -63,15 +64,15 @@ class ConnectionPool(object):
      - `dbname`: mongo database name
      - `slave_okay` (optional): is it okay to connect directly to and perform queries on a slave instance
      - `**kwargs`: passed to `connection.Connection`
-    
+
     """
-    def __init__(self, 
-                 mincached=0, 
-                 maxcached=0, 
-                 maxconnections=0, 
-                 maxusage=0, 
-                 dbname=None, 
-                 slave_okay=False, 
+    def __init__(self,
+                 mincached=0,
+                 maxcached=0,
+                 maxconnections=0,
+                 maxusage=0,
+                 dbname=None,
+                 slave_okay=False,
                  *args, **kwargs):
         assert isinstance(mincached, int)
         assert isinstance(maxcached, int)
@@ -95,33 +96,41 @@ def __init__(self,
         self._slave_okay = slave_okay
         self._connections = 0
-        
+
+        self._backlog_queue = deque() # The queue of all the clients waiting for
+                                      # a connection.
+
         # Establish an initial number of idle database connections:
-        idle = [self.connection() for i in range(mincached)]
-        while idle:
-            self.cache(idle.pop())
-    
+        idle = [self.new_connection() for i in range(mincached)]
+        while idle: self.cache(idle.pop())
+
     def new_connection(self):
         kwargs = self._kwargs
         kwargs['pool'] = self
         return Connection(*self._args, **kwargs)
-    
-    def connection(self):
+
+    def connection(self, callback, from_backlog=False):
         """ get a cached connection from the pool """
-        
+
+        con = None
         self._condition.acquire()
         try:
             if (self._maxconnections and self._connections >= self._maxconnections):
-                raise TooManyConnections("%d connections are already equal to the max: %d" % (self._connections, self._maxconnections))
-            # connection limit not reached, get a dedicated connection
-            try: # first try to get it from the idle cache
-                con = self._idle_cache.pop(0)
-            except IndexError: # else get a fresh connection
-                con = self.new_connection()
-            self._connections += 1
+                if from_backlog: # We requeue the request on top of the backlog if it came from there.
+                    self._backlog_queue.appendleft(callback)
+                else: # Otherwise, we just queue it.
+                    self._backlog_queue.append(callback)
+            else:
+                # connection limit not reached, get a dedicated connection
+                try: # first try to get it from the idle cache
+                    con = self._idle_cache.pop(0)
+                except IndexError: # else get a fresh connection
+                    con = self.new_connection()
+                self._connections += 1
         finally:
             self._condition.release()
-        return con
+
+        # If we acquired a connection object, we can call the callback that was supplied.
+        if con: callback(con)
 
     def cache(self, con):
         """Put a dedicated connection back into the idle cache."""
@@ -146,7 +155,12 @@ def cache(self, con):
         finally:
             self._connections -= 1
             self._condition.release()
-    
+
+        if self._backlog_queue:
+            # Try to see if we can use a connection if the backlog is not empty.
+            self.connection(self._backlog_queue.popleft(), from_backlog=True)
+
+
     def close(self):
         """Close all connections in the pool."""
         self._condition.acquire()
@@ -161,5 +175,5 @@ def close(self):
             self._condition.notifyAll()
         finally:
             self._condition.release()
-    
+
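The patch carries no prose beyond the subject line, so a short summary of the new contract may help reviewers: ConnectionPool.connection() no longer returns a connection synchronously (and no longer raises TooManyConnections when maxconnections is hit); it now takes a callback, and requests that arrive while the pool is at its limit are parked in a deque backlog that cache() drains as connections are returned. The sketch below is a minimal, self-contained illustration of that queueing pattern; MiniPool, FakeConnection, and the callback names are illustrative stand-ins, not part of asyncmongo.

# Minimal sketch of the queueing behaviour introduced by this patch (assumed
# names: MiniPool, FakeConnection; the real classes live in asyncmongo).
from collections import deque


class FakeConnection(object):
    """Stand-in for asyncmongo's Connection, just enough for the demo."""
    def __init__(self, number):
        self.number = number


class MiniPool(object):
    """Simplified ConnectionPool: callback-based acquisition with a backlog."""
    def __init__(self, maxconnections=0):
        self._maxconnections = maxconnections
        self._connections = 0
        self._created = 0
        self._idle_cache = []
        self._backlog_queue = deque()  # callbacks waiting for a connection

    def connection(self, callback, from_backlog=False):
        """Hand a connection to `callback`, or park the request in the backlog."""
        con = None
        if self._maxconnections and self._connections >= self._maxconnections:
            if from_backlog:
                # re-queued requests keep their place at the front of the line
                self._backlog_queue.appendleft(callback)
            else:
                self._backlog_queue.append(callback)
        else:
            try:
                con = self._idle_cache.pop(0)
            except IndexError:
                self._created += 1
                con = FakeConnection(self._created)
            self._connections += 1
        if con:
            callback(con)

    def cache(self, con):
        """Return a connection to the pool, then serve one backlog entry."""
        self._idle_cache.append(con)
        self._connections -= 1
        if self._backlog_queue:
            self.connection(self._backlog_queue.popleft(), from_backlog=True)


if __name__ == '__main__':
    pool = MiniPool(maxconnections=1)
    held = []

    def first(con):
        print('first request got connection %d' % con.number)
        held.append(con)

    def second(con):
        print('second request got connection %d (served from the backlog)' % con.number)

    pool.connection(first)
    # The pool is now at its limit; instead of raising TooManyConnections
    # (the old behaviour) this request waits in the backlog queue.
    pool.connection(second)
    # Returning the first connection drains the backlog and fires `second`.
    pool.cache(held[0])

Note that in the real pool the callback is invoked only after the condition lock is released, and it is cache() that eventually serves queued requests, so a connection that is never returned to the pool will leave backlogged callbacks waiting indefinitely.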