diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 0aaa2cbf6..1e57c53cd 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -17,6 +17,7 @@ import 'package:gcloud/storage.dart' show Bucket; import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError; import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange; import 'package:logging/logging.dart' show Logger; +import 'package:meta/meta.dart'; import 'package:pana/models.dart' show Summary; import 'package:pool/pool.dart' show Pool; import 'package:pub_dev/package/api_export/api_exporter.dart'; @@ -128,17 +129,17 @@ class TaskBackend { final scanLoop = _createLoop( name: 'scan-packages', aborted: aborted, - fn: _runOneScanPackagesUpdate, + fn: runOneScanPackagesUpdate, ); final deleteLoop = _createLoop( name: 'delete-instances', aborted: aborted, - fn: _runOneInstanceDeletion, + fn: runOneInstanceDeletion, ); final createLoop = _createLoop( name: 'create-instances', aborted: aborted, - fn: _runOneInstanceCreation, + fn: runOneInstanceCreation, ); scheduleMicrotask(() async { @@ -296,9 +297,8 @@ class TaskBackend { } } - Future _runOneScanPackagesUpdate( - bool Function() isAbortedFn, - ) async { + @visibleForTesting + Future runOneScanPackagesUpdate(bool Function() isAbortedFn) async { final next = await runOneScanPackagesUpdatedCycle( _scanPackagesUpdatedState, _db.packages.listUpdatedSince(_scanPackagesUpdatedState.since), @@ -317,7 +317,8 @@ class TaskBackend { return Duration(minutes: 10); // TODO: consider if we scan more frequently. } - Future _runOneInstanceDeletion(bool Function() isAbortedFn) async { + @visibleForTesting + Future runOneInstanceDeletion(bool Function() isAbortedFn) async { _deleteInstancesState = await runOneDeleteInstancesCycle( _deleteInstancesState, taskWorkerCloudCompute, @@ -327,7 +328,8 @@ class TaskBackend { return Duration(minutes: 10); // TODO: consider if this should be dynamic } - Future _runOneInstanceCreation(bool Function() isAbortedFn) async { + @visibleForTesting + Future runOneInstanceCreation(bool Function() isAbortedFn) async { final result = await runOneCreateInstancesCycle( taskWorkerCloudCompute, _db, @@ -337,6 +339,14 @@ class TaskBackend { return result.$2; } + @visibleForTesting + Future runOneLoopCycle() async { + bool isAbortedFn() => false; + await runOneScanPackagesUpdate(isAbortedFn); + await runOneInstanceDeletion(isAbortedFn); + await runOneInstanceCreation(isAbortedFn); + } + Future trackPackage( String packageName, { bool updateDependents = false, diff --git a/app/lib/task/clock_control.dart b/app/lib/task/clock_control.dart index 4d9212217..e4891ee9d 100644 --- a/app/lib/task/clock_control.dart +++ b/app/lib/task/clock_control.dart @@ -469,6 +469,31 @@ final class ClockController { await Future.delayed(Duration(microseconds: 0)); } } + + void incrOffset({int hours = 0, int minutes = 0, int seconds = 0}) { + _offset += Duration(hours: hours, minutes: minutes, seconds: seconds); + } + + Future incrUntil( + FutureOr Function() condition, { + Duration? timeout, + Duration? minimumStep, + }) async { + final deadline = timeout != null ? clock.fromNowBy(timeout) : null; + + bool shouldLoop() => deadline == null || clock.now().isBefore(deadline); + + while (shouldLoop()) { + if (await condition()) { + return; + } + _offset += minimumStep ?? Duration(minutes: 1); + } + throw TimeoutException( + 'Condition given to ClockController.incrUntil was not satisfied' + ' before timeout: $timeout', + ); + } } final class _TravelingTimer { diff --git a/app/test/task/task_test.dart b/app/test/task/task_test.dart index 90e3469ee..0e154f07c 100644 --- a/app/test/task/task_test.dart +++ b/app/test/task/task_test.dart @@ -56,10 +56,6 @@ void main() { ), fn: () async { await taskBackend.backfillTrackingState(); - await clockControl.elapse(minutes: 1); - - await taskBackend.start(); - await clockControl.elapse(minutes: 5); // Check that the log is missing. final log1 = await taskBackend.taskLog('oxygen', '1.2.0'); @@ -74,6 +70,8 @@ void main() { expect(dartdoc1, isNull); // 5 minutes after start of scheduling we expect there to be 3 instances + clockControl.incrOffset(minutes: 5); + await taskBackend.runOneLoopCycle(); final instances = await cloud.listInstances().toList(); expect(instances, hasLength(3)); @@ -81,9 +79,8 @@ void main() { cloud.fakeStartInstance(instance.instanceName); } - await clockControl.elapse(minutes: 5); - for (final instance in instances) { + clockControl.incrOffset(minutes: 1); final payload = instance.payload; for (final v in payload.versions) { @@ -146,9 +143,9 @@ void main() { // Report the task as finished await api.taskUploadFinished(payload.package, v.version); } - } - await clockControl.elapse(minutes: 5); + cloud.fakeTerminateInstance(instance.instanceName); + } // Check that we can get the log file final log2 = await taskBackend.taskLog('oxygen', '1.2.0'); @@ -169,11 +166,9 @@ void main() { // All instances should be terminated, api.taskUploadFinished terminate // when all versions for the instance is done. And fake instances take 1 // minute to simulate termination. + clockControl.incrOffset(minutes: 15); + await taskBackend.runOneLoopCycle(); expect(await cloud.listInstances().toList(), hasLength(0)); - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, ); @@ -184,25 +179,32 @@ void main() { ], fn: () async { await taskBackend.backfillTrackingState(); - await clockControl.elapse(minutes: 1); - - await taskBackend.start(); // We are going to let the task timeout, if this happens we should only // try to scheduled it until we hit the [taskRetryLimit]. for (var i = 0; i < taskRetryLimit; i++) { + await taskBackend.runOneScanPackagesUpdate(_isNotAborted); + // Within 24 hours an instance should be created - await clockControl.elapseUntil( - () => cloud.listInstances().isNotEmpty, + await clockControl.incrUntil( + () async { + await taskBackend.runOneInstanceCreation(_isNotAborted); + return cloud.listInstances().isNotEmpty; + }, timeout: Duration(days: 1), + minimumStep: Duration(minutes: 15), ); // If nothing happens, then it should be killed within 24 hours. // Actually, it'll happen much sooner, like ~2 hours, but we'll leave the // test some wiggle room. - await clockControl.elapseUntil( - () => cloud.listInstances().isEmpty, + await clockControl.incrUntil( + () async { + await taskBackend.runOneInstanceDeletion(_isNotAborted); + return cloud.listInstances().isEmpty; + }, timeout: Duration(days: 1), + minimumStep: Duration(minutes: 15), ); } @@ -210,23 +212,27 @@ void main() { // created for the next day... assert(taskRetriggerInterval > Duration(days: 1)); await expectLater( - clockControl.elapseUntil( - () => cloud.listInstances().isNotEmpty, + clockControl.incrUntil( + () async { + await taskBackend.runOneLoopCycle(); + return cloud.listInstances().isNotEmpty; + }, timeout: Duration(days: 1), + minimumStep: Duration(minutes: 10), ), throwsA(isA()), ); // But the task should be retried after [taskRetriggerInterval], this is a // long time, but for sanity we do re-analyze everything occasionally. - await clockControl.elapseUntil( - () => cloud.listInstances().isNotEmpty, + await clockControl.incrUntil( + () async { + await taskBackend.runOneLoopCycle(); + return cloud.listInstances().isNotEmpty; + }, timeout: taskRetriggerInterval + Duration(days: 1), + minimumStep: Duration(hours: 2), ); - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, testProfile: TestProfile( defaultUser: 'admin@pub.dev', @@ -245,9 +251,7 @@ void main() { 'Limit to 5 latest major versions', fn: () async { await taskBackend.backfillTrackingState(); - await clockControl.elapse(minutes: 1); - await taskBackend.start(); - await clockControl.elapse(minutes: 15); + await taskBackend.runOneLoopCycle(); // We expect there to be one instance with less than 10 versions to be // analyzed, this even though there really is 20 versions. @@ -261,10 +265,6 @@ void main() { instances.first.payload.versions.map((vt) => vt.version).toList(), hasLength(5), ); - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, testProfile: TestProfile( defaultUser: 'admin@pub.dev', @@ -294,9 +294,7 @@ void main() { 'continued scan finds new packages', fn: () async { await taskBackend.backfillTrackingState(); - await taskBackend.start(); - await clockControl.elapse(minutes: 15); - + await taskBackend.runOneLoopCycle(); expect(await cloud.listInstances().toList(), hasLength(0)); // Create a package @@ -313,13 +311,8 @@ void main() { ), ); - await clockControl.elapse(minutes: 15); - + await taskBackend.runOneLoopCycle(); expect(await cloud.listInstances().toList(), hasLength(1)); - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, testProfile: TestProfile( defaultUser: 'admin@pub.dev', @@ -332,9 +325,7 @@ void main() { 'analyzed packages stay idle', fn: () async { await taskBackend.backfillTrackingState(); - await taskBackend.start(); - await clockControl.elapse(minutes: 15); - + await taskBackend.runOneLoopCycle(); final instances = await cloud.listInstances().toList(); // There is only one package, so we should only get one instance expect(instances, hasLength(1)); @@ -388,17 +379,18 @@ void main() { await api.taskUploadFinished(payload.package, v.version); // Leave time for the instance to be deleted (takes 1 min in fake cloud) - await clockControl.elapse(minutes: 5); + clockControl.incrOffset(minutes: 5); + await taskBackend.runOneLoopCycle(); // We don't expect anything to be scheduled for the next 7 days. - await clockControl.expectUntil( - () => cloud.listInstances().isEmpty, - Duration(days: 7), + await clockControl.incrUntil( + () async { + await taskBackend.runOneLoopCycle(); + return cloud.listInstances().isEmpty; + }, + timeout: Duration(days: 7), + minimumStep: Duration(minutes: 5), ); - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, testProfile: TestProfile( defaultUser: 'admin@pub.dev', @@ -417,9 +409,8 @@ void main() { 'continued scan finds new versions', fn: () async { await taskBackend.backfillTrackingState(); - await taskBackend.start(); - await clockControl.elapse(minutes: 15); { + await taskBackend.runOneLoopCycle(); final instances = await cloud.listInstances().toList(); // There is only one package, so we should only get one instance expect(instances, hasLength(1)); @@ -475,14 +466,16 @@ void main() { // Report the task as finished await api.taskUploadFinished(payload.package, v.version); } + // Leave time for the instance to be deleted (takes 1 min in fake cloud) - await clockControl.elapse(minutes: 5); + clockControl.incrOffset(minutes: 5); + await taskBackend.runOneLoopCycle(); // We don't expect anything to be scheduled for the next 3 days. - await clockControl.expectUntil( - () => cloud.listInstances().isEmpty, - Duration(days: 3), - ); + await clockControl.incrUntil(() async { + await taskBackend.runOneLoopCycle(); + return cloud.listInstances().isEmpty; + }, timeout: Duration(days: 3)); // Create a new version of existing package, this should trigger analysis await importProfile( @@ -498,7 +491,8 @@ void main() { ), ); - await clockControl.elapse(minutes: 15); + clockControl.incrOffset(minutes: 15); + await taskBackend.runOneLoopCycle(); { final instances = await cloud.listInstances().toList(); @@ -519,10 +513,6 @@ void main() { final v = payload.versions.first; expect(v.version, equals('2.0.0')); } - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, testProfile: TestProfile( defaultUser: 'admin@pub.dev', @@ -541,9 +531,7 @@ void main() { 're-analyzes when dependency is updated', fn: () async { await taskBackend.backfillTrackingState(); - await taskBackend.start(); - await clockControl.elapse(minutes: 15); - + await taskBackend.runOneLoopCycle(); // There should be 2 packages for analysis now expect(await cloud.listInstances().toList(), hasLength(2)); @@ -616,10 +604,12 @@ void main() { // Report the task as finished await api.taskUploadFinished(payload.package, v.version); + cloud.fakeTerminateInstance(instance.instanceName); } // Leave time for the instance to be deleted (takes 1 min in fake cloud) - await clockControl.elapse(minutes: 15); + clockControl.incrOffset(minutes: 15); + await taskBackend.runOneLoopCycle(); // We don't expect anything to be scheduled now expect(await cloud.listInstances().toList(), isEmpty); @@ -639,7 +629,8 @@ void main() { ), ); - await clockControl.elapse(minutes: 15); + clockControl.incrOffset(minutes: 15); + await taskBackend.runOneLoopCycle(); // Expect that neon is scheduled within 15 minutes expect( @@ -649,7 +640,8 @@ void main() { // Since oxygen was recently scheduled, we expect that it won't have been // scheduled yet. - await clockControl.elapse(minutes: 15); + clockControl.incrOffset(minutes: 15); + await taskBackend.runOneLoopCycle(); expect( await cloud.listInstances().map((i) => i.payload.package).toList(), isNot(contains('oxygen')), @@ -657,14 +649,16 @@ void main() { // At some point oxygen must also be retriggered, by this can be offset by // the [taskDependencyRetriggerCoolOff] delay. - await clockControl.elapseUntil( - () => cloud.listInstances().any((i) => i.payload.package == 'oxygen'), + await clockControl.incrUntil( + () async { + await taskBackend.runOneLoopCycle(); + return cloud.listInstances().any( + (i) => i.payload.package == 'oxygen', + ); + }, timeout: taskDependencyRetriggerCoolOff + Duration(minutes: 15), + minimumStep: Duration(hours: 2), ); - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, testProfile: TestProfile( defaultUser: 'admin@pub.dev', @@ -698,8 +692,7 @@ void main() { ), fn: () async { await taskBackend.backfillTrackingState(); - await taskBackend.start(); - await clockControl.elapse(minutes: 15); + await taskBackend.runOneLoopCycle(); late VersionTokenPair v; { final instances = await cloud.listInstances().toList(); @@ -733,7 +726,8 @@ void main() { ), ); - await clockControl.elapse(minutes: 15); + clockControl.incrOffset(minutes: 15); + await taskBackend.runOneLoopCycle(); // verify token is now aborted final ps = await dbService.lookupValue( @@ -761,10 +755,11 @@ void main() { ); } // Leave time for the instance to be deleted (takes 1 min in fake cloud) - await clockControl.elapse(minutes: 5); + clockControl.incrOffset(minutes: 5); + await taskBackend.runOneLoopCycle(); { - await clockControl.elapseTime(maxTaskExecutionTime); + clockControl.incrOffset(minutes: maxTaskExecutionTime.inMinutes + 1); // Create new version, removing the token from the aborted list await importProfile( profile: TestProfile( @@ -791,10 +786,6 @@ void main() { message: 'The provided token is invalid or expired.', ); } - - await taskBackend.stop(); - - await clockControl.elapse(minutes: 10); }, ); } @@ -842,3 +833,5 @@ Future upload( // Unhandled response code -> retry fail('Unhandled HTTP status = ${res.statusCode}, body: ${res.body}'); } + +bool _isNotAborted() => false;