Distributed/parallel computing in modern Python based on the multiprocessing.Pool API (map, imap, imap_unordered).
The purpose of achilles is to make distributed/parallel computing as easy as possible by limiting the required configuration, hiding the details (server/node/controller architecture) and exposing a simple interface based on the popular multiprocessing.Pool API.
achilles provides developers with entry-level capabilities for concurrency across a network of machines (see PEP 371 on the intent behind adding multiprocessing to the standard library -> https://www.python.org/dev/peps/pep-0371/) using a server/node/controller architecture.
The achilles_server, achilles_node and achilles_controller are designed to run cross-platform/cross-architecture. The server/node/controller may be hosted on a single machine (for development) or deployed across heterogeneous resources.
achilles is comparable to excellent Python packages like pathos/pyina, Parallel Python and SCOOP, but different in certain ways:
- Designed for developers familiar with the multiprocessing module in the standard library, with simplicity and ease of use in mind.
- In addition to the blocking map API, which requires that developers wait for all computation to be finished before accessing results (common in such packages), imap/imap_unordered allow developers to process results as they are returned to the achilles_controller by the achilles_server.
- achilles allows for composable scalability and novel design patterns:
  - Iterables including lists, lists of lists and generator functions (as first-class objects; generator expressions will not work, as generators cannot be serialized by pickle/dill) are accepted as arguments.
    - TIP: Use generator functions together with imap or imap_unordered to perform distributed computation on arbitrarily large data.
  - The dill serializer is used to transfer data between the server/node/controller, and multiprocess (a fork of multiprocessing that uses the dill serializer instead of pickle) is used to perform Pool.map on the achilles_nodes, so developers are freed from some of the constraints of the pickle serializer.
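The serialization constraints above can be checked with the standard pickle module alone. This sketch does not touch achilles, dill or multiprocess; squares and picklable are hypothetical names used only for illustration. It shows that generator objects, whether produced by a generator expression or by calling a generator function, cannot be pickled, and that plain pickle also rejects lambdas, one of the constraints dill is known to lift. That is consistent with passing the generator function itself rather than a generator object.

```python
import pickle
from itertools import islice


def squares(n):
    """Generator function: produces n squares lazily, one at a time."""
    for i in range(n):
        yield i * i


def picklable(obj):
    """Return True if the standard-library pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


print(picklable([1, 2, 3]))                  # True: plain data is fine
print(picklable((i * i for i in range(3))))  # False: generator expression -> generator object
print(picklable(squares(3)))                 # False: calling a generator function also returns a generator object
print(picklable(lambda x: x * x))            # False: pickle stores functions by name, and "<lambda>" cannot be looked up

# A generator function never materializes its data up front, so it can
# describe arbitrarily large inputs; only the items consumed are computed.
print(list(islice(squares(10 ** 12), 5)))    # [0, 1, 4, 9, 16]
```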
pip install achilles
Start an achilles_server listening for connections from achilles_nodes at an endpoint (host and port) specified either as arguments or in an .env file in the achilles package's directory.
Then simply import map, imap, and/or imap_unordered from achilles_main and use them dynamically in your own code (under the hood they create and close achilles_controllers).
map, imap and imap_unordered will distribute your function to each achilles_node connected to the achilles_server. Then, the achilles_server will distribute arguments to each achilles_node (load balanced and made into a list of arguments if the arguments' type is not already a list) which will then perform your function on the arguments using multiprocess.Pool.map.
Each achilles_node finishes its work, returns the results to the achilles_server and waits to receive another argument. This process is repeated until all of the arguments have been exhausted.
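The distribution loop described above can be simulated in a single process to make the load balancing concrete. This is a sketch under stated assumptions, not achilles' implementation: simulate_job and the node names are hypothetical, idle nodes are served round-robin, and a list comprehension stands in for multiprocess.Pool.map on each node. Only the load-balanced case (each argument is a single value, not a list) is modeled.

```python
from collections import deque
from itertools import islice


def simulate_job(func, args, node_cpu_counts, chunksize=1):
    """Single-process sketch of a server handing out arguments to nodes.

    node_cpu_counts maps a hypothetical node name to its cpu_count. Each idle
    node receives the next cpu_count * chunksize arguments, applies func to
    each one (standing in for multiprocess.Pool.map on that node), and goes
    to the back of the queue to wait for more work.
    """
    if not node_cpu_counts:
        raise ValueError("at least one node is required")
    pending = iter(args)                 # works for lists and generators alike
    idle_nodes = deque(node_cpu_counts.items())
    results = {}                         # ARGS_COUNTER -> list of results
    args_counter = 0                     # index of the next argument to send
    while True:
        name, cpu_count = idle_nodes.popleft()
        batch = list(islice(pending, cpu_count * chunksize))
        if not batch:                    # all arguments have been exhausted
            break
        results[args_counter] = [func(arg) for arg in batch]
        args_counter += len(batch)
        idle_nodes.append((name, cpu_count))
    # One list per batch, ordered by the index of the batch's first argument.
    return [results[k] for k in sorted(results)]


def square(x):
    return x ** 2


print(simulate_job(square, range(1, 11), {"node-a": 4, "node-b": 2}))
# [[1, 4, 9, 16], [25, 36], [49, 64, 81, 100]]
```

With a single 8-core node and chunksize=1, the same function yields batches of 8 and 2, which matches the shape of the map output shown in the examples.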
- runAchillesServer(host=None, port=None, username=None, secret_key=None) -> run on your local machine or on another machine connected to your network

  in:

      from achilles.lineReceiver.achilles_server import runAchillesServer

      # host = IP address of the achilles_server
      # port = port to listen on for connections from achilles_nodes (must be an int)
      # username, secret_key used for authentication with achilles_controller
      runAchillesServer(host='127.0.0.1', port=9999, username='foo', secret_key='bar')

      # OR generate an .env file with a default configuration so that
      # arguments are no longer required to runAchillesServer()
      # use genConfig() to overwrite
      from achilles.lineReceiver.achilles_server import runAchillesServer, genConfig

      genConfig(host='127.0.0.1', port=9999, username='foo', secret_key='bar')
      runAchillesServer()

  out:

      ALERT: achilles_server initiated at 127.0.0.1:9999
      Listening for connections...
- runAchillesNode(host=None, port=None) -> run on your local machine or on another machine connected to your network

  in:

      from achilles.lineReceiver.achilles_node import runAchillesNode

      # genConfig() is also available in achilles_node, but only expects host and port arguments
      runAchillesNode(host='127.0.0.1', port=9999)

  out:

      GREETING: Welcome! There are currently 1 open connections.
      Connected to achilles_server running at 127.0.0.1:9999
      CLIENT_ID: 0
Examples of how to use the 3 most commonly used multiprocessing.Pool methods in achilles:

Note: map, imap and imap_unordered currently accept iterables including, but not limited to, lists, lists of lists, and generator functions as args.

Also note: if there isn't already a .env configuration file in the achilles package directory, you must either call genConfig(host, port, username, secret_key) beforehand or pass host, port, username and secret_key as arguments when using map, imap or imap_unordered.
- map(func, args, callback=None, chunksize=1, host=None, port=None, username=None, secret_key=None)

  in:

      # note: this import shadows the built-in map() within the module
      from achilles.lineReceiver.achilles_main import map

      def achilles_function(arg):
          return arg ** 2

      def achilles_callback(result):
          return result ** 2

      if __name__ == "__main__":
          results = map(achilles_function, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], achilles_callback, chunksize=1)
          print(results)

  out:

      ALERT: Connection to achilles_server at 127.0.0.1:9999 and authentication successful.
      [[1, 16, 81, 256, 625, 1296, 2401, 4096], [6561, 10000]]
- imap(func, args, callback=None, chunksize=1, host=None, port=None, username=None, secret_key=None)

  in:

      from achilles.lineReceiver.achilles_main import imap

      def achilles_function(arg):
          return arg ** 2

      def achilles_callback(result):
          return result ** 2

      if __name__ == "__main__":
          for result in imap(achilles_function, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], achilles_callback, chunksize=1):
              print(result)

  out:

      ALERT: Connection to achilles_server at 127.0.0.1:9999 and authentication successful.
      {'ARGS_COUNTER': 0, 'RESULT': [1, 16, 81, 256, 625, 1296, 2401, 4096]}
      {'ARGS_COUNTER': 8, 'RESULT': [6561, 10000]}
- imap_unordered(func, args, callback=None, chunksize=1, host=None, port=None, username=None, secret_key=None)

  in:

      from achilles.lineReceiver.achilles_main import imap_unordered

      def achilles_function(arg):
          return arg ** 2

      def achilles_callback(result):
          return result ** 2

      if __name__ == "__main__":
          for result in imap_unordered(achilles_function, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], achilles_callback, chunksize=1):
              print(result)

  out:

      ALERT: Connection to achilles_server at 127.0.0.1:9999 and authentication successful.
      {'ARGS_COUNTER': 8, 'RESULT': [6561, 10000]}
      {'ARGS_COUNTER': 0, 'RESULT': [1, 16, 81, 256, 625, 1296, 2401, 4096]}
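map returns a list of load-balanced lists rather than a flat list, and imap/imap_unordered yield one packet per batch. When a flat list of results is needed, the standard library is enough. This sketch does not call achilles: it reuses the sample outputs shown above as literal data, and the sort by ARGS_COUNTER assumes every packet has already been collected.

```python
from itertools import chain

# Return value of map() in the example above: one list per load-balanced batch.
map_results = [[1, 16, 81, 256, 625, 1296, 2401, 4096], [6561, 10000]]

# map() does not flatten its result; chain.from_iterable concatenates the batches.
flat = list(chain.from_iterable(map_results))
print(flat)  # [1, 16, 81, 256, 625, 1296, 2401, 4096, 6561, 10000]

# Packets printed by imap_unordered() in the example above, in arrival order.
packets = [
    {"ARGS_COUNTER": 8, "RESULT": [6561, 10000]},
    {"ARGS_COUNTER": 0, "RESULT": [1, 16, 81, 256, 625, 1296, 2401, 4096]},
]

# Sorting by ARGS_COUNTER (index of each packet's first argument) restores
# the original argument order once all packets have arrived.
ordered = [
    value
    for packet in sorted(packets, key=lambda p: p["ARGS_COUNTER"])
    for value in packet["RESULT"]
]
print(ordered == flat)  # True

# achilles_function squares each argument and achilles_callback squares the
# result again, so every value is arg ** 4.
print(flat == [arg ** 4 for arg in range(1, 11)])  # True
```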
achilles is built on the following packages:

- Twisted - An event-driven networking engine written in Python and MIT licensed.
- dill - dill extends Python’s pickle module for serializing and de-serializing Python objects to the majority of the built-in Python types.
- multiprocess - multiprocess is a fork of multiprocessing that uses dill instead of pickle for serialization. multiprocessing is a package for the Python language which supports the spawning of processes using the API of the standard library’s threading module.
See the examples directory for tutorials on various use cases, including:
- Square numbers/run multiple jobs sequentially
- Word count (TO DO)
To shut down the cluster, use killCluster():

    from achilles.lineReceiver.achilles_main import killCluster

    # simply use the killCluster() command and verify your intent at the prompt
    # killCluster() will search for an .env configuration file in the achilles package's directory
    # if it does not exist, specify host, port, username and secret_key as arguments
    # a command is sent to all connected achilles_nodes to stop the Twisted reactor and exit() the process
    # optionally, you can pass command_verified=True to proceed directly with killing the cluster
    killCluster(command_verified=True)

- achilles_nodes use all of the CPU cores available on the host machine to perform multiprocess.Pool.map (pool = multiprocess.Pool(multiprocess.cpu_count())).
- achilles leaves it up to the developer to ensure that the correct packages are installed on achilles_nodes to perform the function distributed by the achilles_server on behalf of the achilles_controller. The current recommended solution is to SSH into each machine and install a requirements.txt file (pip install -r requirements.txt).
  - All import statements required by the developer's function, arguments and callback must be included in the definition of the function.
- The achilles_server is currently designed to handle one job at a time. For more complicated projects, I highly recommend checking out Dask (especially dask.distributed) and learning more about directed acyclic graphs (DAGs).
- Fault tolerance: if an achilles_node disconnects before returning the expected results, its arguments will be distributed to another achilles_node for computation instead of being lost.
- The callback_error argument has yet to be implemented, so detailed information regarding errors can only be gleaned from the interpreter used to launch the achilles_server, achilles_node or achilles_controller.
- Deploying the server/node/controller on a single machine is recommended for development.
- achilles performs load balancing at runtime and assigns arguments to each achilles_node in batches of cpu_count * chunksize.
  - Default chunksize is 1.
  - Increasing the chunksize is an easy way to speed up computation and reduce the amount of time spent transferring data between the server/node/controller.
- If your arguments are already lists, the chunksize argument is not used.
  - Instead, one argument/list will be distributed to the connected achilles_nodes at a time.
- If your arguments are load balanced, the results returned are contained in lists of length cpu_count * chunksize of the achilles_node that computed them (the final list may be shorter).
  - map:
    - The final result of map is an ordered list of load balanced lists (the final result is not flattened).
  - imap:
    - Results are returned as computation is finished, in dictionaries that include the following keys:
      - RESULT: load balanced list of results.
      - ARGS_COUNTER: index of the first argument (0-indexed).
    - Results are ordered.
      - The first result in each packet directly follows the last result in the preceding results packet's list of results.
    - Likely to be slower than imap_unordered because the achilles_controller yields ordered results. imap_unordered (see below) yields results as they are received, while imap yields a results packet as it is received only if its ARGS_COUNTER is the expected one, i.e. the preceding packet's ARGS_COUNTER plus the length of its RESULT list. Otherwise, the result_buffer is checked for the results packet with the expected ARGS_COUNTER and the current results packet is added to the result_buffer. If the expected packet is not found, the achilles_controller will not yield results until a results packet with the expected ARGS_COUNTER is received.
  - imap_unordered:
    - Results are returned as computation is finished, in dictionaries that include the following keys:
      - RESULT: load balanced list of results.
      - ARGS_COUNTER: index of the first argument (0-indexed).
    - Results are not ordered.
    - Results packets are yielded as they are received (after any achilles_callback has been performed on them).
    - Fastest way of consuming results received from the achilles_server.
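The ordering rule described for imap can be sketched as a generator that sits on top of an unordered stream of results packets. This illustrates the buffering idea only and is not the achilles_controller's actual code: ordered_packets is a hypothetical name, and raising RuntimeError when the stream ends with a gap is a choice made for this sketch (per the notes above, the controller simply waits for the missing packet).

```python
def ordered_packets(packets):
    """Yield imap-style results packets in ARGS_COUNTER order.

    packets: iterable of dicts with 'ARGS_COUNTER' and 'RESULT' keys, in the
    order they arrive from the server (i.e. what imap_unordered would yield).
    """
    expected = 0        # ARGS_COUNTER of the next packet that may be yielded
    result_buffer = {}  # packets that arrived early, keyed by ARGS_COUNTER
    for packet in packets:
        result_buffer[packet["ARGS_COUNTER"]] = packet
        # Yield every buffered packet that now continues the ordered sequence.
        while expected in result_buffer:
            ready = result_buffer.pop(expected)
            yield ready
            expected += len(ready["RESULT"])
    if result_buffer:
        # The stream ended while a packet was still missing.
        raise RuntimeError(f"no results packet with ARGS_COUNTER={expected}")


# Packets in the order imap_unordered printed them in the example above.
arrivals = [
    {"ARGS_COUNTER": 8, "RESULT": [6561, 10000]},
    {"ARGS_COUNTER": 0, "RESULT": [1, 16, 81, 256, 625, 1296, 2401, 4096]},
]

for packet in ordered_packets(arrivals):
    print(packet)
```

Run on the imap_unordered sample packets, this prints them in the same order as the imap example: the packet with ARGS_COUNTER 8 is held in the buffer until the packet with ARGS_COUNTER 0 arrives.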
achilles is in the early stages of active development and your suggestions/contributions are kindly welcomed.
achilles is written and maintained by Alejandro Peña. Email me at adpena at gmail dot com.