Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -689,17 +689,9 @@ class TaskBackend {

zone = versionState.zone!;
instance = versionState.instance!;

// Remove instanceName, zone, secretToken, and set attempts = 0
state.versions![version] = PackageVersionStateInfo(
scheduled: versionState.scheduled,
state.versions![version] = versionState.complete(
docs: hasDocIndexHtml,
pana: summary != null,
finished: true,
attempts: 0,
instance: null, // version is no-longer running on this instance
secretToken: null, // TODO: Consider retaining this for idempotency
zone: null,
);

// Determine if something else was running on the instance
Expand Down Expand Up @@ -1002,13 +994,12 @@ class TaskBackend {
await for (final state in _db.tasks.listAllForCurrentRuntime()) {
final zone = taskWorkerCloudCompute.zones.first;
// ignore: invalid_use_of_visible_for_testing_member
final updated = await updatePackageStateWithPendingVersions(
final payload = await updatePackageStateWithPendingVersions(
_db,
state.package,
zone,
taskWorkerCloudCompute.generateInstanceName(),
);
final payload = updated?.$1;
if (payload == null) continue;
await processPayload(payload);
}
Expand Down Expand Up @@ -1418,7 +1409,6 @@ final class _TaskDataAccess {
Future<void> restorePreviousVersionsState(
String packageName,
String instanceName,
Map<String, PackageVersionStateInfo> previousVersionsMap,
) async {
await withRetryTransaction(_db, (tx) async {
final s = await tx.tasks.lookupOrNull(packageName);
Expand All @@ -1429,7 +1419,7 @@ final class _TaskDataAccess {
s.versions!.addEntries(
s.versions!.entries
.where((e) => e.value.instance == instanceName)
.map((e) => MapEntry(e.key, previousVersionsMap[e.key]!)),
.map((e) => MapEntry(e.key, e.value.resetAfterFailedAttempt())),
);
s.pendingAt = derivePendingAt(
versions: s.versions!,
Expand Down
54 changes: 53 additions & 1 deletion app/lib/task/models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:convert' show json;
import 'dart:math';

import 'package:clock/clock.dart';
import 'package:json_annotation/json_annotation.dart';
Expand Down Expand Up @@ -249,7 +250,7 @@ List<String> derivePendingVersions({
}

/// State of a given `version` within a [PackageState].
@JsonSerializable()
@JsonSerializable(includeIfNull: false)
class PackageVersionStateInfo {
PackageVersionStatus get status {
if (attempts == 0 && scheduled == initialTimestamp) {
Expand Down Expand Up @@ -319,6 +320,9 @@ class PackageVersionStateInfo {
/// comparison. Please use [isAuthorized] for validating a request.
final String? secretToken;

/// The previous scheduled timestamp (if we are currently in an active schedule).
final DateTime? previousScheduled;

/// Return true, if [token] matches [secretToken] and it has not expired.
///
/// This does a fixed-time comparison to mitigate timing attacks.
Expand Down Expand Up @@ -347,6 +351,7 @@ class PackageVersionStateInfo {
this.docs = false,
this.pana = false,
this.finished = false,
this.previousScheduled,
});

factory PackageVersionStateInfo.fromJson(Map<String, dynamic> m) =>
Expand All @@ -364,6 +369,53 @@ class PackageVersionStateInfo {
'secretToken: $secretToken',
].join(', ') +
')';

// Remove instanceName, zone, secretToken, and set attempts = 0
PackageVersionStateInfo complete({required bool pana, required bool docs}) {
return PackageVersionStateInfo(
scheduled: scheduled,
attempts: 0,
docs: docs,
pana: pana,
finished: true,
zone: null,
instance: null, // version is no-longer running on this instance
secretToken: null, // TODO: Consider retaining this for idempotency
previousScheduled: null,
);
}

/// Derives a new version state with scheduling information.
PackageVersionStateInfo scheduleNew({
required String zone,
required String instanceName,
}) {
return PackageVersionStateInfo(
scheduled: clock.now(),
attempts: attempts + 1,
zone: zone,
instance: instanceName,
secretToken: createUuid(),
finished: finished,
docs: docs,
pana: pana,
previousScheduled: scheduled,
);
}

/// Reverts the status of the last scheduling attempt, which has presumably failed.
PackageVersionStateInfo resetAfterFailedAttempt() {
return PackageVersionStateInfo(
scheduled: previousScheduled ?? initialTimestamp,
attempts: max(0, attempts - 1),
zone: null,
instance: null,
secretToken: null,
finished: finished,
docs: docs,
pana: pana,
);
}
}

/// A [db.Property] encoding a Map from version to [PackageVersionStateInfo] as JSON.
Expand Down
10 changes: 7 additions & 3 deletions app/lib/task/models.g.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 5 additions & 20 deletions app/lib/task/scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import 'package:meta/meta.dart';
import 'package:pub_dev/package/backend.dart';
import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/shared/utils.dart';
import 'package:pub_dev/task/backend.dart';
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
import 'package:pub_dev/task/models.dart';
Expand Down Expand Up @@ -101,13 +100,12 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle(
final instanceName = compute.generateInstanceName();
final zone = pickZone();

final updated = await updatePackageStateWithPendingVersions(
final payload = await updatePackageStateWithPendingVersions(
db,
selected.package,
zone,
instanceName,
);
final payload = updated?.$1;
if (payload == null) {
return;
}
Expand Down Expand Up @@ -174,15 +172,13 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle(
banZone(zone, minutes: 15);
}
if (rollbackPackageState) {
final oldVersionsMap = updated?.$2 ?? const {};
// Restore the state of the PackageState for versions that were
// Restire the state of the PackageState for versions that were
// suppose to run on the instance we just failed to create.
// If this doesn't work, we'll eventually retry. Hence, correctness
// does not hinge on this transaction being successful.
await db.tasks.restorePreviousVersionsState(
selected.package,
instanceName,
oldVersionsMap,
);
}
}
Expand Down Expand Up @@ -221,11 +217,8 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle(

/// Updates the package state with versions that are already pending or
/// will be pending soon.
///
/// Returns the payload and the old status of the state info version map
@visibleForTesting
Future<(Payload, Map<String, PackageVersionStateInfo>)?>
updatePackageStateWithPendingVersions(
Future<Payload?> updatePackageStateWithPendingVersions(
DatastoreDB db,
String package,
String zone,
Expand All @@ -237,7 +230,6 @@ updatePackageStateWithPendingVersions(
// presumably the package was deleted.
return null;
}
final oldVersionsMap = {...?s.versions};

final now = clock.now();
final pendingVersions = derivePendingVersions(
Expand All @@ -253,14 +245,7 @@ updatePackageStateWithPendingVersions(
// Update PackageState
s.versions!.addAll({
for (final v in pendingVersions.map((v) => v.toString()))
v: PackageVersionStateInfo(
scheduled: now,
attempts: s.versions![v]!.attempts + 1,
zone: zone,
instance: instanceName,
secretToken: createUuid(),
finished: s.versions![v]!.finished,
),
v: s.versions![v]!.scheduleNew(zone: zone, instanceName: instanceName),
});
s.pendingAt = derivePendingAt(
versions: s.versions!,
Expand All @@ -279,6 +264,6 @@ updatePackageStateWithPendingVersions(
),
),
);
return (payload, oldVersionsMap);
return payload;
});
}