diff --git a/.travis.yml b/.travis.yml index 907d336..2ebcf5d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -58,9 +58,9 @@ install: # flake8 style checker - pip install flake8 pep8-naming flake8-debugger flake8-docstrings script: - - QUAMASH_QTIMPL=PySide py.test - - QUAMASH_QTIMPL=PyQt4 py.test - - QUAMASH_QTIMPL=PyQt5 py.test + - py.test --qtimpl PySide + - py.test --qtimpl PyQt4 + - py.test --qtimpl PyQt5 - flake8 --ignore=D1,W191,E501 - flake8 --select=D1 quamash/*.py cache: diff --git a/appveyor.yml b/appveyor.yml index 665e124..7a1a295 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -30,7 +30,7 @@ install: build: off test_script: - - "%PYTHON%\\Scripts\\py.test.exe" + - "%PYTHON%\\Scripts\\py.test.exe --qtimpl %QTIMPL%" notifications: - provider: Webhook diff --git a/conftest.py b/conftest.py index e34e139..0a96761 100644 --- a/conftest.py +++ b/conftest.py @@ -1,6 +1,7 @@ import sys import os.path import logging +from importlib import import_module from pytest import fixture sys.path.insert(0, os.path.dirname(__file__)) logging.basicConfig( @@ -12,7 +13,39 @@ collect_ignore = ['quamash/_windows.py'] +def pytest_addoption(parser): + parser.addoption("--qtimpl", default='guess') + + +def guess_qtimpl(): + for guess in ('PyQt5', 'PyQt4', 'PySide'): + try: + __import__(guess) + except ImportError: + continue + else: + return guess + + @fixture(scope='session') -def application(): - from quamash import QApplication - return QApplication([]) +def application(request): + qtimpl = request.config.getoption('qtimpl') + if qtimpl == 'guess': + qtimpl = guess_qtimpl() + __import__(qtimpl) + + for module in ('.QtWidgets', '.QtGui'): + try: + return import_module(module, qtimpl).QApplication([]) + except (ImportError, AttributeError): + continue + + +@fixture(scope='session') +def qtcore(request): + qtimpl = request.config.getoption('qtimpl') + if qtimpl == 'guess': + qtimpl = guess_qtimpl() + __import__(qtimpl) + + return import_module('.QtCore', qtimpl) diff --git a/quamash/__init__.py b/quamash/__init__.py index a7a5831..e945d85 100644 --- a/quamash/__init__.py +++ b/quamash/__init__.py @@ -12,99 +12,48 @@ import os import asyncio import time -from functools import wraps import itertools from queue import Queue from concurrent.futures import Future -import logging -logger = logging.getLogger('quamash') - -try: - QtModuleName = os.environ['QUAMASH_QTIMPL'] -except KeyError: - QtModule = None -else: - logger.info('Forcing use of {} as Qt Implementation'.format(QtModuleName)) - QtModule = __import__(QtModuleName) - -if not QtModule: - for QtModuleName in ('PyQt5', 'PyQt4', 'PySide'): - try: - QtModule = __import__(QtModuleName) - except ImportError: - continue - else: - break - else: - raise ImportError('No Qt implementations found') - -logger.info('Using Qt Implementation: {}'.format(QtModuleName)) - -QtCore = __import__(QtModuleName + '.QtCore', fromlist=(QtModuleName,)) -QtGui = __import__(QtModuleName + '.QtGui', fromlist=(QtModuleName,)) -if QtModuleName == 'PyQt5': - from PyQt5 import QtWidgets - QApplication = QtWidgets.QApplication -else: - QApplication = QtGui.QApplication - -if not hasattr(QtCore, 'Signal'): - QtCore.Signal = QtCore.pyqtSignal - +from importlib import import_module +import warnings from ._common import with_logger +if 'QUAMASH_QTIMPL' in os.environ: + warnings.warn("QUAMASH_QTIMPL environment variable set, this version of quamash ignores it.") -@with_logger -class _QThreadWorker(QtCore.QThread): - - """ - Read from the queue. - - For use by the QThreadExecutor - """ - def __init__(self, queue, num): - self.__queue = queue - self.__stop = False - self.__num = num - super().__init__() +def _run_in_worker(queue, num, logger): + while True: + command = queue.get() + if command is None: + # Stopping... + break - def run(self): - queue = self.__queue - while True: - command = queue.get() - if command is None: - # Stopping... - break - - future, callback, args, kwargs = command - self._logger.debug( - '#{} got callback {} with args {} and kwargs {} from queue' - .format(self.__num, callback, args, kwargs) - ) - if future.set_running_or_notify_cancel(): - self._logger.debug('Invoking callback') - try: - r = callback(*args, **kwargs) - except Exception as err: - self._logger.debug('Setting Future exception: {}'.format(err)) - future.set_exception(err) - else: - self._logger.debug('Setting Future result: {}'.format(r)) - future.set_result(r) + future, callback, args, kwargs = command + logger.debug( + '#{} got callback {} with args {} and kwargs {} from queue' + .format(num, callback, args, kwargs) + ) + if future.set_running_or_notify_cancel(): + logger.debug('Invoking callback') + try: + r = callback(*args, **kwargs) + except Exception as err: + logger.debug('Setting Future exception: {}'.format(err)) + future.set_exception(err) else: - self._logger.debug('Future was canceled') - - self._logger.debug('Thread #{} stopped'.format(self.__num)) + logger.debug('Setting Future result: {}'.format(r)) + future.set_result(r) + else: + logger.debug('Future was canceled') - def wait(self): - self._logger.debug('Waiting for thread #{} to stop...'.format(self.__num)) - super().wait() + logger.debug('Thread #{} stopped'.format(num)) @with_logger -class QThreadExecutor(QtCore.QObject): +class QThreadExecutor: """ ThreadExecutor that produces QThreads. @@ -112,17 +61,33 @@ class QThreadExecutor(QtCore.QObject): Same API as `concurrent.futures.Executor` >>> from quamash import QThreadExecutor - >>> with QThreadExecutor(5) as executor: + >>> QtCore = getfixture('qtcore') + >>> with QThreadExecutor(QtCore, 5) as executor: ... f = executor.submit(lambda x: 2 + x, 2) ... r = f.result() ... assert r == 4 """ - def __init__(self, max_workers=10, parent=None): - super().__init__(parent) + def __init__(self, qtcore, max_workers=10): + super().__init__() + + @with_logger + class QThreadWorker(qtcore.QThread): + def __init__(self, queue, num): + super().__init__() + self.__queue = queue + self.__num = num + + def run(self): + _run_in_worker(self.__queue, self.__num, self._logger) + + def wait(self): + self._logger.debug('Waiting for thread #{} to stop...'.format(self.__num)) + super().wait() + self.__max_workers = max_workers self.__queue = Queue() - self.__workers = [_QThreadWorker(self.__queue, i + 1) for i in range(max_workers)] + self.__workers = [QThreadWorker(self.__queue, i + 1) for i in range(max_workers)] self.__been_shutdown = False for w in self.__workers: @@ -165,56 +130,16 @@ def __exit__(self, *args): self.shutdown() -def _easycallback(fn): - """ - Decorator that wraps a callback in a signal. - - It also packs & unpacks arguments, and makes the wrapped function effectively - threadsafe. If you call the function from one thread, it will be executed in - the thread the QObject has affinity with. - - Remember: only objects that inherit from QObject can support signals/slots - - >>> import asyncio - >>> - >>> import quamash - >>> QThread, QObject = quamash.QtCore.QThread, quamash.QtCore.QObject - >>> - >>> app = getfixture('application') - >>> - >>> global_thread = QThread.currentThread() - >>> class MyObject(QObject): - ... @_easycallback - ... def mycallback(self): - ... global global_thread, mythread - ... cur_thread = QThread.currentThread() - ... assert cur_thread is not global_thread - ... assert cur_thread is mythread - >>> - >>> mythread = QThread() - >>> mythread.start() - >>> myobject = MyObject() - >>> myobject.moveToThread(mythread) - >>> - >>> @asyncio.coroutine - ... def mycoroutine(): - ... myobject.mycallback() - >>> - >>> loop = QEventLoop(app) - >>> asyncio.set_event_loop(loop) - >>> with loop: - ... loop.run_until_complete(mycoroutine()) - """ - @wraps(fn) - def in_wrapper(self, *args, **kwargs): - return signaler.signal.emit(self, args, kwargs) +def _make_signaller(qtimpl_qtcore, *args): + try: + signal_class = qtimpl_qtcore.Signal + except AttributeError: + signal_class = qtimpl_qtcore.pyqtSignal - class Signaler(QtCore.QObject): - signal = QtCore.Signal(object, tuple, dict) + class Signaller(qtimpl_qtcore.QObject): + signal = signal_class(*args) - signaler = Signaler() - signaler.signal.connect(lambda self, args, kwargs: fn(self, *args, **kwargs)) - return in_wrapper + return Signaller() if os.name == 'nt': @@ -231,6 +156,9 @@ class QEventLoop(_baseclass): """ Implementation of asyncio event loop that uses the Qt Event loop. + Parameters: + :app: Any instance of QApplication + >>> import asyncio >>> >>> app = getfixture('application') @@ -247,10 +175,11 @@ class QEventLoop(_baseclass): ... loop.run_until_complete(xplusy(2, 2)) """ - def __init__(self, app=None): + def __init__(self, app): self.__timers = [] - self.__app = app or QApplication.instance() - assert self.__app is not None, 'No QApplication has been instantiated' + if app is None: + raise ValueError("app must be an instance of QApplication") + self.__app = app self.__is_running = False self.__debug_enabled = False self.__default_executor = None @@ -258,9 +187,13 @@ def __init__(self, app=None): self._read_notifiers = {} self._write_notifiers = {} - assert self.__app is not None + qtcore = import_module('..QtCore', type(app).__module__) - super().__init__() + super().__init__(qtcore) + + self.__call_soon_signaller = signaller = _make_signaller(self._qtcore, object, tuple) + self.__call_soon_signal = signaller.signal + signaller.signal.connect(lambda callback, args: self.call_soon(callback, *args)) def run_forever(self): """Run eventloop forever.""" @@ -352,7 +285,7 @@ def upon_timeout(): handle._run() self._logger.debug('Adding callback {} with delay {}'.format(handle, delay)) - timer = QtCore.QTimer(self.__app) + timer = self._qtcore.QTimer(self.__app) timer.timeout.connect(upon_timeout) timer.setSingleShot(True) timer.start(delay * 1000) @@ -384,7 +317,7 @@ def add_reader(self, fd, callback, *args): existing.activated.disconnect() # will get overwritten by the assignment below anyways - notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read) + notifier = self._qtcore.QSocketNotifier(fd, self._qtcore.QSocketNotifier.Read) notifier.setEnabled(True) self._logger.debug('Adding reader callback for file descriptor {}'.format(fd)) notifier.activated.connect( @@ -416,7 +349,7 @@ def add_writer(self, fd, callback, *args): existing.activated.disconnect() # will get overwritten by the assignment below anyways - notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Write) + notifier = self._qtcore.QSocketNotifier(fd, self._qtcore.QSocketNotifier.Write) notifier.setEnabled(True) self._logger.debug('Adding writer callback for file descriptor {}'.format(fd)) notifier.activated.connect( @@ -472,10 +405,9 @@ def __on_notifier_ready(self, notifiers, notifier, fd, callback, args): # Methods for interacting with threads. - @_easycallback def call_soon_threadsafe(self, callback, *args): """Thread-safe version of call_soon.""" - self.call_soon(callback, *args) + self.__call_soon_signal.emit(callback, args) def run_in_executor(self, executor, callback, *args): """Run callback in executor. @@ -496,7 +428,7 @@ def run_in_executor(self, executor, callback, *args): executor = executor or self.__default_executor if executor is None: self._logger.debug('Creating default executor') - executor = self.__default_executor = QThreadExecutor() + executor = self.__default_executor = QThreadExecutor(self._qtcore) self._logger.debug('Using default executor') return asyncio.wrap_future(executor.submit(callback, *args)) diff --git a/quamash/_unix.py b/quamash/_unix.py index f6c47e2..7c0a223 100644 --- a/quamash/_unix.py +++ b/quamash/_unix.py @@ -8,7 +8,7 @@ from asyncio import selectors import collections -from . import QtCore, with_logger +from . import with_logger EVENT_READ = (1 << 0) @@ -106,11 +106,11 @@ def register(self, fileobj, events, data=None): self._fd_to_key[key.fd] = key if events & EVENT_READ: - notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Read) + notifier = self._qtcore.QSocketNotifier(key.fd, self._qtcore.QSocketNotifier.Read) notifier.activated.connect(self.__on_read_activated) self.__read_notifiers[key.fd] = notifier if events & EVENT_WRITE: - notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Write) + notifier = self._qtcore.QSocketNotifier(key.fd, self._qtcore.QSocketNotifier.Write) notifier.activated.connect(self.__on_write_activated) self.__write_notifiers[key.fd] = notifier @@ -186,7 +186,8 @@ def _key_from_fd(self, fd): class _SelectorEventLoop(asyncio.SelectorEventLoop): - def __init__(self): + def __init__(self, qtcore): + self._qtcore = qtcore self._signal_safe_callbacks = [] selector = _Selector(self) diff --git a/quamash/_windows.py b/quamash/_windows.py index e12c85e..2a613a0 100644 --- a/quamash/_windows.py +++ b/quamash/_windows.py @@ -15,22 +15,24 @@ import math -from . import QtCore +from . import _make_signaller from ._common import with_logger UINT32_MAX = 0xffffffff -class _ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop): +class _ProactorEventLoop(asyncio.ProactorEventLoop): """Proactor based event loop.""" - def __init__(self): - QtCore.QObject.__init__(self) - asyncio.ProactorEventLoop.__init__(self, _IocpProactor()) + def __init__(self, qtcore): + self._qtcore = qtcore + super().__init__(_IocpProactor()) - self.__event_poller = _EventPoller() - self.__event_poller.sig_events.connect(self._process_events) + self.__event_signaller = _make_signaller(self._qtcore, list) + self.__event_signal = self.__event_signaller.signal + self.__event_signal.connect(self._process_events) + self.__event_poller = _EventPoller(self.__event_signal, self._qtcore) def _process_events(self, events): """Process events from proactor.""" @@ -113,14 +115,14 @@ def _poll(self, timeout=None): @with_logger -class _EventWorker(QtCore.QThread): - def __init__(self, proactor, parent): +class _EventWorker: + def __init__(self, proactor, parent, semaphore_factory): super().__init__() self.__stop = False self.__proactor = proactor self.__sig_events = parent.sig_events - self.__semaphore = QtCore.QSemaphore() + self.__semaphore = semaphore_factory() def start(self): super().start() @@ -145,15 +147,21 @@ def run(self): @with_logger -class _EventPoller(QtCore.QObject): +class _EventPoller: """Polling of events in separate thread.""" - sig_events = QtCore.Signal(list) + def __init__(self, sig_events, qtcore): + self.sig_events = sig_events + self._qtcore = qtcore def start(self, proactor): self._logger.debug('Starting (proactor: {})...'.format(proactor)) - self.__worker = _EventWorker(proactor, self) + + class EventWorker(_EventWorker, self._qtcore.QThread): + pass + + self.__worker = EventWorker(proactor, self, self._qtcore.QSemaphore) self.__worker.start() def stop(self): diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index fa5b512..cb283c4 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -65,7 +65,6 @@ def excepthook(type, *args): orig_excepthook = sys.excepthook sys.excepthook = excepthook - lp.set_exception_handler(except_handler) request.addfinalizer(fin) return lp @@ -74,12 +73,15 @@ def excepthook(type, *args): @pytest.fixture( params=[None, quamash.QThreadExecutor, ThreadPoolExecutor, ProcessPoolExecutor] ) -def executor(request): +def executor(request, qtcore): exc_cls = request.param if exc_cls is None: return None + elif exc_cls is quamash.QThreadExecutor: + exc = exc_cls(qtcore) + else: + exc = exc_cls(1) # FIXME? fixed number of workers? - exc = exc_cls(1) # FIXME? fixed number of workers? request.addfinalizer(exc.shutdown) return exc diff --git a/tests/test_qthreadexec.py b/tests/test_qthreadexec.py index 27f0710..b9c21aa 100644 --- a/tests/test_qthreadexec.py +++ b/tests/test_qthreadexec.py @@ -6,15 +6,15 @@ @pytest.fixture -def executor(request): - exe = quamash.QThreadExecutor(5) +def executor(request, qtcore): + exe = quamash.QThreadExecutor(qtcore, 5) request.addfinalizer(exe.shutdown) return exe @pytest.fixture -def shutdown_executor(): - exe = quamash.QThreadExecutor(5) +def shutdown_executor(qtcore): + exe = quamash.QThreadExecutor(qtcore, 5) exe.shutdown() return exe @@ -33,3 +33,16 @@ def test_ctx_after_shutdown(shutdown_executor): def test_submit_after_shutdown(shutdown_executor): with pytest.raises(RuntimeError): shutdown_executor.submit(None) + + +def test_run_in_executor_without_loop(executor): + f = executor.submit(lambda x: 2 + x, 2) + r = f.result() + assert r == 4 + + +def test_run_in_executor_as_ctx_manager(qtcore): + with quamash.QThreadExecutor(qtcore) as executor: + f = executor.submit(lambda x: 2 + x, 2) + r = f.result() + assert r == 4 diff --git a/tox.ini b/tox.ini index 4b44c8c..bb4c03f 100644 --- a/tox.ini +++ b/tox.ini @@ -12,11 +12,10 @@ sitepackages=True deps= pytest py33: asyncio -commands=py.test -setenv= - pyqt4: QUAMASH_QTIMPL=PyQt4 - pyqt5: QUAMASH_QTIMPL=PyQt5 - pyside: QUAMASH_QTIMPL=PySide +commands= + pyside: py.test --qtimpl PySide + pyqt4: py.test --qtimpl PyQt4 + pyqt5: py.test --qtimpl PyQt5 [pytest] addopts=--doctest-modules quamash quamash tests