6 changes: 6 additions & 0 deletions Makefile
@@ -4,12 +4,18 @@ install:
	jupyter serverextension enable --py jupyter_spark
	jupyter nbextension install --py jupyter_spark
	jupyter nbextension enable --py jupyter_spark
	@echo ""
	@echo "NOTE: Copy ./src/magic/spark_progress.py into the startup folder, e.g. ~/.ipython/profile_default/startup/"
	@echo ""

uninstall:
	jupyter serverextension disable --py jupyter_spark
	jupyter nbextension disable --py jupyter_spark
	jupyter nbextension uninstall --py jupyter_spark
	pip uninstall -y jupyter-spark
	@echo ""
	@echo "NOTE: Remove spark_progress.py from the startup folder, e.g. ~/.ipython/profile_default/startup/"
	@echo ""

clean: uninstall
	rm -rf dist/
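
The `spark_progress.py` startup script referenced by these notes is not part of this diff; a hypothetical sketch of what such a file could contain (dropped into `~/.ipython/profile_default/startup/`) is:

```python
# Hypothetical startup script: load the jupyter_spark magics whenever an
# IPython kernel starts. IPython executes files in the startup folder in
# the user namespace, where get_ipython() is available.
try:
    get_ipython().run_line_magic("load_ext", "jupyter_spark")
except Exception:
    # Not running under IPython, or jupyter_spark/pyspark is not installed.
    pass
```
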
26 changes: 23 additions & 3 deletions README.md
@@ -77,13 +77,33 @@ pip uninstall jupyter-spark

## Configuration

To change the URL of the Spark API that the job metadata is fetched from
override the `Spark.url` config value, e.g. on the command line:
The Spark API that the job metadata is fetched from can differ for each SparkContext. By default, the first SparkContext uses port 4040, the second 4041, and so on. If, however, `spark.ui.port` is set to 0 in the SparkConf, Spark chooses a random ephemeral port for the API.
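
As a sketch (assuming `pyspark` is installed and no context is running yet), letting Spark pick a random UI port looks like this; the application name is illustrative:

```python
from pyspark import SparkConf, SparkContext

# Ask Spark to choose a random ephemeral port for its UI/API.
conf = SparkConf().set("spark.ui.port", "0")
sc = SparkContext(appName="random-ui-port-demo", conf=conf)

# The URL Spark actually chose (Spark 2+); this is what the
# %spark_progress magic discovers and reports to the frontend.
print(sc.uiWebUrl)
```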

To support this behaviour (and to allow more than one Jupyter tab with its own SparkContext), load the jupyter_spark extension:

```python
%load_ext jupyter_spark
```
jupyter notebook --Spark.url="http://localhost:4040"

For Spark 2, use the provided magic without a parameter:

```python
%spark_progress
```
For Spark 1, provide the Spark API URL (e.g. `http://localhost:4040`):

```python
%spark_progress http://<spark-api-server>:<port>
```

To turn it off again, use:

```python
%spark_progress None
```

Note that these commands are line magics and each needs its own cell.

## Example

There is a simple `pyspark` example included in `examples` to confirm that your
42 changes: 33 additions & 9 deletions examples/Jupyter Spark example.ipynb
@@ -51,16 +51,40 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"`partitions` is the number of spark workers to partition the work into."
"Load the jupyter spark extension and turn on the Spark progress monitor"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Spark progress monitoring turned on\n"
]
}
],
"source": [
"%load_ext jupyter_spark"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`partitions` is the number of spark workers to partition the work into."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"partitions = 2"
"partitions = 5"
]
},
{
@@ -72,7 +96,7 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
@@ -88,7 +112,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
@@ -107,7 +131,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
@@ -119,14 +143,14 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pi is roughly 3.141880\n"
"Pi is roughly 3.141669\n"
]
}
],
@@ -143,7 +167,7 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -153,7 +177,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3.6",
"language": "python",
"name": "python3"
},
9 changes: 9 additions & 0 deletions src/jupyter_spark/__init__.py
@@ -40,3 +40,12 @@ def load_jupyter_server_extension(nbapp):  # pragma: no cover
        [(spark.proxy_url + '.*', SparkHandler, {'spark': spark})]
    )
    nbapp.log.info("Jupyter-Spark enabled!")


def load_ipython_extension(ip):
    from .magic import SparkProgress

    ip.register_magics(SparkProgress)

    # Immediately start Spark Progress
    SparkProgress().init()
13 changes: 10 additions & 3 deletions src/jupyter_spark/handlers.py
@@ -18,9 +18,16 @@ def get(self):
        the verbatim response.
        """
        http = httpclient.AsyncHTTPClient()
        url = self.spark.backend_url(self.request)
        self.spark.log.debug('Fetching from Spark %s', url)
        http.fetch(url, self.handle_response)
        spark_url = self.get_argument("spark_url", None)
        if spark_url is not None:
            url = self.spark.backend_url(spark_url, self.request.path)
            http.fetch(url, self.handle_response)
        else:
            content_type = 'application/json'
            content = json.dumps({'error': 'SPARK_URL_MISSING'})
            self.set_header('Content-Type', content_type)
            self.write(content)
            self.finish()

    def handle_response(self, response):
        if response.error:
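
For illustration, a sketch of how the proxy endpoint behaves with and without the new `spark_url` argument (the notebook-server and Spark URLs are assumptions; a real server may also require an auth token):

```python
import requests

base = "http://localhost:8888/spark/api/v1"  # hypothetical notebook server
spark_url = "http://localhost:4040"          # Spark UI reported by the magic

# With spark_url, the handler proxies to <spark_url>/api/v1/applications.
resp = requests.get(base + "/applications", params={"spark_url": spark_url})
print(resp.json())

# Without it, the handler now answers with an explicit error payload.
missing = requests.get(base + "/applications")
assert missing.json() == {"error": "SPARK_URL_MISSING"}
```
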
37 changes: 37 additions & 0 deletions src/jupyter_spark/magic.py
@@ -0,0 +1,37 @@
from __future__ import print_function

from ipykernel.comm import Comm
from IPython.core.magic import Magics, line_magic, magics_class


@magics_class
class SparkProgress(Magics):

    def init(self, line=""):
        """Start or stop Spark progress monitoring if possible"""
        line = line.strip()
        comm = Comm(target_name='spark_comm')
        if line.startswith("http"):
            url = line
        elif line == "None":
            # Explicit opt-out, as documented in the README: tell the
            # frontend to stop polling.
            url = None
        else:
            from pyspark import SparkContext
            # Using an internal API to avoid asking the user for the SparkContext variable
            # TODO: Try to find a way without using an internal API
            sc = SparkContext._active_spark_context
            if sc is not None:
                url = sc.uiWebUrl
            else:
                url = None

        comm.send({'uiWebUrl': url})

        if url is not None:
            print("Spark progress monitoring turned on")
        elif line == "None":
            print("Spark progress monitoring turned off")
        else:
            print("No Spark Context found")

        comm.close()

    @line_magic
    def spark_progress(self, line):
        """Toggle Spark progress being shown under cells"""
        self.init(line)
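
An aside on the TODO above (an observation, not part of this change): the public `SparkContext.getOrCreate()` is not a drop-in replacement, because it would start a context (and a UI) when none is running, while the magic only wants to detect an existing one:

```python
from pyspark import SparkContext

# Side-effect-free check, as used by the magic: None if nothing is active.
sc = SparkContext._active_spark_context

# The public alternative would *create* a context as a side effect,
# which is exactly what the magic needs to avoid:
#     sc = SparkContext.getOrCreate()

print(sc.uiWebUrl if sc is not None else "No Spark Context found")
```
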
11 changes: 3 additions & 8 deletions src/jupyter_spark/spark.py
@@ -28,11 +28,6 @@ class Spark(LoggingConfigurable):
    A config object that is able to replace URLs of the Spark frontend
    on the fly.
    """
    url = Unicode(
        'http://localhost:4040',
        help='The URL of Spark API',
    ).tag(config=True)

    proxy_root = Unicode(
        '/spark',
        help='The URL path under which the Spark API will be proxied',
@@ -43,9 +38,9 @@ def __init__(self, *args, **kwargs):
        super(Spark, self).__init__(*args, **kwargs)
        self.proxy_url = url_path_join(self.base_url, self.proxy_root)

    def backend_url(self, request):
        request_path = request.uri[len(self.proxy_url):]
        return url_path_join(self.url, request_path)
    def backend_url(self, url, path):
        request_path = path[len(self.proxy_url):]
        return url_path_join(url, request_path)

    def replace(self, content):
        """
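
A quick sketch of the new `backend_url` contract (standalone, with illustrative values, using the same `url_path_join` helper as `spark.py`; the real method lives on the `Spark` class):

```python
from notebook.utils import url_path_join

proxy_url = '/spark'  # base_url '/' joined with the default proxy_root

def backend_url(url, path):
    # Same logic as Spark.backend_url: drop the proxy prefix from the
    # request path, then graft the remainder onto the per-context Spark URL.
    return url_path_join(url, path[len(proxy_url):])

assert (backend_url('http://localhost:4040', '/spark/api/v1/applications')
        == 'http://localhost:4040/api/v1/applications')
```
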
64 changes: 40 additions & 24 deletions src/jupyter_spark/static/extension.js
@@ -11,6 +11,7 @@ var cache = [];
var current_update_frequency;

var spark_is_running = false;
var spark_ui_url = null;
var cell_queue = [];
var current_cell;
var cell_jobs_counter = 0;
@@ -28,31 +29,36 @@ var update_cache = function(api_url, callbacks) {
cbs = $.Callbacks();
cbs.add(callbacks);
}
$.getJSON(api_url + '/applications').done(function(applications) {
var num_applications = cache.length;
var num_completed = 0;
// Check if Spark is running before processing applications
if(!applications.hasOwnProperty('error')){
spark_is_running = true;
applications.forEach(function(application, i) {
$.getJSON(api_url + '/applications/' + application.id + '/jobs').done(function (jobs) {
cache[i] = application;
cache[i].jobs = jobs;

num_completed++;
if (num_completed === num_applications && cbs) {
cbs.fire(cache);
}
// Update progress bars if jobs have been run and there are cells to be updated
if (jobs.length > jobs_in_cache && cell_queue.length > 0 ) {
$(document).trigger('update.progress.bar');
}
if (spark_ui_url != null) {
var build_url = function (api_url, path) {
return api_url + path + '?spark_url=' + encodeURIComponent(spark_ui_url);
};
$.getJSON(build_url(api_url, '/applications')).done(function(applications) {
var num_applications = cache.length;
var num_completed = 0;
// Check if Spark is running before processing applications
if(!applications.hasOwnProperty('error')){
spark_is_running = true;
applications.forEach(function(application, i) {
$.getJSON(build_url(api_url, '/applications/' + application.id + '/jobs')).done(function (jobs) {
cache[i] = application;
cache[i].jobs = jobs;

num_completed++;
if (num_completed === num_applications && cbs) {
cbs.fire(cache);
}
// Update progress bars if jobs have been run and there are cells to be updated
if (jobs.length > jobs_in_cache && cell_queue.length > 0 ) {
$(document).trigger('update.progress.bar');
}
});
});
});
} else {
spark_is_running = false;
}
});
} else {
spark_is_running = false;
}
});
}
};

var update_dialog_contents = function() {
@@ -155,6 +161,16 @@ define([
var base_url = utils.get_body_data('baseUrl') || '/';
var api_url = base_url + 'spark/api/v1';

Jupyter.notebook.events.on('kernel_ready.Kernel', function () {
console.log("jupyter_spark: kernel ready, register comm target");
Jupyter.notebook.kernel.comm_manager.register_target('spark_comm', function (comm, msg) {
comm.on_msg(function (msg) {
console.log("spark_comm: spark_ui_url = " + msg.content.data.uiWebUrl)
spark_ui_url = msg.content.data.uiWebUrl;
});
});
});

var show_running_jobs = function() {
var element = $('<div/>').attr('id', 'dialog_contents');
dialog.modal({
Expand Down