-
Notifications
You must be signed in to change notification settings - Fork 164
feat: Add WASM-based python processor #621
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
5c82ab2 to
38217e5
Compare
|
this is going to be really cool! lmk if you need help |
38217e5 to
b0dd215
Compare
b0dd215 to
f212bc8
Compare
|
Thanks @aronchick! If you have time (and are feeling extra generous), I'd appreciate if you could run through the I haven't dug too deep into the limitations of the sandboxed runtime and how it responds to accessing non-permissive resources (e.g., filesystem or OS). In addition, we'd probably need to soak test the processor before making this GA in Bento since I'm unsure of performance/behavior over longer periods and under differing load profiles. |
gregfurman
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jem-davies I've added a quick self-review. Let me know if you think we should be adding more documentation with what's been mentioned here.
| namespace = {"this": input_data, "root": None, "__builtins__": __builtins__, **injected_libs} | ||
| exec(compiled_code, namespace) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this namespace and exec combintation allows us to dynamically run our compiled execution script, constraining what is accessible during runtime and ensuring no global state is ever mutated.
Also, see how we pass this and root -- leveraging the same pattern we use with bloblang scripts/mappings.
| name = lib_name.strip() | ||
| if name: | ||
| try: | ||
| injected[name] = importlib.import_module(name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some python magic that allows us to dynamically extract which libraries we're making permissable to the sandboxed execution.
|
|
||
| if command -v sha256sum >/dev/null; then | ||
| (cd "$WASM_RUNTIME_DIR/runtime" && sha256sum --check --status python-3.12.0.wasm.sha256sum) | ||
| elif command -v shasum >/dev/null; then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fallback to shasum since sha256sum is not available on darwin
| wget -q "$WASM_BINARY_URL" -O "$WASM_PATH" | ||
|
|
||
| if command -v sha256sum >/dev/null; then | ||
| (cd "$WASM_RUNTIME_DIR/runtime" && sha256sum --check --status python-3.12.0.wasm.sha256sum) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're downloading and embedding a WASM executable into bento, we need to verify the integrity of the executable we're intending to use. So we download and commit the SHA256 checksum, then verify the downloaded file's checksum.
| instancePool sync.Pool | ||
| } | ||
|
|
||
| func NewWasmModulePool[T api.Module](ctx context.Context, ctor constructor[T]) (*WasmModulePool[T], error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately WASM modules (at least with wazero) are not safe for concurrent executions. Instead, we create a module pool (backed by a sync.Pool) and allow the GC to dynamically spin up instances depending on load.
| } | ||
|
|
||
| func (p *WasmModulePool[T]) Put(instance T) { | ||
| _ = runtime.AddCleanup(&instance, func(inst T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the GC cleans up the instance, which is possible given sync.Pool, we need to ensure the resources are actually freed up. Hence, this gross runtime.AddCleanup hook that triggers when the module is GC'd
| p.instancePool.Put(instance) | ||
| } | ||
|
|
||
| func (p *WasmModulePool[T]) Close(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While it's probably easier to just close the entire runtime to ensure all associated modules (and resources) are cleaned up, this Close will retrieve all objects from the pool and close them.
While thinking of this as a Reset or Cleanup conceptually more correct, Close feels more idiomatic 🤷
| }() | ||
|
|
||
| handshakeErr := make(chan error, 1) | ||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do a quick-and-dirty handshake to ensure the python env is up-and-running. The python module should write a "READY" signal to the output socket which we need to check is received before proceeding.
| return nil, pi.stderrBuf.Bytes(), err | ||
| } | ||
|
|
||
| header := make([]byte, 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First byte signals that the response was a success or exception. Last four bytes tell us how long the response is.
Testing InstructionsI've tested this locally and it works great! Here's how others can try it: Quick Start# 1. Checkout the PR
gh pr checkout 621
# 2. Download the Python WASM runtime (~50MB) and test configs
bash internal/impl/python/scripts/install.sh
gh gist clone https://gist.github.com/aronchick/5d9391b03f9266b994c7a36043d743a4 python-examples
# 3. Run an example (first build takes ~30-60s)
go run -tags "x_wasm" ./cmd/bento/main.go -c python-examples/hello.yamlExpected output: Example ConfigsFull test configs available in the gist: <GIST_URL>
Quick SnippetsArray processing: - python:
script: |
root = {"sum": sum(this), "doubled": [x * 2 for x in this]}With imports: - python:
imports: [math, json, re]
script: |
root = {"ceil": math.ceil(this["value"])}Filtering (set root=None to drop): - python:
script: |
root = this if this.get("status") == "active" else NoneRun Unit Testsgo test -tags "x_wasm" -v ./internal/impl/python/...All 18 tests pass (~18 seconds). Key Points
|
Motivation
Includes a new
pythonprocessor that runs within a sandboxed WASM environment.Changes
pythonprocessor that runs within a sandboxed WASM environment.WasmModulePoolininternal/impl/wasm/wasmpool/pool.gothat uses a Golangsync.Poolto execute multiple concurrent guests and have their lifecycles managed by the GC. This allows us to spin up more instances as demand increases and have the Go GC handle the guest lifecycle.entrypoint.pythat allows python executionscriptto compile and execute --stdoutandstdinis used to pass data between the guest and host.x_wasmtag is used, the WASM executable and correspondingentrypoint.pyare embedded at compile time. Using the processor without this tag panics on init.scripts/install.shthat downloads thepython-3.12.0.wasmand checks it against the SHA inruntime/python-3.12.0.wasm.sha256sum.Installation/Usage
Run the following commands to create a config, install the python runtime (and check it against the SHA), and execute a pipeline.
The output of the pipeline should be
42.TODO
x_wasmtag -- allowing the.wasmto be embedded at compile time and used in the processor.