Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"php": "^8.4",
"ext-mongodb": "*",
"mongodb/mongodb": "^2.1",
"recruiterphp/clock": "^4.1"
"recruiterphp/clock": "^5.0"
},
"require-dev": {
"ergebnis/composer-normalize": "^2.47",
Expand Down
49 changes: 20 additions & 29 deletions src/Recruiter/Concurrency/MongoLock.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,22 @@
use MongoDB\BSON\UTCDateTime;
use MongoDB\Collection;
use MongoDB\Driver\Exception\BulkWriteException;
use Recruiter\Clock\SystemClock;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Clock\NativeClock;

class MongoLock implements Lock
{
public const DUPLICATE_KEY = 11000;

private $collection;
private $processName;
private $programName;
private $clock;
private $sleep;

public function __construct(Collection $collection, $programName, $processName, $clock = null, $sleep = 'sleep')
{
$this->collection = $collection;
private const int DUPLICATE_KEY = 11000;
private ClockInterface $clock;

public function __construct(
private readonly Collection $collection,
private readonly string $programName,
private readonly string $processName,
?ClockInterface $clock = null
) {
$this->collection->createIndex(['program' => 1], ['unique' => true]);
$this->programName = $programName;
$this->processName = $processName;
if (null === $clock) {
$clock = new SystemClock();
}
$this->clock = $clock;
$this->sleep = $sleep;
$this->clock = $clock ?? new NativeClock();
}

public static function forProgram($programName, Collection $collection): self
Expand All @@ -39,12 +32,11 @@ public static function forProgram($programName, Collection $collection): self

public function acquire(int $duration = 3600): void
{
$now = $this->clock->current();
$now = $this->clock->now();

$this->removeExpiredLocks($now);

$expiration = clone $now;
$expiration->add(new \DateInterval("PT{$duration}S"));
$expiration = $now->add(new \DateInterval("PT{$duration}S"));

try {
$document = [
Expand All @@ -64,12 +56,11 @@ public function acquire(int $duration = 3600): void

public function refresh(int $duration = 3600): void
{
$now = $this->clock->current();
$now = $this->clock->now();

$this->removeExpiredLocks($now);

$expiration = clone $now;
$expiration->add(new \DateInterval("PT{$duration}S"));
$expiration = $now->add(new \DateInterval("PT{$duration}S"));

$result = $this->collection->updateOne(
['program' => $this->programName, 'process' => $this->processName],
Expand Down Expand Up @@ -115,9 +106,9 @@ public function release(bool $force = false): void
*/
public function wait(int $polling = 30, int $maximumWaitingTime = 3600): void
{
$timeLimit = $this->clock->current()->add(new \DateInterval("PT{$maximumWaitingTime}S"));
$timeLimit = $this->clock->now()->add(new \DateInterval("PT{$maximumWaitingTime}S"));
while (true) {
$now = $this->clock->current();
$now = $this->clock->now();
$result = $this->collection->count($query = [
'program' => $this->programName,
'expires_at' => ['$gte' => new UTCDateTime($now)],
Expand All @@ -127,7 +118,7 @@ public function wait(int $polling = 30, int $maximumWaitingTime = 3600): void
if ($now > $timeLimit) {
throw new LockNotAvailableException("I have been waiting up until {$timeLimit->format(\DateTime::ATOM)} for the lock $this->programName ($maximumWaitingTime seconds polling every $polling seconds), but it is still not available (now is {$now->format(\DateTime::ATOM)}).");
}
call_user_func($this->sleep, $polling);
$this->clock->sleep($polling);
} else {
break;
}
Expand All @@ -139,7 +130,7 @@ public function __toString(): string
return var_export($this->show(), true);
}

private function removeExpiredLocks(\DateTime $now): void
private function removeExpiredLocks(\DateTimeImmutable $now): void
{
$this->collection->deleteMany($query = [
'program' => $this->programName,
Expand Down
16 changes: 6 additions & 10 deletions src/Recruiter/Concurrency/PeriodicalCheck.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,20 @@

namespace Recruiter\Concurrency;

use Recruiter\Clock;
use Recruiter\Clock\SystemClock;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Clock\NativeClock;

class PeriodicalCheck
{
private array|\Closure $check;
private int $lastCheck;

public static function every(int $seconds, ?Clock $clock = null): self
public static function every(int $seconds, ?ClockInterface $clock = null): self
{
if (null === $clock) {
$clock = new SystemClock();
}

return new self($seconds, $clock);
return new self($seconds, $clock ?? new NativeClock());
}

private function __construct(private readonly int $seconds, private readonly Clock $clock)
private function __construct(private readonly int $seconds, private readonly ClockInterface $clock)
{
$this->lastCheck = 0;
}
Expand All @@ -43,7 +39,7 @@ public function __invoke(): void

public function execute(): void
{
$now = $this->clock->current()->getTimestamp();
$now = $this->clock->now()->getTimestamp();
if ($now - $this->lastCheck >= $this->seconds) {
call_user_func($this->check);
$this->lastCheck = $now;
Expand Down
82 changes: 35 additions & 47 deletions tests/Recruiter/Concurrency/MongoLockTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,23 @@
use Phake;
use PHPUnit\Framework\Attributes\Group;
use PHPUnit\Framework\TestCase;
use Recruiter\Clock;
use Recruiter\Clock\ProgressiveClock;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Process\Process;

class MongoLockTest extends TestCase
{
use Eris\TestTrait;

private MongoDB\Collection $lockCollection;
private Clock&Phake\IMock $clock;
private array $slept;
private \Closure $sleep;
private (ClockInterface&Phake\IMock)|ProgressiveClock $clock;
private int $iteration;

protected function setUp(): void
{
$uri = getenv('MONGODB_URI') ?: null;
$this->lockCollection = new MongoDB\Client($uri)->selectCollection('concurrency-test', 'lock');
$this->clock = \Phake::mock(Clock::class);

$this->slept = [];
$this->sleep = function ($amount): void {
$this->slept[] = $amount;
};
$this->clock = \Phake::mock(ClockInterface::class);
}

protected function tearDown(): void
Expand Down Expand Up @@ -78,9 +72,9 @@ public function testAnAlreadyAcquiredLockCannotBeAcquiredAgainEvenWithRefreshMet

public function testAnAlreadyAcquiredLockCanExpireSoThatItCanBeAcquiredAgain()
{
\Phake::when($this->clock)->current()
->thenReturn(new \DateTime('2014-01-01T10:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T11:00:01Z'))
\Phake::when($this->clock)->now()
->thenReturn(new \DateTimeImmutable('2014-01-01T10:00:00Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T11:00:01Z'))
;
$first = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
$first->acquire(3600);
Expand Down Expand Up @@ -139,8 +133,8 @@ public function testALockCanBeForcedToBeReleasedIfYouReallyKnowWhatYouReDoing()

public function testALockCanBeShownEvenByOtherProcessesWorkingOnTheSameProgram()
{
\Phake::when($this->clock)->current()
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
\Phake::when($this->clock)->now()
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:00Z'))
;
$first = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
$first->acquire(3600);
Expand All @@ -159,35 +153,29 @@ public function testALockCanBeShownEvenByOtherProcessesWorkingOnTheSameProgram()

public function testALockCanBeWaitedOnUntilItsDisappearance()
{
$allCalls = \Phake::when($this->clock)->current()
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTime('2014-01-01T00:01:00Z'))
$allCalls = \Phake::when($this->clock)->now()
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:01:00Z'))
;
$first = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
$first->acquire(45);

$second = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock, $this->sleep);
$second = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
$second->wait($polling = 30);
$this->assertEquals([30, 30], $this->slept);
\Phake::verify($this->clock, \Phake::times(2))->sleep(30);
}

public function testALockShouldNotBeWaitedUponForever()
{
$allCalls = \Phake::when($this->clock)->current()
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:50Z'))
->thenReturn(new \DateTime('2014-01-01T00:01:01Z'))
->thenThrow(new \LogicException('Should not call anymore'))
;
$this->clock = new ProgressiveClock(new \DateTimeImmutable('2014-01-01T00:00:00Z'), \DateInterval::createFromDateString('500 milliseconds'));

$first = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
$first->acquire(3600);

$second = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock, $this->sleep);
$second = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
try {
$second->wait($polling = 30, $maximumInterval = 60);
$this->fail('Should fail after 60 seconds');
Expand All @@ -201,28 +189,28 @@ public function testALockShouldNotBeWaitedUponForever()

public function testALockWaitedUponCanBeImmediatelyReacquired()
{
$allCalls = \Phake::when($this->clock)->current()
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:31Z'))
->thenReturn(new \DateTime('2014-01-01T00:00:31Z'))
$allCalls = \Phake::when($this->clock)->now()
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:30Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:31Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:31Z'))
;
$first = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
$first->acquire(30);

$second = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock, $this->sleep);
$second = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
$second->wait($polling = 1);
$second->acquire();
$this->expectNotToPerformAssertions();
}

public function testAnAlreadyAcquiredLockCanBeRefreshed()
{
\Phake::when($this->clock)->current()
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T00:10:00Z'))
\Phake::when($this->clock)->now()
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T00:10:00Z'))
;

$first = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
Expand All @@ -244,9 +232,9 @@ public function testAnAlreadyAcquiredLockCanBeRefreshed()

public function testAnExpiredLockCannotBeRefreshed()
{
\Phake::when($this->clock)->current()
->thenReturn(new \DateTime('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTime('2014-01-01T02:00:00Z'))
\Phake::when($this->clock)->now()
->thenReturn(new \DateTimeImmutable('2014-01-01T00:00:00Z'))
->thenReturn(new \DateTimeImmutable('2014-01-01T02:00:00Z'))
;

$first = new MongoLock($this->lockCollection, 'windows_defrag', 'ws-a-25:42', $this->clock);
Expand All @@ -261,7 +249,7 @@ public function testAnExpiredLockCannotBeRefreshed()

private function givenTimeIsFixed()
{
\Phake::when($this->clock)->current()->thenReturn(new \DateTime('2014-01-01'));
\Phake::when($this->clock)->now()->thenReturn(new \DateTimeImmutable('2014-01-01'));
}

#[Group('long')]
Expand Down
8 changes: 4 additions & 4 deletions tests/Recruiter/Concurrency/PeriodicalCheckTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Eris\Generator;
use Eris\Listener;
use PHPUnit\Framework\TestCase;
use Recruiter\Clock\SettableClock;
use Symfony\Component\Clock\MockClock;

class PeriodicalCheckTest extends TestCase
{
Expand All @@ -27,16 +27,16 @@ public function testDoesNotPerformTheCheckTooManyTimes()
),
)
// ->hook(Listener\collectFrequencies())
->then(function ($startingDate, $period, $deltas): void {
$clock = new SettableClock($startingDate);
->then(function (\DateTime $startingDate, int $period, array $deltas): void {
$clock = new MockClock(\DateTimeImmutable::createFromMutable($startingDate));
$check = PeriodicalCheck::every($period, $clock);
$this->counter = 0;
$check->onFire(function (): void {
++$this->counter;
});
$check->__invoke();
foreach ($deltas as $delta) {
$clock->advance($delta);
$clock->sleep($delta);
$check->__invoke();
}
$totalInterval = array_sum($deltas);
Expand Down