Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions Command/CleanUpCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@

class CleanUpCommand extends Command
{
protected static $defaultName = 'jms-job-queue:clean-up';

private $jobManager;
private $registry;

public function __construct(ManagerRegistry $registry, JobManager $jobManager)
{
parent::__construct();
parent::__construct('jms-job-queue:clean-up');

$this->jobManager = $jobManager;
$this->registry = $registry;
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Cleans up jobs which exceed the maximum retention time.')
Expand All @@ -37,7 +35,7 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var EntityManager $em */
$em = $this->registry->getManagerForClass(Job::class);
Expand Down Expand Up @@ -99,7 +97,7 @@ private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInt
$count++;

$result = $con->executeQuery($incomingDepsSql, array('id' => $job->getId()));
if ($result->fetchColumn() !== false) {
if ($result->fetchOne() !== false) {
$em->transactional(function() use ($em, $job) {
$this->resolveDependencies($em, $job);
$em->remove($job);
Expand Down
8 changes: 3 additions & 5 deletions Command/MarkJobIncompleteCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,26 @@

class MarkJobIncompleteCommand extends Command
{
protected static $defaultName = 'jms-job-queue:mark-incomplete';

private $registry;
private $jobManager;

public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager)
{
parent::__construct();
parent::__construct('jms-job-queue:mark-incomplete');

$this->registry = $managerRegistry;
$this->jobManager = $jobManager;
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Internal command (do not use). It marks jobs as incomplete.')
->addArgument('job-id', InputArgument::REQUIRED, 'The ID of the Job.')
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var EntityManager $em */
$em = $this->registry->getManagerForClass(Job::class);
Expand Down
8 changes: 3 additions & 5 deletions Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@

class RunCommand extends Command
{
protected static $defaultName = 'jms-job-queue:run';

/** @var string */
private $env;

Expand Down Expand Up @@ -69,7 +67,7 @@ class RunCommand extends Command

public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager, EventDispatcherInterface $dispatcher, array $queueOptionsDefault, array $queueOptions)
{
parent::__construct();
parent::__construct('jms-job-queue:run');
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to move the name to the constructor because the constructor calls configure at the very end. And if you then call setName after the constructor already ran you can't extend commands anymore. This normally is not a problem, but Indiana extends the RunCommand (app:jms:run) because we have to add an additional parameter ("hostname"); and this setName made our command unavailable as the original one took precendence.
And there is also some extra logic regarding aliases in the constructor that is not executed in this case, but we don't need that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh damn, good point.


$this->registry = $managerRegistry;
$this->jobManager = $jobManager;
Expand All @@ -78,7 +76,7 @@ public function __construct(ManagerRegistry $managerRegistry, JobManager $jobMan
$this->queueOptions = $queueOptions;
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Runs jobs from the queue.')
Expand All @@ -90,7 +88,7 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
$startTime = time();

Expand Down
31 changes: 14 additions & 17 deletions Command/ScheduleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,21 @@

class ScheduleCommand extends Command
{
protected static $defaultName = 'jms-job-queue:schedule';

private $registry;
private $schedulers;
private $cronCommands;

public function __construct(ManagerRegistry $managerRegistry, iterable $schedulers, iterable $cronCommands)
/**
* @param ManagerRegistry $registry
* @param iterable<JobScheduler> $schedulers
* @param iterable<CronCommand> $cronCommands
*/
public function __construct(
private readonly ManagerRegistry $registry,
private readonly iterable $schedulers,
private readonly iterable $cronCommands
)
{
parent::__construct();

$this->registry = $managerRegistry;
$this->schedulers = $schedulers;
$this->cronCommands = $cronCommands;
parent::__construct('jms-job-queue:schedule');
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Schedules jobs at defined intervals')
Expand All @@ -41,7 +40,7 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
$maxRuntime = $input->getOption('max-runtime');
if ($maxRuntime > 300) {
Expand Down Expand Up @@ -98,7 +97,7 @@ private function scheduleJobs(OutputInterface $output, array $jobSchedulers, arr
continue;
}

list($success, $newLastRunAt) = $this->acquireLock($name, $lastRunAt);
[$success, $newLastRunAt] = $this->acquireLock($name, $lastRunAt);
$jobsLastRunAt[$name] = $newLastRunAt;

if ($success) {
Expand Down Expand Up @@ -148,14 +147,12 @@ private function populateJobSchedulers()
{
$schedulers = [];
foreach ($this->schedulers as $scheduler) {
/** @var JobScheduler $scheduler */
foreach ($scheduler->getCommands() as $name) {
$schedulers[$name] = $scheduler;
}
}

foreach ($this->cronCommands as $command) {
/** @var CronCommand $command */
if ( ! $command instanceof Command) {
throw new \RuntimeException('CronCommand should only be used on Symfony commands.');
}
Expand Down
10 changes: 8 additions & 2 deletions Console/Application.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function __construct(KernelInterface $kernel)
}
}

public function doRun(InputInterface $input, OutputInterface $output)
public function doRun(InputInterface $input, OutputInterface $output): int
{
$this->input = $input;

Expand Down Expand Up @@ -84,13 +84,19 @@ private function saveDebugInformation(\Exception $ex = null)
return;
}

try {
$trace = json_encode($ex ? FlattenException::create($ex)->toArray() : null, JSON_THROW_ON_ERROR);
} catch (\JsonException) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use this, but it's not really important.

$trace = null;
}

$this->getConnection()->executeUpdate(
"UPDATE jms_jobs SET stackTrace = :trace, memoryUsage = :memoryUsage, memoryUsageReal = :memoryUsageReal WHERE id = :id",
array(
'id' => $jobId,
'memoryUsage' => memory_get_peak_usage(),
'memoryUsageReal' => memory_get_peak_usage(true),
'trace' => serialize($ex ? FlattenException::create($ex) : null),
'trace' => $trace,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past the data was a serialized PHP object; but now it has to be valid JSON.
I also tried to add a new column for this (so that old runners that are still running can keep that column and new runners can use the new column); but this caused a lot of trouble of invalid database schemas (because of the unknown columns) and also caused trouble because of the SafeObject type; so in the end I decided that we have to make sure that either old or new runners are running exclusively and that we have to migrate the database manually if we want to switch between the runners. That is quite ugly, but the only thing that did not require implementing a custom database adapter to circumvent the validation errors

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't even think it's that ugly; it's an okay solution. Nice fix

),
array(
'id' => \PDO::PARAM_INT,
Expand Down
60 changes: 31 additions & 29 deletions Controller/JobController.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,32 @@

use Doctrine\Common\Util\ClassUtils;
use Doctrine\ORM\EntityManager;
use Doctrine\Persistence\ManagerRegistry;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use JMS\JobQueueBundle\View\JobFilter;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\RedirectResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\HttpException;
use Symfony\Component\Routing\Annotation\Route;
use Symfony\Component\Routing\RouterInterface;
use Twig\Environment;

class JobController extends AbstractController
{
/**
* @Route("/", name = "jms_jobs_overview")
*/
public function overviewAction(Request $request)
public function __construct(
private readonly JobManager $jobManager,
private readonly ManagerRegistry $managerRegistry,
private readonly Environment $twig,
private readonly RouterInterface $router,
private readonly bool $enableStats
) {
}

#[Route('/', name: 'jms_jobs_overview')]
public function overviewAction(Request $request): Response
{
$jobFilter = JobFilter::fromRequest($request);

Expand All @@ -27,7 +38,7 @@ public function overviewAction(Request $request)
->where($qb->expr()->isNull('j.originalJob'))
->orderBy('j.id', 'desc');

$lastJobsWithError = $jobFilter->isDefaultPage() ? $this->getRepo()->findLastJobsWithError(5) : [];
$lastJobsWithError = $jobFilter->isDefaultPage() ? $this->jobManager->findLastJobsWithError(5) : [];
foreach ($lastJobsWithError as $i => $job) {
$qb->andWhere($qb->expr()->neq('j.id', '?'.$i));
$qb->setParameter($i, $job->getId());
Expand All @@ -54,34 +65,32 @@ public function overviewAction(Request $request)

$jobs = $query->getResult();

return $this->render('@JMSJobQueue/Job/overview.html.twig', array(
return new Response($this->twig->render('@JMSJobQueue/Job/overview.html.twig', array(
'jobsWithError' => $lastJobsWithError,
'jobs' => array_slice($jobs, 0, $perPage),
'jobFilter' => $jobFilter,
'hasMore' => count($jobs) > $perPage,
'jobStates' => Job::getStates(),
));
)));
}

/**
* @Route("/{id}", name = "jms_jobs_details")
*/
public function detailsAction(Job $job)
#[Route('/{id}', name: 'jms_jobs_details')]
public function detailsAction(Job $job): Response
{
$relatedEntities = array();
foreach ($job->getRelatedEntities() as $entity) {
$class = ClassUtils::getClass($entity);
$relatedEntities[] = array(
'class' => $class,
'id' => json_encode($this->get('doctrine')->getManagerForClass($class)->getClassMetadata($class)->getIdentifierValues($entity)),
'id' => json_encode($this->managerRegistry->getManagerForClass($class)->getClassMetadata($class)->getIdentifierValues($entity)),
'raw' => $entity,
);
}

$statisticData = $statisticOptions = array();
if ($this->getParameter('jms_job_queue.statistics')) {
if ($this->enableStats) {
$dataPerCharacteristic = array();
foreach ($this->get('doctrine')->getManagerForClass(Job::class)->getConnection()->query("SELECT * FROM jms_job_statistics WHERE job_id = ".$job->getId()) as $row) {
foreach ($this->managerRegistry->getManagerForClass(Job::class)->getConnection()->query("SELECT * FROM jms_job_statistics WHERE job_id = ".$job->getId()) as $row) {
$dataPerCharacteristic[$row['characteristic']][] = array(
// hack because postgresql lower-cases all column names.
array_key_exists('createdAt', $row) ? $row['createdAt'] : $row['createdat'],
Expand Down Expand Up @@ -115,19 +124,17 @@ public function detailsAction(Job $job)
}
}

return $this->render('@JMSJobQueue/Job/details.html.twig', array(
return new Response($this->twig->render('@JMSJobQueue/Job/details.html.twig', array(
'job' => $job,
'relatedEntities' => $relatedEntities,
'incomingDependencies' => $this->getRepo()->getIncomingDependencies($job),
'incomingDependencies' => $this->jobManager->getIncomingDependencies($job),
'statisticData' => $statisticData,
'statisticOptions' => $statisticOptions,
));
)));
}

/**
* @Route("/{id}/retry", name = "jms_jobs_retry_job")
*/
public function retryJobAction(Job $job)
#[Route('/{id}/retry', name: 'jms_jobs_retry_job')]
public function retryJobAction(Job $job): RedirectResponse
{
$state = $job->getState();

Expand All @@ -144,18 +151,13 @@ public function retryJobAction(Job $job)
$this->getEm()->persist($retryJob);
$this->getEm()->flush();

$url = $this->generateUrl('jms_jobs_details', array('id' => $retryJob->getId()));
$url = $this->router->generate('jms_jobs_details', array('id' => $retryJob->getId()));

return new RedirectResponse($url, 201);
}

private function getEm(): EntityManager
{
return $this->get('doctrine')->getManagerForClass(Job::class);
}

private function getRepo(): JobManager
{
return $this->get('jms_job_queue.job_manager');
return $this->managerRegistry->getManagerForClass(Job::class);
}
}
2 changes: 1 addition & 1 deletion DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Configuration implements ConfigurationInterface
/**
* {@inheritDoc}
*/
public function getConfigTreeBuilder()
public function getConfigTreeBuilder(): TreeBuilder
{
$treeBuilder = new TreeBuilder('jms_job_queue');
$rootNode = $treeBuilder->getRootNode();
Expand Down
18 changes: 11 additions & 7 deletions Entity/CronJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@
use Doctrine\ORM\Mapping as ORM;

/**
* @ORM\Entity
* @ORM\Table(name = "jms_cron_jobs")
* @ORM\ChangeTrackingPolicy("DEFERRED_EXPLICIT")
* @author Johannes M. Schmitt <schmittjoh@gmail.com>
*/
#[ORM\Entity]
#[ORM\Table(name: "jms_cron_jobs")]
#[ORM\ChangeTrackingPolicy("DEFERRED_EXPLICIT")]
#[ORM\MappedSuperclass]
class CronJob
{
/** @ORM\Id @ORM\Column(type = "integer", options = {"unsigned": true}) @ORM\GeneratedValue(strategy="AUTO") */
#[ORM\Id]
#[ORM\Column(type: "integer", options: ["unsigned" => true])]
#[ORM\GeneratedValue(strategy: "AUTO")]
private $id;

/** @ORM\Column(type = "string", length = 200, unique = true) */
#[ORM\Column(type: "string", length: 200, unique: true)]
private $command;

/** @ORM\Column(type = "datetime", name = "lastRunAt") */
#[ORM\Column(name: "lastRunAt", type: "datetime")]
private $lastRunAt;

public function __construct($command)
Expand All @@ -35,4 +39,4 @@ public function getLastRunAt()
{
return $this->lastRunAt;
}
}
}
Loading