Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions src/taskgraph/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@
testing = False


class CreateTasksException(Exception):
    """Exception raised when one or more tasks could not be created.

    The exception message contains one section per failed task: an
    ``ERROR: Could not create '<label>':`` header followed by the text of
    the underlying exception, each line prefixed with a single space.

    Attributes:
        errors: Mapping of task label to the exception raised while
            creating that task, kept so callers can inspect individual
            failures without parsing the message.
    """

    def __init__(self, errors: dict[str, Exception]):
        """Build the combined message from ``errors``.

        Args:
            errors: Mapping of task label to the exception raised while
                creating that task.
        """
        # Snapshot the mapping so the stored errors match the message
        # even if the caller's dict is modified afterwards.
        self.errors = dict(errors)

        sections = []
        for label, exc in self.errors.items():
            # splitlines() + join re-indents multi-line error text line by line.
            indented = "\n".join(f" {line}" for line in str(exc).splitlines())
            sections.append(f"\nERROR: Could not create '{label}':\n\n{indented}\n")

        super().__init__("".join(sections))


def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id):
taskid_to_label = {t: l for l, t in label_to_taskid.items()}

Expand Down Expand Up @@ -50,33 +62,48 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task
session = get_session()
with futures.ThreadPoolExecutor(concurrency) as e:
fs = {}
fs_to_task = {}
skipped = set()
errors = {}

# We can't submit a task until its dependencies have been submitted.
# So our strategy is to walk the graph and submit tasks once all
# their dependencies have been submitted.
tasklist = set(taskgraph.graph.visit_postorder())
alltasks = tasklist.copy()

def schedule_tasks():
# bail out early if any futures have failed
if any(f.done() and f.exception() for f in fs.values()):
return
def handle_exception(fut):
if exc := fut.exception():
task_id, label = fs_to_task[fut]
skipped.add(task_id)
errors[label] = exc

def schedule_tasks():
to_remove = set()
new = set()

def submit(task_id, label, task_def):
fut = e.submit(create_task, session, task_id, label, task_def)
new.add(fut)
fs[task_id] = fut
fs_to_task[fut] = (task_id, label)
fut.add_done_callback(handle_exception)

for task_id in tasklist:
task_def = taskgraph.tasks[task_id].task
# If we haven't finished submitting all our dependencies yet,
# come back to this later.
# Some dependencies aren't in our graph, so make sure to filter
# those out
deps = set(task_def.get("dependencies", [])) & alltasks

# If one of the dependencies didn't get created, then
# don't attempt to submit as it would fail.
if any(d in skipped for d in deps):
skipped.add(task_id)
to_remove.add(task_id)
continue

# If we haven't finished submitting all our dependencies yet,
# come back to this later.
if any((d not in fs or not fs[d].done()) for d in deps):
continue

Expand All @@ -90,16 +117,18 @@ def submit(task_id, label, task_def):
submit(slugid(), taskid_to_label[task_id], task_def)
tasklist.difference_update(to_remove)

# as each of those futures complete, try to schedule more tasks
# As each of those futures complete, try to schedule more tasks.
for f in futures.as_completed(new):
schedule_tasks()

# start scheduling tasks and run until everything is scheduled
# Start scheduling tasks and run until everything is scheduled.
schedule_tasks()

# check the result of each future, raising an exception if it failed
for f in futures.as_completed(fs.values()):
f.result()
# Wait for all futures to complete.
futures.wait(fs.values())

if errors:
raise CreateTasksException(errors)


def create_task(session, task_id, label, task_def):
Expand Down
140 changes: 118 additions & 22 deletions test/test_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,73 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import json
import re
import unittest
from unittest import mock

import responses

from taskgraph import create
from taskgraph.config import GraphConfig
from taskgraph.create import CreateTasksException
from taskgraph.graph import Graph
from taskgraph.task import Task
from taskgraph.taskgraph import TaskGraph
from taskgraph.util import taskcluster as tc_util

GRAPH_CONFIG = GraphConfig({"trust-domain": "domain"}, "/var/empty")


class TestCreate(unittest.TestCase):
def setUp(self):
self.created_tasks = {}
self.old_create_task = create.create_task
create.create_task = self.fake_create_task
def mock_taskcluster_api(
created_tasks=None, error_status=None, error_message=None, error_task_ids=None
):
"""Mock the Taskcluster Queue API for create task calls."""

def request_callback(request):
task_id = request.url.split("/")[-1]

def tearDown(self):
create.create_task = self.old_create_task
# Check if this task should error
if error_status is not None:
if error_task_ids is None or task_id in error_task_ids:
# Support per-task error messages
if isinstance(error_message, dict):
message = error_message.get(task_id, "error")
else:
message = error_message or "error"
return (error_status, {}, f'{{"message": "{message}"}}')

def fake_create_task(self, session, task_id, label, task_def):
self.created_tasks[task_id] = task_def
# Success case - capture task definition if requested
if created_tasks is not None:
task_def = json.loads(request.body)
created_tasks[task_id] = task_def

return (200, {}, f'{{"status": {{"taskId": "{task_id}"}}}}')

responses.add_callback(
responses.PUT,
re.compile(r"https://tc\.example\.com/api/queue/v1/task/.*"),
callback=request_callback,
content_type="application/json",
)


class TestCreate(unittest.TestCase):
def setUp(self):
# Clear cached Taskcluster clients/sessions since we're mocking the environment
tc_util.get_taskcluster_client.cache_clear()
tc_util.get_session.cache_clear()

@responses.activate
@mock.patch.dict(
"os.environ",
{"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
clear=True,
)
def test_create_tasks(self):
created_tasks = {}
mock_taskcluster_api(created_tasks=created_tasks)

tasks = {
"tid-a": Task(
kind="test", label="a", attributes={}, task={"payload": "hello world"}
Expand All @@ -48,18 +89,28 @@ def test_create_tasks(self):
decision_task_id="decisiontask",
)

for tid, task in self.created_tasks.items():
assert created_tasks
for tid, task in created_tasks.items():
self.assertEqual(task["payload"], "hello world")
self.assertEqual(task["schedulerId"], "domain-level-4")
# make sure the dependencies exist, at least
for depid in task.get("dependencies", []):
if depid == "decisiontask":
# Don't look for decisiontask here
continue
self.assertIn(depid, self.created_tasks)

self.assertIn(depid, created_tasks)

@responses.activate
@mock.patch.dict(
"os.environ",
{"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
clear=True,
)
def test_create_task_without_dependencies(self):
"a task with no dependencies depends on the decision task"
created_tasks = {}
mock_taskcluster_api(created_tasks=created_tasks)

tasks = {
"tid-a": Task(
kind="test", label="a", attributes={}, task={"payload": "hello world"}
Expand All @@ -77,12 +128,20 @@ def test_create_task_without_dependencies(self):
decision_task_id="decisiontask",
)

for tid, task in self.created_tasks.items():
assert created_tasks
for tid, task in created_tasks.items():
self.assertEqual(task.get("dependencies"), ["decisiontask"])

@mock.patch("taskgraph.create.create_task")
def test_create_tasks_fails_if_create_fails(self, create_task):
"creat_tasks fails if a single create_task call fails"
@responses.activate
@mock.patch.dict(
"os.environ",
{"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
clear=True,
)
def test_create_tasks_fails_if_create_fails(self):
"create_tasks fails if a single create_task call fails"
mock_taskcluster_api(error_status=403, error_message="oh no!")

tasks = {
"tid-a": Task(
kind="test", label="a", attributes={}, task={"payload": "hello world"}
Expand All @@ -92,17 +151,54 @@ def test_create_tasks_fails_if_create_fails(self, create_task):
graph = Graph(nodes={"tid-a"}, edges=set())
taskgraph = TaskGraph(tasks, graph)

def fail(*args):
print("UHOH")
raise RuntimeError("oh no!")
with self.assertRaises(CreateTasksException):
create.create_tasks(
GRAPH_CONFIG,
taskgraph,
label_to_taskid,
{"level": "4"},
decision_task_id="decisiontask",
)

@responses.activate
@mock.patch.dict(
"os.environ",
{"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
clear=True,
)
def test_create_tasks_collects_multiple_errors(self):
"create_tasks collects all errors from multiple failing tasks"
mock_taskcluster_api(
error_status=409,
error_message={
"tid-a": "scope error for task a",
"tid-b": "scope error for task b",
},
error_task_ids={"tid-a", "tid-b"},
)

create_task.side_effect = fail
tasks = {
"tid-a": Task(
kind="test", label="a", attributes={}, task={"payload": "hello world"}
),
"tid-b": Task(
kind="test", label="b", attributes={}, task={"payload": "hello world"}
),
}
label_to_taskid = {"a": "tid-a", "b": "tid-b"}
graph = Graph(nodes={"tid-a", "tid-b"}, edges=set())
taskgraph = TaskGraph(tasks, graph)

with self.assertRaises(RuntimeError):
with self.assertRaises(CreateTasksException) as cm:
create.create_tasks(
GRAPH_CONFIG,
taskgraph,
label_to_taskid,
{"level": "4"},
decision_task_id="decisiontask",
)

# Verify both errors are in the exception message
exception_message = str(cm.exception)
self.assertIn("Could not create 'a'", exception_message)
self.assertIn("Could not create 'b'", exception_message)
4 changes: 4 additions & 0 deletions test/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from taskgraph import docker
from taskgraph.config import GraphConfig
from taskgraph.transforms.docker_image import IMAGE_BUILDER_IMAGE
from taskgraph.util import taskcluster as tc_util
from taskgraph.util.vcs import get_repository

from .conftest import nowin
Expand All @@ -22,6 +23,9 @@ def root_url():
def mock_environ(monkeypatch, root_url):
    """Replace ``os.environ`` with a mapping holding only the Taskcluster root URL."""
    # Swap in a minimal environment so user specified variables can't
    # interfere with URLs.
    isolated_env = {"TASKCLUSTER_ROOT_URL": root_url}
    monkeypatch.setattr(os, "environ", isolated_env)

    # The environment is mocked, so drop any cached Taskcluster
    # clients/sessions.
    for cached in (tc_util.get_taskcluster_client, tc_util.get_session):
        cached.cache_clear()


@pytest.fixture(autouse=True, scope="module")
Expand Down
Loading