diff --git a/marconiclient/__init__.py b/marconiclient/__init__.py index 0489ae5..c930f3b 100644 --- a/marconiclient/__init__.py +++ b/marconiclient/__init__.py @@ -1,10 +1,4 @@ """Marconi Client Library Binding""" -# Hoist everything up into marconiclient -from client import * -from misc import * -from exceptions import * -from auth import * -from queue import * -from message import * -from claim import * +from marconiclient.client import Connection +from marconiclient import exceptions diff --git a/marconiclient/auth.py b/marconiclient/auth.py index 6397faa..9dae0cf 100644 --- a/marconiclient/auth.py +++ b/marconiclient/auth.py @@ -1,8 +1,8 @@ -from exceptions import ClientException - -from keystoneclient.v2_0 import client as ksclient from keystoneclient import exceptions +from keystoneclient.v2_0 import client as ksclient + +from marconiclient import exceptions as exc def authenticate(auth_url, user, key, **kwargs): @@ -46,11 +46,11 @@ def authenticate(auth_url, user, key, **kwargs): insecure=insecure) except exceptions.Unauthorized as ex: - raise ClientException('Unauthorized. Check username, password' - ' and tenant name/id') + raise exc.ClientException('Unauthorized. Check username, password' + ' and tenant name/id') except exceptions.AuthorizationFailure as err: - raise ClientException('Authorization Failure. %s' % err) + raise exc.ClientException('Authorization Failure. %s' % err) if not endpoint: # The user did not pass in an endpoint, so we need to @@ -68,6 +68,6 @@ def authenticate(auth_url, user, key, **kwargs): service_type=service_type, endpoint_type=endpoint_type) except exceptions.EndpointNotFound as ex: - raise ClientException('Endpoint not found in service catalog') + raise exc.ClientException('Endpoint not found in service catalog') return (endpoint, _ksclient.auth_token) diff --git a/marconiclient/claim.py b/marconiclient/claim.py deleted file mode 100644 index b1d54d9..0000000 --- a/marconiclient/claim.py +++ /dev/null @@ -1,41 +0,0 @@ - -class Claim(object): - - def __init__(self, conn, href, messages): - """ - :param: conn The conn to use for manipulating this claim - :param: href The fully-qualified URL for this claim - :param: messages A list of messages belonging to this claim - """ - self._conn = conn - self._href = href - self._msgs = messages - - @property - def messages(self): - """ - Returns the messages that were associated with - this claim at creation time. - """ - return self._msgs - - def read(self): - """ - Gets the claim metadata and the associated messages. - """ - hdrs, body = self._conn._perform_http(href=self._href, method='GET') - - return body - - def update(self, ttl): - """ - Updates this claim with the specified TTL - """ - body = {"ttl": ttl} - self._conn._perform_http(href=self._href, method='PATCH', body=body) - - def release(self): - """ - Releases the current claim - """ - self._conn._perform_http(href=self._href, method='DELETE') diff --git a/marconiclient/client.py b/marconiclient/client.py index cda5a03..0f9cc9a 100644 --- a/marconiclient/client.py +++ b/marconiclient/client.py @@ -1,17 +1,19 @@ -from eventlet.green.urllib import quote -import eventlet -eventlet.monkey_patch(socket=True, select=True) +try: + import simplejson as json +except ImportError: + import json +import urlparse -import json -from functools import wraps -from auth import authenticate -from misc import proc_template -from queue import Queue, NoSuchQueueError -from exceptions import ClientException -from urlparse import urljoin +import eventlet import requests +from marconiclient import auth +from marconiclient import misc +from marconiclient import resources + +eventlet.monkey_patch(socket=True, select=True) + class Connection(object): def __init__(self, client_id, auth_endpoint, user, key, **kwargs): @@ -61,22 +63,19 @@ def connect(self, token=None): self.auth_token = token else: (self._endpoint, - self.auth_token) = authenticate(self._auth_endpoint, - self._user, self._key, - endpoint=self._endpoint, - cacert=self._cacert) + self.auth_token) = auth.authenticate( + self._auth_endpoint, + self._user, self._key, + endpoint=self._endpoint, + cacert=self._cacert) self._load_homedoc_hrefs() @property def auth_token(self): - try: - return self._session.headers['X-Auth-Token'] - except KeyError: - return None + return self._session.headers.get('X-Auth-Token') @auth_token.setter def auth_token(self, value): - self._token = value self._session.headers['X-Auth-Token'] = value def _load_homedoc_hrefs(self): @@ -122,12 +121,12 @@ def create_queue(self, queue_name): :param queue_name: The name of the queue :param ttl: The default time-to-live for messages in this queue """ - href = proc_template(self.queue_href, queue_name=queue_name) + href = misc.proc_template(self.queue_href, queue_name=queue_name) body = {} self._perform_http(href=href, method='PUT', request_body=body) - return Queue(self, href=href, name=queue_name, metadata=body) + return resources.Queue(self, href=href, name=queue_name, metadata=body) def get_queue(self, queue_name): """ @@ -135,14 +134,15 @@ def get_queue(self, queue_name): :param queue_name: The name of the queue """ - href = proc_template(self.queue_href, queue_name=queue_name) + href = misc.proc_template(self.queue_href, queue_name=queue_name) try: hdrs, body = self._perform_http(href=href, method='GET') - except ClientException as ex: - raise NoSuchQueueError(queue_name) if ex.http_status == 404 else ex + except exc.ClientException as ex: + raise exc.NoSuchQueueError(queue_name) if \ + ex.http_status == 404 else ex - return Queue(self, href=href, name=queue_name, metadata=body) + return resources.Queue(self, href=href, name=queue_name, metadata=body) def get_queues(self): href = self.queues_href @@ -151,8 +151,10 @@ def get_queues(self): queues = res["queues"] for queue in queues: - yield Queue(conn=self._conn, name=queue['name'], - href=queue['href'], metadata=queue['metadata']) + yield resources.Queue(conn=self._conn, + name=queue['name'], + href=queue['href'], + metadata=queue['metadata']) def delete_queue(self, queue_name): """ @@ -160,16 +162,17 @@ def delete_queue(self, queue_name): :param queue_name: The name of the queue """ - href = proc_template(self.queue_href, queue_name=queue_name) + href = misc.proc_template(self.queue_href, queue_name=queue_name) self._perform_http(href=href, method='DELETE') def get_queue_metadata(self, queue_name): - href = proc_template(self._queue_href, queue_name=queue_name) + href = misc.proc_template(self._queue_href, queue_name=queue_name) try: return self._perform_http(conn, href, 'GET') - except ClientException as ex: - raise NoSuchQueueError(queue_name) if ex.http_status == 404 else ex + except exc.ClientException as ex: + raise exc.NoSuchQueueError(queue_name) if \ + ex.http_status == 404 else ex def _perform_http(self, method, href, request_body='', headers={}): """ @@ -185,7 +188,7 @@ def _perform_http(self, method, href, request_body='', headers={}): if not isinstance(request_body, str): request_body = json.dumps(request_body) - url = urljoin(self._endpoint, href) + url = urlparse.urljoin(self._endpoint, href) response = requests.request(method=method, url=url, data=request_body, headers={"Client-Id": self._client_id}) @@ -195,10 +198,10 @@ def _perform_http(self, method, href, request_body='', headers={}): # Check if the status code is 2xx class if not response.ok: - raise ClientException(href=href, - method=method, - http_status=response.status_code, - http_response_content=response.content) + raise exc.ClientException(href=href, + method=method, + http_status=response.status_code, + http_response_content=response.content) resp_body = json.loads(response.content) if response.content else '' diff --git a/marconiclient/exceptions.py b/marconiclient/exceptions.py index ee13fb0..99b0d00 100644 --- a/marconiclient/exceptions.py +++ b/marconiclient/exceptions.py @@ -1,5 +1,4 @@ - class ClientException(Exception): """Exception for wrapping up Marconi client errors""" def __init__(self, href='', http_status=0, @@ -12,3 +11,16 @@ def __init__(self, href='', http_status=0, msg = "%s %s returned %d" % (self.method, self.href, self.http_status) Exception.__init__(self, msg) + + +class NoSuchQueueError(Exception): + def __init__(self, name): + self._queue_name = name + + def __str__(self): + return "Queue '%s' not found" % (self._queue_name) + + +class NoSuchMessageError(Exception): + def __init__(self, name): + pass diff --git a/marconiclient/message.py b/marconiclient/message.py deleted file mode 100644 index cc77b74..0000000 --- a/marconiclient/message.py +++ /dev/null @@ -1,42 +0,0 @@ - - -class NoSuchMessageError(Exception): - def __init__(self, name): - pass - - -class Message(object): - - def __init__(self, conn, href, content=None): - """ - Creates a message object. This class should never - be instantiated directly by a user - - :param - """ - self._conn = conn - self._href = href - self._content = content - - def __getitem__(self, key): - if self._content: - return self._content[key] - else: - raise KeyError() - - @property - def href(self): - return self._href - - def read(self): - """ - Gets this message and returns the content, includinig all metadata - """ - hdrs, body = self._conn._perform_http(href=self._href, method='GET') - - return body - - def delete(self): - # Note: marconi currently treats messages as idempotent, so - # we should never receive a 404 back - self._conn._perform_http(href=self._href, method='DELETE') diff --git a/marconiclient/queue.py b/marconiclient/resources.py similarity index 57% rename from marconiclient/queue.py rename to marconiclient/resources.py index 527cf63..a8fdd81 100644 --- a/marconiclient/queue.py +++ b/marconiclient/resources.py @@ -1,16 +1,5 @@ -from misc import proc_template -from message import Message -from claim import Claim -from stats import Stats - - -class NoSuchQueueError(Exception): - def __init__(self, name): - self._queue_name = name - - def __str__(self): - return "Queue '%s' not found" % (self._queue_name) +from marconiclient import misc class Queue(object): @@ -54,7 +43,8 @@ def post_message(self, message, ttl): Posts a single message with the specified ttl :param ttl: The TTL to set on this message """ - href = proc_template(self._conn.messages_href, queue_name=self.name) + href = misc.proc_template(self._conn.messages_href, + queue_name=self.name) body = [{"ttl": ttl, "body": message}] @@ -70,7 +60,7 @@ def claim(self, ttl, grace, limit=5): Claims a set of messages. The server configuration determines the maximum number of messages that can be claimed. """ - href = proc_template(self._claims_template, limit=str(limit)) + href = misc.proc_template(self._claims_template, limit=str(limit)) body = {"ttl": ttl, "grace": grace} @@ -94,8 +84,9 @@ def get_messages(self, echo=False, restart=False): TODO(jdp): Comment me """ if not self._get_messages_href or restart: - self._get_messages_href = proc_template(self._conn.messages_href, - queue_name=self.name) + self._get_messages_href = misc.proc_template( + self._conn.messages_href, + queue_name=self.name) if echo: self._get_messages_href += "?echo=true" @@ -124,13 +115,92 @@ def get_messages(self, echo=False, restart=False): def get_stats(self): """Retrieves statistics about the queue""" - href = proc_template(self._conn.stats_href, queue_name=self._name) + href = misc.proc_template(self._conn.stats_href, queue_name=self._name) hdrs, body = self._conn._perform_http(href=href, method='GET') - return Stats(body) + return body @property def name(self): """The name of this queue""" return self._name + + +class Message(object): + + def __init__(self, conn, href, content=None): + """ + Creates a message object. This class should never + be instantiated directly by a user + + :param + """ + self._conn = conn + self._href = href + self._content = content + + def __getitem__(self, key): + if self._content: + return self._content[key] + else: + raise KeyError() + + @property + def href(self): + return self._href + + def read(self): + """ + Gets this message and returns the content, includinig all metadata + """ + hdrs, body = self._conn._perform_http(href=self._href, method='GET') + + return body + + def delete(self): + # Note: marconi currently treats messages as idempotent, so + # we should never receive a 404 back + self._conn._perform_http(href=self._href, method='DELETE') + + +class Claim(object): + + def __init__(self, conn, href, messages): + """ + :param: conn The conn to use for manipulating this claim + :param: href The fully-qualified URL for this claim + :param: messages A list of messages belonging to this claim + """ + self._conn = conn + self._href = href + self._msgs = messages + + @property + def messages(self): + """ + Returns the messages that were associated with + this claim at creation time. + """ + return self._msgs + + def read(self): + """ + Gets the claim metadata and the associated messages. + """ + hdrs, body = self._conn._perform_http(href=self._href, method='GET') + + return body + + def update(self, ttl): + """ + Updates this claim with the specified TTL + """ + body = {"ttl": ttl} + self._conn._perform_http(href=self._href, method='PATCH', body=body) + + def release(self): + """ + Releases the current claim + """ + self._conn._perform_http(href=self._href, method='DELETE') diff --git a/marconiclient/stats.py b/marconiclient/stats.py deleted file mode 100644 index f6fe17a..0000000 --- a/marconiclient/stats.py +++ /dev/null @@ -1,18 +0,0 @@ -class Stats(object): - - def __init__(self, stats): - """ - :param: stats Statistics about a queue - """ - self._message_stats = stats['messages'] - self._action_stats = stats['actions'] - - @property - def messages(self): - """Returns statistics about the queue's messages""" - return self._message_stats - - @property - def actions(self): - """Returns statistics about the queue's actions""" - return self._action_stats diff --git a/tests/test_marconiclient.py b/tests/test_marconiclient.py index ade973b..719e05e 100644 --- a/tests/test_marconiclient.py +++ b/tests/test_marconiclient.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. - import testtools -from marconiclient import * -from eventlet import GreenPool import uuid -from urlparse import urlparse + +import eventlet + +import marconiclient class TestClientException(testtools.TestCase): @@ -26,14 +26,14 @@ class TestClientException(testtools.TestCase): def test_exception(self): # Basic instantiation and inheritance check - ex = ClientException("Something bad happened") + ex = marconiclient.exceptions.ClientException("Something bad happened") self.assertTrue(isinstance(ex, Exception)) #TODO Use dependency injection to mock HTTP(S)Client def test_connection(self): """ - conn = Connection( + conn = marconiclient.Connection( auth_endpoint="https://identity.api.rackspacecloud.com/v2.0", client_id=str(uuid.uuid4()), endpoint="http://localhost:8888/v1/12345", @@ -41,7 +41,7 @@ def test_connection(self): """ - conn = Connection( + conn = marconiclient.Connection( auth_endpoint="https://identity.api.rackspacecloud.com/v2.0", client_id=str(uuid.uuid4()), endpoint="http://166.78.143.130/v1/12345", @@ -59,7 +59,7 @@ def delete_worker(queue_name): conn.delete_queue(queue_name) return queue_name - pool = GreenPool(100) + pool = eventlet.GreenPool(100) def on_message_posted(greenthread): msg = greenthread.wait()