diff --git a/Makefile b/Makefile
index 41d450f..bc06e80 100644
--- a/Makefile
+++ b/Makefile
@@ -4,12 +4,18 @@ install:
 	jupyter serverextension enable --py jupyter_spark
 	jupyter nbextension install --py jupyter_spark
 	jupyter nbextension enable --py jupyter_spark
+	@echo ""
+	@echo "NOTE: Copy ./src/magic/spark_progress.py into the startup folder, e.g. ~/.ipython/profile_default/startup/"
+	@echo ""
 
 uninstall:
 	jupyter serverextension disable --py jupyter_spark
 	jupyter nbextension disable --py jupyter_spark
 	jupyter nbextension uninstall --py jupyter_spark
 	pip uninstall -y jupyter-spark
+	@echo ""
+	@echo "NOTE: Remove spark_progress.py from the startup folder, e.g. ~/.ipython/profile_default/startup/"
+	@echo ""
 
 clean: uninstall
 	rm -rf dist/
diff --git a/README.md b/README.md
index 928fd75..579ece5 100644
--- a/README.md
+++ b/README.md
@@ -77,13 +77,33 @@ pip uninstall jupyter-spark
 
 ## Configuration
 
-To change the URL of the Spark API that the job metadata is fetched from
-override the `Spark.url` config value, e.g. on the command line:
+The Spark API that the job metadata is fetched from can be different for
+each SparkContext. By default the first SparkContext uses port 4040, the
+second 4041, and so on. If, however, `spark.ui.port` is set to 0 in
+`SparkConf`, Spark chooses a random ephemeral port for the API.
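+
+For example, assuming `pyspark` is installed locally, a SparkContext whose UI
+listens on a random ephemeral port could be created like this (the variable
+names are only illustrative):
+
+```python
+from pyspark import SparkConf, SparkContext
+
+# 0 tells Spark to pick a free ephemeral port for its web UI
+conf = SparkConf().set("spark.ui.port", "0")
+sc = SparkContext(conf=conf)
+```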
+
+To support this behaviour (and to allow more than one open notebook, each
+with its own SparkContext), load the jupyter_spark extension:
+
+```python
+%load_ext jupyter_spark
 ```
-jupyter notebook --Spark.url="http://localhost:4040"
+
+For Spark 2, use the provided magic without a parameter:
+
+```python
+%spark_progress
+```
+
+For Spark 1, provide the Spark API URL (e.g. `http://localhost:4040`):
+
+```python
+%spark_progress http://<host>:<port>
 ```
+
+To turn it off again, use:
+
+```python
+%spark_progress None
+```
+
+Note that these commands are line magics and must each be in their own cell.
 
 ## Example
 
 There is a simple `pyspark` example included in `examples` to confirm that your
diff --git a/examples/Jupyter Spark example.ipynb b/examples/Jupyter Spark example.ipynb
index dd6b9d7..570c15a 100644
--- a/examples/Jupyter Spark example.ipynb
+++ b/examples/Jupyter Spark example.ipynb
@@ -51,16 +51,40 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "`partitions` is the number of spark workers to partition the work into."
+    "Load the jupyter spark extension and turn on the Spark progress monitor"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": 3,
    "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Spark progress monitoring turned on\n"
+     ]
+    }
+   ],
+   "source": [
+    "%load_ext jupyter_spark"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "`partitions` is the number of spark workers to partition the work into."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
    "outputs": [],
    "source": [
-    "partitions = 2"
+    "partitions = 5"
    ]
   },
   {
@@ -72,7 +96,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 4,
+   "execution_count": 5,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -88,7 +112,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 5,
+   "execution_count": 6,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -107,7 +131,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 6,
+   "execution_count": 7,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -119,14 +143,14 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 7,
+   "execution_count": 8,
    "metadata": {},
    "outputs": [
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "Pi is roughly 3.141880\n"
+      "Pi is roughly 3.141669\n"
      ]
     }
    ],
@@ -143,7 +167,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 8,
+   "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -153,7 +177,7 @@
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "Python 3",
+   "display_name": "Python 3.6",
    "language": "python",
    "name": "python3"
   },
diff --git a/src/jupyter_spark/__init__.py b/src/jupyter_spark/__init__.py
index e057c84..c479dc7 100644
--- a/src/jupyter_spark/__init__.py
+++ b/src/jupyter_spark/__init__.py
@@ -40,3 +40,12 @@ def load_jupyter_server_extension(nbapp):  # pragma: no cover
         [(spark.proxy_url + '.*', SparkHandler, {'spark': spark})]
     )
     nbapp.log.info("Jupyter-Spark enabled!")
+
+
+def load_ipython_extension(ip):
+    from .magic import SparkProgress
+
+    ip.register_magics(SparkProgress)
+
+    # Immediately start Spark Progress
+    SparkProgress().init()
diff --git a/src/jupyter_spark/handlers.py b/src/jupyter_spark/handlers.py
index a82b413..a8f34a8 100644
--- a/src/jupyter_spark/handlers.py
+++ b/src/jupyter_spark/handlers.py
@@ -18,9 +18,16 @@ def get(self):
         the verbatim response.
         """
         http = httpclient.AsyncHTTPClient()
-        url = self.spark.backend_url(self.request)
-        self.spark.log.debug('Fetching from Spark %s', url)
-        http.fetch(url, self.handle_response)
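+        # The frontend passes the Spark UI URL (received from the kernel over
+        # the 'spark_comm' comm) as a ?spark_url= query parameter.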
""" http = httpclient.AsyncHTTPClient() - url = self.spark.backend_url(self.request) - self.spark.log.debug('Fetching from Spark %s', url) - http.fetch(url, self.handle_response) + spark_url = self.get_argument("spark_url", None) + if spark_url is not None: + url = self.spark.backend_url(spark_url, self.request.path) + http.fetch(url, self.handle_response) + else: + content_type = 'application/json' + content = json.dumps({'error': 'SPARK_URL_MISSING'}) + self.set_header('Content-Type', content_type) + self.write(content) + self.finish() def handle_response(self, response): if response.error: diff --git a/src/jupyter_spark/magic.py b/src/jupyter_spark/magic.py new file mode 100644 index 0000000..e8e7fbe --- /dev/null +++ b/src/jupyter_spark/magic.py @@ -0,0 +1,37 @@ +from __future__ import print_function + +from ipykernel.comm import Comm +from IPython.core.magic import Magics, line_magic, magics_class + + +@magics_class +class SparkProgress(Magics): + + def init(self, line=""): + """Start Spark progress if possible""" + comm = Comm(target_name='spark_comm') + if line.startswith("http"): + url = line + else: + from pyspark import SparkContext + # Using an internal API to avoid asking the user for the SparkContext variable + # TODO: Try to find a way without using an internal API + sc = SparkContext._active_spark_context + if sc is not None: + url = sc.uiWebUrl + else: + url = None + + comm.send({'uiWebUrl': url}) + + if url is None: + print("No Spark Context found") + else: + print("Spark progress monitoring turned on") + + comm.close() + + @line_magic + def spark_progress(self, line): + """Toggle Spark progress being shown under cells""" + self.init(line) diff --git a/src/jupyter_spark/spark.py b/src/jupyter_spark/spark.py index 38888fc..de2c17e 100644 --- a/src/jupyter_spark/spark.py +++ b/src/jupyter_spark/spark.py @@ -28,11 +28,6 @@ class Spark(LoggingConfigurable): A config object that is able to replace URLs of the Spark frontend on the fly. 
""" - url = Unicode( - 'http://localhost:4040', - help='The URL of Spark API', - ).tag(config=True) - proxy_root = Unicode( '/spark', help='The URL path under which the Spark API will be proxied', @@ -43,9 +38,9 @@ def __init__(self, *args, **kwargs): super(Spark, self).__init__(*args, **kwargs) self.proxy_url = url_path_join(self.base_url, self.proxy_root) - def backend_url(self, request): - request_path = request.uri[len(self.proxy_url):] - return url_path_join(self.url, request_path) + def backend_url(self, url, path): + request_path = path[len(self.proxy_url):] + return url_path_join(url, request_path) def replace(self, content): """ diff --git a/src/jupyter_spark/static/extension.js b/src/jupyter_spark/static/extension.js index 3b721ea..b5baf56 100644 --- a/src/jupyter_spark/static/extension.js +++ b/src/jupyter_spark/static/extension.js @@ -11,6 +11,7 @@ var cache = []; var current_update_frequency; var spark_is_running = false; +var spark_ui_url = null; var cell_queue = []; var current_cell; var cell_jobs_counter = 0; @@ -28,31 +29,36 @@ var update_cache = function(api_url, callbacks) { cbs = $.Callbacks(); cbs.add(callbacks); } - $.getJSON(api_url + '/applications').done(function(applications) { - var num_applications = cache.length; - var num_completed = 0; - // Check if Spark is running before processing applications - if(!applications.hasOwnProperty('error')){ - spark_is_running = true; - applications.forEach(function(application, i) { - $.getJSON(api_url + '/applications/' + application.id + '/jobs').done(function (jobs) { - cache[i] = application; - cache[i].jobs = jobs; - - num_completed++; - if (num_completed === num_applications && cbs) { - cbs.fire(cache); - } - // Update progress bars if jobs have been run and there are cells to be updated - if (jobs.length > jobs_in_cache && cell_queue.length > 0 ) { - $(document).trigger('update.progress.bar'); - } + if (spark_ui_url != null) { + var build_url = function (api_url, path) { + return api_url + path + '?spark_url=' + escape(spark_ui_url) + }; + $.getJSON(build_url(api_url, '/applications')).done(function(applications) { + var num_applications = cache.length; + var num_completed = 0; + // Check if Spark is running before processing applications + if(!applications.hasOwnProperty('error')){ + spark_is_running = true; + applications.forEach(function(application, i) { + $.getJSON(build_url(api_url, '/applications/' + application.id + '/jobs')).done(function (jobs) { + cache[i] = application; + cache[i].jobs = jobs; + + num_completed++; + if (num_completed === num_applications && cbs) { + cbs.fire(cache); + } + // Update progress bars if jobs have been run and there are cells to be updated + if (jobs.length > jobs_in_cache && cell_queue.length > 0 ) { + $(document).trigger('update.progress.bar'); + } + }); }); - }); - } else { - spark_is_running = false; - } - }); + } else { + spark_is_running = false; + } + }); + } }; var update_dialog_contents = function() { @@ -155,6 +161,16 @@ define([ var base_url = utils.get_body_data('baseUrl') || '/'; var api_url = base_url + 'spark/api/v1'; + Jupyter.notebook.events.on('kernel_ready.Kernel', function () { + console.log("jupyter_spark: kernel ready, register comm target"); + Jupyter.notebook.kernel.comm_manager.register_target('spark_comm', function (comm, msg) { + comm.on_msg(function (msg) { + console.log("spark_comm: spark_ui_url = " + msg.content.data.uiWebUrl) + spark_ui_url = msg.content.data.uiWebUrl; + }); + }); + }); + var show_running_jobs = function() { var element = $('
+    Jupyter.notebook.events.on('kernel_ready.Kernel', function () {
+        console.log("jupyter_spark: kernel ready, register comm target");
+        Jupyter.notebook.kernel.comm_manager.register_target('spark_comm', function (comm, msg) {
+            comm.on_msg(function (msg) {
+                console.log("spark_comm: spark_ui_url = " + msg.content.data.uiWebUrl);
+                spark_ui_url = msg.content.data.uiWebUrl;
+            });
+        });
+    });
+
     var show_running_jobs = function() {
         var element = $('<div/>').attr('id', 'dialog_contents');
         dialog.modal({
diff --git a/tests/test_spark.py b/tests/test_spark.py
index 0870e62..c910c52 100644
--- a/tests/test_spark.py
+++ b/tests/test_spark.py
@@ -1,4 +1,9 @@
 # -*- coding: utf-8 -*-
+try:
+    from urllib.parse import quote  # Python 3
+except ImportError:
+    from urllib import quote  # Python 2
+
 import pytest
 import six
 import tornado
@@ -45,14 +50,20 @@ def get_app(self):
             (FakeVerbatimHandler.handler_root, FakeVerbatimHandler),
         ])
 
-    def test_http_fetch_error(self):
+    def test_http_fetch_error_url_missing(self):
         response = self.fetch(self.spark.proxy_root)
         self.assertEqual(response.code, 200)
+        self.assertIn(six.b('SPARK_URL_MISSING'), response.body)
+
+    def test_http_fetch_error_not_running(self):
+        spark_ui_url = "http://localhost:4040"
+        response = self.fetch(self.spark.proxy_root + "?spark_url=%s" % quote(spark_ui_url))
+        self.assertEqual(response.code, 200)
         self.assertIn(six.b('SPARK_NOT_RUNNING'), response.body)
 
     def test_http_fetch_replace_success(self):
-        self.spark.url = self.spark.base_url + FakeReplaceHandler.handler_root
-        response = self.fetch(self.spark.proxy_root)
+        url = self.spark.base_url + FakeReplaceHandler.handler_root
+        response = self.fetch(self.spark.proxy_root + "?spark_url=%s" % quote(url))
         self.assertEqual(response.code, 200)
         self.assertNotEqual(response.body, FakeReplaceHandler.RESPONSE)
         self.assertEqual(response.body, FakeReplaceHandler.REPLACED)
@@ -60,8 +71,8 @@ def test_http_fetch_replace_success(self):
                          FakeReplaceHandler.CONTENT_TYPE)
 
     def test_http_fetch_verbatim_success(self):
-        self.spark.url = self.spark.base_url + FakeVerbatimHandler.handler_root
-        response = self.fetch(self.spark.proxy_root)
+        url = self.spark.base_url + FakeVerbatimHandler.handler_root
+        response = self.fetch(self.spark.proxy_root + "?spark_url=%s" % quote(url))
         self.assertEqual(response.code, 200)
         self.assertEqual(response.body, FakeVerbatimHandler.RESPONSE)
         self.assertEqual(response.headers['Content-Type'],
@@ -70,10 +81,10 @@ def test_http_fetch_verbatim_success(self):
 
     def test_spark_backend_url(self):
         class FakeRequest(object):
             # http://localhost:8888/spark/api
-            uri = self.spark.base_url + self.spark.proxy_root + '/api'
+            path = self.spark.proxy_url + '/api'
         fake_request = FakeRequest()
-        self.assertEqual(self.spark.backend_url(fake_request),
-                         self.spark.url + '/api')
+        self.assertEqual(self.spark.backend_url("http://localhost:4040", fake_request.path),
+                         "http://localhost:4040/api")
 
 @pytest.mark.parametrize('content', [