This repository was archived by the owner on Feb 3, 2021. It is now read-only.

Commit 9eccec9

Feature: Add extra spark-submit options
The following `spark-submit` options are really useful for tweaking the Spark configuration/environment on a per-application or per-job basis:

- `--packages`
- `--exclude-packages`
- `--repositories`
- `--conf`
- `--properties-file`

The code changes that enable them simply add arguments to the existing API/CLI and pass them straight through to the spark-submit command generator. To stay consistent with the `spark-submit` CLI, the corresponding AZTK arguments keep the same names.
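For context, here is a minimal sketch of how the new options surface in the Python API once this commit is applied. Only the keyword arguments come from this diff; the `ApplicationConfiguration` class name, the `aztk.spark.models` module path, and the sample coordinates, values, and paths are assumptions for illustration.

import aztk.spark
from aztk_cli import config

# Sketch only: the keyword arguments below are the ones added in this
# commit; the class/module names and sample values are assumed.
app = aztk.spark.models.ApplicationConfiguration(
    name="wordcount",
    application="wordcount.py",
    packages=["com.databricks:spark-avro_2.11:4.0.0"],
    exclude_packages=["org.slf4j:slf4j-log4j12"],
    repositories=["https://repo.example.com/maven2"],
    conf={"spark.executor.memory": "4g", "spark.sql.shuffle.partitions": "200"},
    properties_file="/path/to/spark-defaults.conf",
)

spark_client = aztk.spark.Client(config.load_aztk_secrets())
# The application object would then be handed to spark_client.cluster.submit;
# the exact submit signature is not shown in this diff.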
1 parent: 72c7833 · commit: 9eccec9

6 files changed: 123 additions, 0 deletions


aztk/node_scripts/scheduling/submit.py

Lines changed: 8 additions & 0 deletions
@@ -33,8 +33,16 @@ def __app_submit_cmd(application):
     spark_submit_cmd.add_option("--name", application.name)
     spark_submit_cmd.add_option("--class", application.main_class)
     spark_submit_cmd.add_option("--jars", jars and ",".join(jars))
+    spark_submit_cmd.add_option("--packages", application.packages and '"{}"'.format(",".join(application.packages)))
+    spark_submit_cmd.add_option("--exclude-packages", application.exclude_packages
+                                and '"{}"'.format(",".join(application.exclude_packages)))
+    spark_submit_cmd.add_option("--repositories", application.repositories
+                                and '"{}"'.format(",".join(application.repositories)))
     spark_submit_cmd.add_option("--py-files", py_files and ",".join(py_files))
     spark_submit_cmd.add_option("--files", files and ",".join(files))
+    for key, val in application.conf.items():
+        spark_submit_cmd.add_option("--conf", '"{key}={val}"'.format(key=key, val=val))
+    spark_submit_cmd.add_option("--properties-file", application.properties_file)
     spark_submit_cmd.add_option("--driver-java-options", application.driver_java_options)
     spark_submit_cmd.add_option("--driver-library-path", application.driver_library_path)
     spark_submit_cmd.add_option("--driver-class-path", application.driver_class_path)
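The `application.packages and '"{}"'.format(...)` pattern relies on `add_option` dropping options whose value is falsy, so an empty list or an unset properties file simply produces no flag in the generated command. A minimal standalone sketch of that assumed contract (the real command builder lives elsewhere in aztk and is not part of this diff):

# Standalone sketch, assuming add_option silently skips falsy values.
class SparkSubmitCmd:
    def __init__(self, executable):
        self.parts = [executable]

    def add_option(self, flag, value):
        if value:  # None, "" and empty collections add nothing
            self.parts.extend([flag, str(value)])

    def to_str(self):
        return " ".join(self.parts)

cmd = SparkSubmitCmd("spark-submit")
cmd.add_option("--packages", '"com.databricks:spark-avro_2.11:4.0.0"')
cmd.add_option("--properties-file", None)  # no flag emitted
print(cmd.to_str())  # spark-submit --packages "com.databricks:spark-avro_2.11:4.0.0"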

aztk/spark/models/models.py

Lines changed: 10 additions & 0 deletions
@@ -125,8 +125,13 @@ def __init__(
         application_args=None,
         main_class=None,
         jars=None,
+        packages=None,
+        exclude_packages=None,
+        repositories=None,
         py_files=None,
         files=None,
+        conf=None,
+        properties_file=None,
         driver_java_options=None,
         driver_library_path=None,
         driver_class_path=None,
@@ -141,8 +146,13 @@ def __init__(
         self.application_args = application_args
         self.main_class = main_class
         self.jars = jars or []
+        self.packages = packages or []
+        self.exclude_packages = exclude_packages or []
+        self.repositories = repositories or []
         self.py_files = py_files or []
         self.files = files or []
+        self.conf = conf or {}
+        self.properties_file = properties_file
         self.driver_java_options = driver_java_options
         self.driver_library_path = driver_library_path
         self.driver_class_path = driver_class_path
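Because each new field defaults to an empty container (`packages or []`, `conf or {}`), the command generator above can iterate over them unconditionally even when the caller passes nothing. A tiny standalone illustration of the idiom (not the aztk class itself):

# Standalone illustration: None (the argparse/YAML default) still yields
# safe, iterable empty containers.
def normalize(packages=None, conf=None):
    return packages or [], conf or {}

packages, conf = normalize()
assert packages == [] and conf == {}
for key, val in conf.items():  # safe even when no --conf was supplied
    print(key, val)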

aztk_cli/config.py

Lines changed: 5 additions & 0 deletions
@@ -213,8 +213,13 @@ def _merge_dict(self, config):
             application_args=application.get("application_args"),
             main_class=application.get("main_class"),
             jars=application.get("jars"),
+            packages=application.get("packages"),
+            exclude_packages=application.get("exclude_packages"),
+            repositories=application.get("repositories"),
             py_files=application.get("py_files"),
             files=application.get("files"),
+            conf=application.get("conf"),
+            properties_file=application.get("properties_file"),
             driver_java_options=application.get("driver_java_options"),
             driver_library_path=application.get("driver_library_path"),
             driver_class_path=application.get("driver_class_path"),

aztk_cli/config/job.yaml

Lines changed: 20 additions & 0 deletions
@@ -40,10 +40,20 @@ job:
      main_class:
      jars:
        -
+      packages:
+        -
+      exclude_packages:
+        -
+      repositories:
+        -
      py_files:
        -
      files:
        -
+      conf: # extra spark config options
+        # key1: value1
+        # key2: value2
+      properties_file:
      driver_java_options:
      driver_library_path:
      driver_class_path:
@@ -59,10 +69,20 @@ job:
      main_class:
      jars:
        -
+      packages:
+        -
+      exclude_packages:
+        -
+      repositories:
+        -
      py_files:
        -
      files:
        -
+      conf: # extra spark config options
+        # key1: value1
+        # key2: value2
+      properties_file:
      driver_java_options:
      driver_library_path:
      driver_class_path:

aztk_cli/spark/endpoints/cluster/cluster_submit.py

Lines changed: 70 additions & 0 deletions
@@ -7,6 +7,15 @@
 from aztk_cli import config, log, utils
 
 
+class AppendToDict(argparse.Action):
+    def __call__(self, parser, namespace, values, option_string=None):
+        key_vals = getattr(namespace, self.dest) or {}
+        for key_val_str in values.replace(" ", "").split(","):
+            key, val = key_val_str.split("=")
+            key_vals[key] = val
+        setattr(namespace, self.dest, key_vals)
+
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument("--id", dest="cluster_id", required=True, help="The unique id of your spark cluster")
 
@@ -25,6 +34,32 @@ def setup_parser(parser: argparse.ArgumentParser):
                         absolute path to reference files.",
     )
 
+    parser.add_argument(
+        "--packages",
+        help="Comma-separated list of maven coordinates of \
+              jars to include on the driver and executor \
+              classpaths. Will search the local maven repo, \
+              then maven central and any additional remote \
+              repositories given by --repositories. The \
+              format for the coordinates should be \
+              groupId:artifactId:version.",
+    )
+
+    parser.add_argument(
+        "--exclude-packages",
+        help="Comma-separated list of groupId:artifactId, to \
+              exclude while resolving the dependencies \
+              provided in --packages to avoid dependency \
+              conflicts.",
+    )
+
+    parser.add_argument(
+        "--repositories",
+        help="Comma-separated list of additional remote \
+              repositories to search for the maven \
+              coordinates given with --packages.",
+    )
+
     parser.add_argument(
         "--py-files",
         help="Comma-separated list of .zip, .egg, or .py files \
@@ -39,6 +74,24 @@ def setup_parser(parser: argparse.ArgumentParser):
                         absolute path ot reference files.",
     )
 
+    parser.add_argument(
+        "--conf",
+        action=AppendToDict,
+        metavar='"PROP1=VAL1[,PROP2=VAL2...]"',
+        help='Arbitrary Spark configuration property(/-ies). \
+              Multiple --conf options can be added, either \
+              by using multiple --conf flags or by supplying \
+              a comma-separated list. All "PROP=VAL,..." \
+              arguments should be wrapped in double quotes.',
+    )
+
+    parser.add_argument(
+        "--properties-file",
+        help="Path to a file from which to load extra \
+              properties. If not specified, this will look \
+              for conf/spark-defaults.conf.",
+    )
+
     parser.add_argument("--driver-java-options", help="Extra Java options to pass to the driver.")
 
     parser.add_argument("--driver-library-path", help="Extra library path entries to pass to the driver.")
@@ -105,6 +158,9 @@ def execute(args: typing.NamedTuple):
 
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
     jars = []
+    packages = []
+    exclude_packages = []
+    repositories = []
     py_files = []
     files = []
 
@@ -117,6 +173,15 @@ def execute(args: typing.NamedTuple):
     if args.files is not None:
         files = args.files.replace(" ", "").split(",")
 
+    if args.packages is not None:
+        packages = args.packages.replace(" ", "").split(",")
+
+    if args.exclude_packages is not None:
+        exclude_packages = args.exclude_packages.replace(" ", "").split(",")
+
+    if args.repositories is not None:
+        repositories = args.repositories.replace(" ", "").split(",")
+
     log_application(args, jars, py_files, files)
 
     spark_client.cluster.submit(
@@ -127,8 +192,13 @@
         application_args=args.app_args,
         main_class=args.main_class,
         jars=jars,
+        packages=packages,
+        exclude_packages=exclude_packages,
+        repositories=repositories,
         py_files=py_files,
         files=files,
+        conf=args.conf,
+        properties_file=args.properties_file,
         driver_java_options=args.driver_java_options,
         driver_library_path=args.driver_library_path,
         driver_class_path=args.driver_class_path,
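The `AppendToDict` action merges every `--conf` occurrence into a single dict, whether the properties arrive as repeated flags or as one comma-separated list. A small standalone check of that behaviour (the sample property names and values are illustrative only):

import argparse

class AppendToDict(argparse.Action):
    # Same action as above: fold each "--conf k=v[,k2=v2]" into one dict.
    def __call__(self, parser, namespace, values, option_string=None):
        key_vals = getattr(namespace, self.dest) or {}
        for key_val_str in values.replace(" ", "").split(","):
            key, val = key_val_str.split("=")
            key_vals[key] = val
        setattr(namespace, self.dest, key_vals)

parser = argparse.ArgumentParser()
parser.add_argument("--conf", action=AppendToDict)
args = parser.parse_args([
    "--conf", "spark.executor.memory=4g, spark.executor.cores=2",
    "--conf", "spark.sql.shuffle.partitions=200",
])
print(args.conf)
# {'spark.executor.memory': '4g', 'spark.executor.cores': '2',
#  'spark.sql.shuffle.partitions': '200'}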

docs/70-jobs.md

Lines changed: 10 additions & 0 deletions
@@ -20,10 +20,20 @@ Each Job has one or more applications given as a List in Job.yaml. Applications
      main_class:
      jars:
        -
+      packages:
+        -
+      exclude_packages:
+        -
+      repositories:
+        -
      py_files:
        -
      files:
        -
+      conf:
+        # key1: value1
+        # key2: value2
+      properties_file:
      driver_java_options:
        -
      driver_library_path:
