nodejobs runs and tracks jobs without a daemon: featherweight CLI job management.

Introduction

The nodejobs repository runs, tracks, and logs the output of external commands without requiring a persistent daemon.

As for the code, it's clean and extensible, at least it has been for us. Core components such as BaseData, JobRecord, and JobFilter define and validate the schema for each job record, preventing key-mismatch errors and keeping data consistent. Status updates occur whenever you invoke a job command, so there's no background service to manage. Common use cases include automated install scripts, deployment tasks, and data-transfer operations. Designed for minimalism and extensibility, nodejobs can function as a standalone utility or as the foundation for a bespoke job-management solution. If you are looking for a small job runner to build on top of, this might be a good fit. It's large enough to have structure and safety, but small enough that you can choose what you want to add.
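
For example, filters reference fields declared on the schema class instead of hand-typed dictionary keys, which is what catches key mismatches early. A minimal sketch (the same pattern appears in the list()/filter example later in this README):

from nodejobs import Jobs, JobRecord

jobs_manager = Jobs(db_path="./job_db")

# JobRecord.status is a declared schema field; a typo such as "staus" would
# fail at attribute lookup instead of silently matching nothing.
filter_criteria = {JobRecord.status: JobRecord.Status.c_running}
running_jobs = jobs_manager.list(filter=filter_criteria)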

versions

  • 0.1.0 - Initial release
  • 0.2.0 - Higher-quality linting and pip package deployment
  • 0.3.0 - Improved stop behaviour with process trees; safer recursive process cleanup
  • 0.4.0 - (in progress) streaming support and testing (not on pip yet, in evaluation)

Install

pip install nodejobs
# or
python -m pip install nodejobs

Use

import sys
from nodejobs import Jobs, JobRecord

# Create a Jobs manager with a specified database path
jobs_manager = Jobs(db_path="/path/to/job_db/dir")

# Start your job; its status is returned as job_record
# Tip: use unbuffered Python (-u) for real-time streaming
job_record = jobs_manager.run(command=[sys.executable, "-u", "script.py"], job_id="job_001")

# Stream live output (optional). Yields stdout/stderr/status/heartbeat events
for ev in jobs_manager.bind(job_id="job_001", include=("stdout","stderr"), from_beginning=True, heartbeat_interval=5.0):
    if ev.type == "stdout":
        print(ev.text, end="")
    elif ev.type == "stderr":
        print(ev.text, end="")
    elif ev.type == "status" and ev.status in (
        JobRecord.Status.c_finished,
        JobRecord.Status.c_stopped,
        JobRecord.Status.c_failed,
        JobRecord.Status.c_failed_start,
    ):
        break

# Pull and verify job status
job_record: JobRecord = jobs_manager.get_status(job_id="job_001")
assert job_record.status == JobRecord.Status.c_finished

# Retrieve the job's stdout and stderr logs
stdout, stderr = jobs_manager.job_logs(job_id="job_001")

# Stop the job
jobs_manager.stop(job_id="job_001")

Live Streaming (Bind + SSE)

You can stream live job output as it is produced.

  • Local generator (no web server needed): Jobs.bind(job_id, ...) yields events (stdout, stderr, status, heartbeat) with monotonic seq and ISO timestamps.
  • HTTP streaming (SSE): Jobs.sse(job_id, ...) yields correctly framed Server-Sent Events. Works with browsers (EventSource) and Python (sseclient-py).

Quick start (local bind):

import sys
from nodejobs import Jobs

jobs = Jobs(db_path="./job_db")
job_id = "demo1"

# Start a job that prints numbers unbuffered
jobs.run([sys.executable, "-u", "-c", "import time,sys; [print(i,flush=True) or time.sleep(0.5) for i in range(1,50)]"], job_id)

for ev in jobs.bind(job_id, include=("stdout","stderr"), from_beginning=True, poll_interval=0.1, heartbeat_interval=5.0):
    if ev.type == "stdout":
        print(">", ev.text, end="")
    elif ev.type == "stderr":
        print("!>", ev.text, end="")
    elif ev.type == "status":
        print("[status]", ev.status)

HTTP (SSE) example (FastAPI):

from fastapi import FastAPI
from starlette.responses import StreamingResponse
from nodejobs import Jobs

app = FastAPI()
jobs = Jobs(db_path="./job_db")

@app.get("/jobs/{job_id}/stream")
def stream(job_id: str):
    headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        jobs.sse(job_id, include=("stdout","stderr"), heartbeat_interval=5.0),
        media_type="text/event-stream",
        headers=headers,
    )

Browser:

<script>
const es = new EventSource("/jobs/demo1/stream");
es.addEventListener("stdout", e => {
  const payload = JSON.parse(e.data);
  console.log("STDOUT:", payload.text);
});
es.addEventListener("stderr", e => { /* ... */ });
es.addEventListener("status", e => { /* ... */ });
</script>

Python client (optional dependency):

from nodejobs import Jobs
try:
    for event in Jobs.subscribe_sse("http://127.0.0.1:8000/jobs/demo1/stream"):
        print(event.event, event.id, event.data)
except ImportError as e:
    # sseclient-py is required for event streaming; without it, ImportError is raised
    print(e)

Event types:

  • stdout, stderr: streamed chunks of output
  • status: job state transitions
  • heartbeat: periodic keep-alive (default every 5s)
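
The examples above ignore heartbeat events; a minimal consumer that uses them as a liveness check might look like this (a sketch, relying on the seq field documented above):

from nodejobs import Jobs

jobs = Jobs(db_path="./job_db")

for ev in jobs.bind("demo1", include=("stdout",), heartbeat_interval=5.0):
    if ev.type == "heartbeat":
        # No output during the last interval: the stream is alive, the job is just quiet.
        print("[alive]", ev.seq)
    elif ev.type == "stdout":
        print(ev.text, end="")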

Resume: The stream emits id: <seq>. Browsers automatically send Last-Event-ID on reconnect. Servers should pass that through to jobs.sse(..., last_event_id=...) so streaming resumes at the next id.
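
A sketch of that pass-through, extending the FastAPI route above (the last_event_id keyword comes from the description above; Header is standard FastAPI and maps the parameter to the Last-Event-ID header):

from typing import Optional

from fastapi import FastAPI, Header
from starlette.responses import StreamingResponse
from nodejobs import Jobs

app = FastAPI()
jobs = Jobs(db_path="./job_db")

@app.get("/jobs/{job_id}/stream")
def stream(job_id: str, last_event_id: Optional[str] = Header(default=None)):
    # Browsers resend the last seen id automatically on reconnect; passing it
    # through resumes the stream at the next seq instead of replaying from zero.
    return StreamingResponse(
        jobs.sse(job_id, include=("stdout", "stderr"), last_event_id=last_event_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )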

Troubleshooting Streaming & Buffering

If you see output delivered in large clumps “at the end”, the job process is likely buffering:

  • Prefer unbuffered modes: Python -u + print(..., flush=True), grep --line-buffered, etc. (a sketch follows this list).
  • POSIX (optional): running under a PTY often forces line-buffering (not available on Windows).
  • HTTP proxies: ensure Content-Type: text/event-stream, Cache-Control: no-cache, and disable proxy buffering (e.g., X-Accel-Buffering: no for Nginx).
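
For the first bullet, a POSIX-only sketch (stdbuf is a coreutils tool, not part of nodejobs; the file name and job IDs are illustrative):

import sys
from nodejobs import Jobs

jobs = Jobs(db_path="./job_db")

# stdbuf -oL forces line-buffered stdout for programs that block-buffer
# when their output is not a TTY.
jobs.run(["stdbuf", "-oL", "grep", "--line-buffered", "ERROR", "app.log"], "grep_demo")

# Python scripts are simpler: -u makes stdout and stderr unbuffered.
jobs.run([sys.executable, "-u", "my_script.py"], "py_demo")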

If your test prints do not appear live:

  • Run the test runner in unbuffered mode: python -u -m unittest -v
  • Add flush=True to your print() calls: print("x", end="-", flush=True)

Motivation

It felt silly to write yet another job runner, but I always needed something more than subprocess and far less complex than a full task-management solution. Importantly, I write code that runs on edge devices, so working toward Raspberry Pi and MicroPython support matters to me as well. Overall, if I need a few setup stages to run, or a script to kick off instructions, I just import and run a nodejob. It's called "nodejobs" because it is an internal tool on a Decelium Node, a server we use internally.

These are the motivations:

  1. Light and Lazy by Design: Most job runners require a daemon running outside Python, and running yet another process raises the complexity of your application. nodejobs does not; whenever you start or stop a nodejob command, it updates the status of all jobs alongside. Out of the box, with no runtime dependencies, nodejobs can manage and investigate long-running processes.
  2. Typical Use: We use nodejobs to run install scripts, deploy projects, and run data-transfer operations. We can simply run a backup command, and if a recent backup already exists, skip it. Best of all, other programs that know the job names can also share the output.
  3. Simple: Job dispatch is often part of small utility applications, like a user-update script or a patch deployment across servers. nodejobs gives you a small history of all commands run with a few lines of code.
  4. Extensible: Since nodejobs is so small, it can serve as a good base class or template for a job-management utility.
  5. Observable: We use it to run jobs in Docker or on servers; if our Python scripts fail, we can investigate by connecting remotely and reading the raw stdout and stderr files. All the logs are plaintext, so you don't even need special tools to query the jobs, just eyes!

It is not

  1. nodejobs is not a process manager. It doesn't work well to manage, restart, or otherwise watch ongoing processes; nodejobs is mainly for dispatch and reporting.
  2. nodejobs is not a job database. It does not handle annotations or offer a rich query layer.
  3. nodejobs is not an event dispatcher. It does not run in the background and cannot proactively notify you or send events when something changes.

Job Lifecycle

When a new job is initiated via Jobs.run() (in jobs.py), processes.py spawns a subprocess. The resulting process ID and metadata are stored in the database (jobdb.py) as a JobRecord. Status transitions (e.g., queued, running, finished, failed) are updated lazily whenever you invoke a job command such as get_status() or job_logs(). Logs are written to disk, with file paths stored in the job records.
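
A sketch of that lazy-update model (no daemon advances the record; querying refreshes it):

import sys
import time
from nodejobs import Jobs, JobRecord

jobs = Jobs(db_path="./job_db")
jobs.run([sys.executable, "-c", "print('hi')"], "lifecycle_demo")

time.sleep(1.0)
# get_status() both reads and refreshes the stored record.
rec = jobs.get_status("lifecycle_demo")
assert rec.status in (JobRecord.Status.c_finished, JobRecord.Status.c_running)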

Job Management Interface (jobs.py)

Serving as the main API layer, this module offers high-level methods for job lifecycle management:

  • run(command, job_id) -> JobRecord to spawn new jobs.
  • stop(job_id) -> None to send a terminate signal to a running process.
  • get_status(job_id) -> JobRecord and list_status() for monitoring.
  • job_logs(job_id) -> Tuple[str, str] to retrieve process stdout and stderr logs.

It coordinates with the database module to update job statuses and track metadata, and leverages data schema classes (JobRecord, JobFilter) for validation internally and externally. This module acts as the bridge between process control and data persistence.


Detailed Overview

Below are some example use cases you can likely copy and paste into your application to get started. Sometimes the hardest part of getting started is simply getting the tool set up and running.

1. Initializing the Job Management System

from nodejobs import Jobs

# Initialize the Jobs manager with the database directory
jobs_manager = Jobs(db_path="/path/to/job/database")

This sets up the environment for managing jobs, ensuring all job records are stored and retrieved from the specified path.


2. Starting a New Job

# Define the command to run and assign a unique job ID
command = "python my_script.py --arg value"
job_id = "job_12345"

# Launch the job
job_record = jobs_manager.run(command=command, job_id=job_id)

# Access and print job details
print(f"Started job with ID: {job_record.self_id}")
print(f"Initial status: {job_record.status}")
print(f"Process ID: {job_record.last_pid}")

This demonstrates how to initiate a job, assign a custom ID, and retrieve initial metadata.


3. Checking the Status of a Job

# Retrieve current status of the job
status_record = jobs_manager.get_status(job_id=job_id)
print(f"Job {status_record.self_id} is {status_record.status}")

Allows monitoring of individual job progress and state.


4. Listing and Filtering Jobs

from nodejobs.jobdb import JobRecord

# Filter to find all jobs with status 'running'
filter_criteria = {JobRecord.status: JobRecord.Status.c_running}
running_jobs = jobs_manager.list(filter=filter_criteria)

for job_id, job_info in running_jobs.items():
    print(f"Running job ID: {job_id}, Status: {job_info.status}")

Enables batch retrieval of jobs based on criteria like status, self ID patterns, or other fields.


5. Retrieving Job Logs

# Fetch stdout and stderr logs for the job
stdout_log, stderr_log = jobs_manager.job_logs(job_id=job_id)

print("Standard Output:")
print(stdout_log)

print("Standard Error:")
print(stderr_log)

Facilitates debugging and auditing by accessing runtime logs.


6. Stopping a Running Job

# Send stop signal to the job
stopped_record = jobs_manager.stop(job_id=job_id)
print(f"Job {stopped_record.self_id} stopped with status: {stopped_record.status}")

Provides control over job execution, allowing manual interruption.


7. Monitoring and Updating Job Status in a Loop

import time

while True:
    status = jobs_manager.get_status(job_id=job_id)
    print(f"Current status: {status.status}")
    if status.status in [
        status.Status.c_finished,
        status.Status.c_stopped,
        status.Status.c_failed,
        status.Status.c_failed_start,
    ]:
        break
    time.sleep(5)

Supports real-time monitoring and dynamic decision-making based on job state.


8. Handling Non-Existent Jobs Gracefully

# Attempt to get status of a job that doesn't exist
non_existent_status = jobs_manager.get_status("unknown_job_id")
if non_existent_status is None:
    print("Job not found.")

Ensures robustness against invalid references and missing records.


Enjoy!
