diff --git a/src/main/groovy/io/seqera/wave/configuration/ContainerRequestConfig.groovy b/src/main/groovy/io/seqera/wave/configuration/ContainerRequestConfig.groovy
new file mode 100644
index 000000000..cede3d9bd
--- /dev/null
+++ b/src/main/groovy/io/seqera/wave/configuration/ContainerRequestConfig.groovy
@@ -0,0 +1,107 @@
+/*
+ * Wave, containers provisioning service
+ * Copyright (c) 2023-2024, Seqera Labs
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package io.seqera.wave.configuration
+
+import java.time.Duration
+
+import groovy.transform.ToString
+import io.micronaut.context.annotation.Value
+import io.seqera.wave.util.DurationUtils
+import jakarta.inject.Inject
+import jakarta.inject.Singleton
+/**
+ * Configuration to be used by {@link io.seqera.wave.service.request.ContainerRequestService}
+ *
+ * @author: Paolo Di Tommaso
+ *
+ */
+@ToString(includePackage = false, includeNames = true)
+@Singleton
+class ContainerRequestConfig {
+
+ @Inject
+ Cache cache
+
+ @Inject
+ Watcher watcher
+
+ /**
+ * Model container token caching configuration settings
+ */
+ @ToString(includePackage = false, includeNames = true)
+ @Singleton
+ static class Cache {
+
+ /**
+ * The default duration of a container request time-to-live.
+ * This determines how long a container token is valid, and
+ * therefore an ephemeral container can be accessed.
+ */
+ @Value('${wave.tokens.cache.duration:3h}')
+ Duration duration
+
+ /**
+ * The maximum duration of a container request time-to-live.
+ * This determines how long a container token is valid, and
+ * therefore an ephemeral container can be accessed.
+ */
+ @Value('${wave.tokens.cache.max-duration:2d}')
+ Duration maxDuration
+
+ /**
+ * This method returns the period of time between two consecutive check events.
+ * The interval determines how frequently a refresh operation is triggered.
+ * A shorter interval means more frequent checks, while a longer interval reduces checks frequency.
+ */
+ @Value('${wave.tokens.cache.check-interval:30h}')
+ Duration checkInterval
+
+ }
+
+ /**
+ * Model container request watcher configuration settings
+ */
+ @ToString(includePackage = false, includeNames = true)
+ static class Watcher {
+
+ /**
+ * Determine the delay after which the container request watcher service is run
+ */
+ @Value('${wave.tokens.watcher.interval:10s}')
+ Duration interval
+
+ /**
+ * Determine the delay after which the watcher service is launched after the bootstrap
+ */
+ @Value('${wave.tokens.watcher.delay:5s}')
+ Duration delay
+
+ /**
+ * Determine the number of container requests that are processed in watcher cycle
+ */
+ @Value('${wave.tokens.watcher.count:250}')
+ int count
+
+ Duration getDelayRandomized() {
+ DurationUtils.randomDuration(getDelay(), 0.4f)
+ }
+
+ }
+
+}
diff --git a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy
index 9ba1f4745..99f86f96a 100644
--- a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy
@@ -51,9 +51,9 @@ import io.seqera.wave.service.inspect.ContainerInspectService
import io.seqera.wave.service.logs.BuildLogService
import io.seqera.wave.service.mirror.ContainerMirrorService
import io.seqera.wave.service.mirror.MirrorResult
-import io.seqera.wave.service.persistence.PersistenceService
import io.seqera.wave.service.persistence.WaveBuildRecord
import io.seqera.wave.service.persistence.WaveScanRecord
+import io.seqera.wave.service.request.ContainerRequestService
import io.seqera.wave.service.scan.ContainerScanService
import io.seqera.wave.service.scan.ScanEntry
import io.seqera.wave.service.scan.ScanType
@@ -82,7 +82,7 @@ class ViewController {
private String serverUrl
@Inject
- private PersistenceService persistenceService
+ private ContainerRequestService containerService
@Inject
@Nullable
@@ -275,8 +275,10 @@ class ViewController {
@View("container-view")
@Get('/containers/{token}')
- HttpResponse viewContainer(String token) {
- final data = persistenceService.loadContainerRequest(token)
+ HttpResponse> viewContainer(String token) {
+ final data = containerService.loadContainerRecord(token)
+ if( !data )
+ throw new NotFoundException("Unknown container token: $token")
// return the response
final binding = new HashMap(20)
if( !data )
diff --git a/src/main/groovy/io/seqera/wave/encoder/DateTimeAdapter.groovy b/src/main/groovy/io/seqera/wave/encoder/DateTimeAdapter.groovy
index 395811ad0..6179fcdce 100644
--- a/src/main/groovy/io/seqera/wave/encoder/DateTimeAdapter.groovy
+++ b/src/main/groovy/io/seqera/wave/encoder/DateTimeAdapter.groovy
@@ -20,6 +20,7 @@ package io.seqera.wave.encoder
import java.time.Duration
import java.time.Instant
+import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import com.squareup.moshi.FromJson
@@ -58,4 +59,14 @@ class DateTimeAdapter {
final val0 = value.contains('.') ? Math.round(value.toDouble() * 1_000_000_000) : value.toLong()
return value != null ? Duration.ofNanos(val0) : null
}
+
+ @ToJson
+ String serializeOffsetDateTime(OffsetDateTime value) {
+ return value.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ }
+
+ @FromJson
+ OffsetDateTime deserializeOffsetDateTime(String value) {
+ return OffsetDateTime.parse(value, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
+ }
}
diff --git a/src/main/groovy/io/seqera/wave/service/persistence/WaveContainerRecord.groovy b/src/main/groovy/io/seqera/wave/service/persistence/WaveContainerRecord.groovy
index e0ee15ddf..e257d7027 100644
--- a/src/main/groovy/io/seqera/wave/service/persistence/WaveContainerRecord.groovy
+++ b/src/main/groovy/io/seqera/wave/service/persistence/WaveContainerRecord.groovy
@@ -48,134 +48,134 @@ class WaveContainerRecord {
* This is container token and it is named as id for surrealdb requirement
*/
@PostgresIgnore
- final String id
+ String id
/**
* The Tower user associated with the request
*/
- final User user
+ User user
/**
* The Tower workspace associated with the request
*/
- final Long workspaceId
+ Long workspaceId
/**
* The container image requested. this can be null null when a build request was submitted
*/
- final String containerImage
+ String containerImage
/**
* The container file (aka Dockerfile) content associated with the request
*/
- final String containerFile
+ String containerFile
/**
* The container config associated with the request
*/
- final ContainerConfig containerConfig
+ ContainerConfig containerConfig
/**
* The conda file associated with the request
*/
- final String condaFile
+ String condaFile
/**
* The container arch platform
*/
- final String platform
+ String platform
/**
* The Tower endpoint associated with the request
*/
- final String towerEndpoint
+ String towerEndpoint
/**
* The repository where the build image is uploaded
*/
- final String buildRepository
+ String buildRepository
/**
* The repository where container layers are cached
*/
- final String cacheRepository
+ String cacheRepository
/**
* The request fingerprint
*/
- final String fingerprint
+ String fingerprint
/**
* The request timestamp
*/
- final Instant timestamp
+ Instant timestamp
/**
* The time zone id where the request was originated
*/
- final String zoneId
+ String zoneId
/**
* The IP address originating the request
*/
- final String ipAddress
+ String ipAddress
/**
* The container image associated with this Wave container, it can be the container image
* as provide by the user, or a container image built by Wave
*/
- final String sourceImage
+ String sourceImage
/**
* The container SHA256 digest of the container image associated with this request
*/
- final String sourceDigest
+ String sourceDigest
/**
* The resulting Wave container image name
*/
- final String waveImage
+ String waveImage
/**
* The resulting Wave container image digest
*/
- final String waveDigest
+ String waveDigest
/**
* The timestamp of the Wave container expiration
*/
- final Instant expiration
+ Instant expiration
/**
* The ID of the build if the Wave request triggered a container build, null otherwise
*/
- final String buildId
+ String buildId
/**
* Whenever a new build was triggered for this Wave request, or the container was built by a previous request
*/
- final Boolean buildNew
+ Boolean buildNew
/**
* Whenever the request is a Wave container freeze
*/
- final Boolean freeze
+ Boolean freeze
/**
* Whenever the request is for container with fusion
*/
@JsonDeserialize(using = FusionVersionStringDeserializer.class)
- final String fusionVersion
+ String fusionVersion
/**
* Whenever it's a "mirror" build request
*/
- final Boolean mirror
+ Boolean mirror
/**
* The scan id associated with this request
*/
- final String scanId
+ String scanId
WaveContainerRecord(SubmitContainerTokenRequest request, ContainerRequest data, String waveImage, String addr, Instant expiration) {
this.id = data.requestId
diff --git a/src/main/groovy/io/seqera/wave/service/request/ContainerRequestRange.groovy b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestRange.groovy
new file mode 100644
index 000000000..fc57cb562
--- /dev/null
+++ b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestRange.groovy
@@ -0,0 +1,81 @@
+/*
+ * Wave, containers provisioning service
+ * Copyright (c) 2023-2024, Seqera Labs
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package io.seqera.wave.service.request
+
+import java.time.Duration
+import java.time.Instant
+
+import groovy.transform.Canonical
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import io.seqera.wave.encoder.MoshiEncodeStrategy
+import io.seqera.wave.store.range.AbstractRangeStore
+import io.seqera.wave.store.range.impl.RangeProvider
+import jakarta.inject.Singleton
+/**
+ * Model a range store for container request ids.
+ *
+ * @author Paolo Di Tommaso
+ */
+@Slf4j
+@Singleton
+@CompileStatic
+class ContainerRequestRange extends AbstractRangeStore {
+
+ @Canonical
+ static class Entry {
+ final String requestId
+ final String workflowId
+ final Instant expiration
+
+ Entry withExpiration(Instant instant) {
+ new Entry(requestId, workflowId, instant)
+ }
+ }
+
+
+ private MoshiEncodeStrategy encoder
+
+ ContainerRequestRange(RangeProvider provider) {
+ super(provider)
+ encoder = new MoshiEncodeStrategy() {}
+ }
+
+ @Override
+ protected String getKey() {
+ return 'container-requests-range/v1'
+ }
+
+ void add(Entry entry, Duration future) {
+ assert future
+ add(entry, Instant.now().plus(future))
+ }
+
+ void add(Entry entry, Instant expire) {
+ assert entry
+ assert expire
+ super.add(encoder.encode(entry), expire.epochSecond)
+ }
+
+ List getEntriesUntil(Instant instant, int max) {
+ final result = getRange(0, instant.epochSecond, max)
+ return result ? result.collect((json)-> encoder.decode(json)) : List.of()
+ }
+
+}
diff --git a/src/main/groovy/io/seqera/wave/service/request/ContainerRequestServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestServiceImpl.groovy
index 071abcb7d..302860ea0 100644
--- a/src/main/groovy/io/seqera/wave/service/request/ContainerRequestServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestServiceImpl.groovy
@@ -18,13 +18,20 @@
package io.seqera.wave.service.request
+import java.time.Duration
import java.time.Instant
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
-import io.seqera.wave.configuration.TokenConfig
+import io.micronaut.scheduling.TaskScheduler
+import io.seqera.wave.configuration.ContainerRequestConfig
import io.seqera.wave.service.persistence.PersistenceService
import io.seqera.wave.service.persistence.WaveContainerRecord
+import io.seqera.wave.service.request.ContainerRequestRange.Entry
+import io.seqera.wave.tower.auth.JwtAuth
+import io.seqera.wave.tower.client.TowerClient
+import io.seqera.wave.tower.client.Workflow
+import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Singleton
/**
@@ -41,7 +48,7 @@ class ContainerRequestServiceImpl implements ContainerRequestService {
private ContainerRequestStore containerRequestStore
@Inject
- private TokenConfig config
+ private ContainerRequestConfig config
@Inject
private PersistenceService persistenceService
@@ -49,7 +56,14 @@ class ContainerRequestServiceImpl implements ContainerRequestService {
@Override
TokenData computeToken(ContainerRequest request) {
final expiration = Instant.now().plus(config.cache.duration)
- containerRequestStore.put(request.requestId, request)
+ // put in the container store
+ containerRequestStore.put(request.requestId, request, config.cache.duration)
+ // when the workflowId is available schedule a refresh event
+ if( request.type==ContainerRequest.Type.Container && request.identity.workflowId ) {
+ final entry = new ContainerRequestRange.Entry(request.requestId, request.identity.workflowId, expiration)
+ scheduleRefresh(entry)
+ }
+ // return the token data
return new TokenData(request.requestId, expiration)
}
@@ -74,4 +88,127 @@ class ContainerRequestServiceImpl implements ContainerRequestService {
WaveContainerRecord loadContainerRecord(String requestId) {
persistenceService.loadContainerRequest(requestId)
}
+
+ // =============== watcher implementation ===============
+
+ private static final String PREFIX = 'request/v1/'
+
+ @Inject
+ private TaskScheduler scheduler
+
+ @Inject
+ private ContainerRequestRange containerRequestRange
+
+ @Inject
+ private TowerClient towerClient
+
+ protected void scheduleRefresh(Entry entry) {
+ final future = Instant.now() + config.cache.checkInterval
+ log.trace "Scheduling container request $entry - event ts=$future"
+ containerRequestRange.add(entry, future)
+ }
+
+ @PostConstruct
+ private void init() {
+ log.info "Creating Container request watcher - ${config}"
+ // use randomize initial delay to prevent multiple replicas running at the same time
+ scheduler.scheduleAtFixedRate(
+ config.watcher.delayRandomized,
+ config.watcher.interval,
+ this.&watch )
+ }
+
+ protected void watch() {
+ final now = Instant.now()
+ final keys = containerRequestRange.getEntriesUntil(now, config.watcher.count)
+ for( Entry it : keys ) {
+ try {
+ check0(it, now)
+ }
+ catch (InterruptedException e) {
+ Thread.interrupted()
+ }
+ catch (Throwable t) {
+ log.error("Unexpected error in container request watcher while processing key: $it", t)
+ }
+ }
+ }
+
+ protected void check0(final Entry entry, final Instant now) {
+ // 1. some sanity checks
+ if( !entry.requestId ) {
+ log.error "Missing refresh entry request id - offending entry=$entry"
+ return
+ }
+ if( !entry.workflowId ) {
+ log.error "Missing refresh entry workflow id - offending entry=$entry"
+ return
+ }
+ if( !entry.expiration ) {
+ log.error "Missing refresh entry expiration - offending entry=$entry"
+ return
+ }
+
+ // 2. check if the request is near to expiration
+ final deadline = entry.expiration - config.cache.checkInterval
+ if( now < deadline ) {
+ log.debug "Container request '${entry.requestId}' does not requires refresh - deadline=${deadline}; expiration=${entry.expiration}"
+ scheduleRefresh(entry)
+ return
+ }
+
+ // 3. check the request is still available
+ final request = getRequest(entry.requestId)
+ if( !request ) {
+ log.error "Unable to find any container request for id '${entry.requestId}'"
+ return
+ }
+
+ // 4. check the workflow is still running
+ final workflow = describeWorkflow(request)
+ if( !isWorkflowActive(workflow) ) {
+ log.debug "Container request '${entry.requestId}' does not require refresh - workflow ${workflow.id} is not running"
+ return
+ }
+
+ // 5. check the expiration is not beyond the max allowed
+ final newExpire = entry.expiration + config.cache.checkInterval.multipliedBy(2)
+ if(Duration.between(request.creationTime, newExpire) > config.cache.maxDuration) {
+ log.info "Container request '${entry.requestId}' reached max allowed duration - expiration=${entry.expiration}; new expiration=${newExpire}; worklow=${workflow.id}"
+ return
+ }
+
+ // 6. load the container persisted record
+ final requestRecord = loadContainerRecord(entry.requestId)
+ if( !requestRecord ) {
+ log.error "Unable to find any container record for request '${entry.requestId}'"
+ return
+ }
+
+ // 7. store the request with update expiration
+ final newTtl = Duration.between(Instant.now(), newExpire)
+ log.info "Container request '${entry.requestId}' expiration is extended by: ${newTtl}; at: ${newExpire}; (was: ${entry.expiration})"
+ containerRequestStore.put(entry.requestId, request, newTtl)
+ // update the expiration record
+ requestRecord.expiration = newExpire
+ persistenceService.saveContainerRequestAsync(requestRecord)
+ // schedule a new refresh event
+ scheduleRefresh(entry.withExpiration(newExpire))
+ }
+
+ protected Workflow describeWorkflow(ContainerRequest request) {
+ final resp = towerClient.describeWorkflow(
+ request.identity.towerEndpoint,
+ JwtAuth.of(request.identity),
+ request.identity.workspaceId,
+ request.identity.workflowId)
+ return resp?.workflow
+ }
+
+ protected boolean isWorkflowActive(Workflow workflow) {
+ return workflow
+ ? workflow.status==Workflow.WorkflowStatus.SUBMITTED || workflow.status==Workflow.WorkflowStatus.RUNNING
+ : null
+ }
+
}
diff --git a/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStore.groovy b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStore.groovy
index 9fa670e31..0ccebf63f 100644
--- a/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStore.groovy
+++ b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStore.groovy
@@ -17,6 +17,9 @@
*/
package io.seqera.wave.service.request
+
+import java.time.Duration
+
/**
* Define the container request token persistence operations
*
@@ -27,6 +30,8 @@ interface ContainerRequestStore {
void put(String key, ContainerRequest request)
+ void put(String key, ContainerRequest request, Duration ttl)
+
ContainerRequest get(String key)
void remove(String key)
diff --git a/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStoreImpl.groovy b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStoreImpl.groovy
index a1d4cd97b..ed4fab28d 100644
--- a/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStoreImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/request/ContainerRequestStoreImpl.groovy
@@ -22,7 +22,7 @@ import java.time.Duration
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
-import io.seqera.wave.configuration.TokenConfig
+import io.seqera.wave.configuration.ContainerRequestConfig
import io.seqera.wave.encoder.MoshiEncodeStrategy
import io.seqera.wave.store.state.AbstractStateStore
import io.seqera.wave.store.state.impl.StateProvider
@@ -37,12 +37,12 @@ import jakarta.inject.Singleton
@CompileStatic
class ContainerRequestStoreImpl extends AbstractStateStore implements ContainerRequestStore {
- private TokenConfig tokenConfig
+ private ContainerRequestConfig config
- ContainerRequestStoreImpl(StateProvider delegate, TokenConfig tokenConfig) {
+ ContainerRequestStoreImpl(StateProvider delegate, ContainerRequestConfig config) {
super(delegate, new MoshiEncodeStrategy(){})
- this.tokenConfig = tokenConfig
- log.info "Creating Tokens cache store ― duration=${tokenConfig.cache.duration}"
+ this.config = config
+ log.info "Creating Tokens cache store ― duration=${config.cache.duration}"
}
@Override
@@ -52,7 +52,7 @@ class ContainerRequestStoreImpl extends AbstractStateStore imp
@Override
protected Duration getDuration() {
- return tokenConfig.cache.duration
+ return config.cache.duration
}
@Override
@@ -65,6 +65,11 @@ class ContainerRequestStoreImpl extends AbstractStateStore imp
super.put(key, value)
}
+ @Override
+ void put(String key, ContainerRequest value, Duration ttl) {
+ super.put(key, value, ttl)
+ }
+
@Override
void remove(String key) {
super.remove(key)
diff --git a/src/main/groovy/io/seqera/wave/store/range/AbstractRangeStore.groovy b/src/main/groovy/io/seqera/wave/store/range/AbstractRangeStore.groovy
index de1a6cf23..8f9bc9d85 100644
--- a/src/main/groovy/io/seqera/wave/store/range/AbstractRangeStore.groovy
+++ b/src/main/groovy/io/seqera/wave/store/range/AbstractRangeStore.groovy
@@ -36,11 +36,17 @@ abstract class AbstractRangeStore implements RangeStore {
this.delegate = provider
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- void add(String name, double score) {
- delegate.add(getKey(), name, score)
+ void add(String entry, double score) {
+ delegate.add(getKey(), entry, score)
}
+ /**
+ * {@inheritDoc}
+ */
@Override
List getRange(double min, double max, int count) {
return getRange(min, max, count, true)
diff --git a/src/main/groovy/io/seqera/wave/store/range/RangeStore.groovy b/src/main/groovy/io/seqera/wave/store/range/RangeStore.groovy
index 94e1f905e..d1a35c3e3 100644
--- a/src/main/groovy/io/seqera/wave/store/range/RangeStore.groovy
+++ b/src/main/groovy/io/seqera/wave/store/range/RangeStore.groovy
@@ -24,7 +24,27 @@ package io.seqera.wave.store.range
*/
interface RangeStore {
- void add(String member, double score)
+ /**
+ * Add an entry to the range with the specified score
+ *
+ * @param name
+ * The name of the entry to be added
+ * @param score
+ * The score of the entry as {@code double} value
+ */
+ void add(String entry, double score)
+ /**
+ * Get a list of entries having a score within the specified range
+ *
+ * @param min
+ * The range lower bound
+ * @param max
+ * The range upper bound
+ * @param count
+ * The max number of entries that can be returned
+ * @return
+ * The list of entries matching the specified range or an empty list if no entry matches the range specified
+ */
List getRange(double min, double max, int count)
}
diff --git a/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy b/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy
index 9e040da47..d7b9ed389 100644
--- a/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy
+++ b/src/main/groovy/io/seqera/wave/tower/auth/JwtConfig.groovy
@@ -50,7 +50,7 @@ class JwtConfig {
Duration monitorInterval
/**
- * Determine the delay after which the JWT monitor service is launcher on bootstrap
+ * Determine the delay after which the JWT monitor service is launched after bootstrap
*/
@Value('${wave.jwt.monitor.delay:5s}')
Duration monitorDelay
diff --git a/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy b/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy
index 69c0ffd66..89566a1e9 100644
--- a/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy
+++ b/src/main/groovy/io/seqera/wave/tower/auth/JwtMonitor.groovy
@@ -24,7 +24,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Context
import io.micronaut.scheduling.TaskScheduler
-import io.seqera.wave.configuration.TokenConfig
+import io.seqera.wave.configuration.ContainerRequestConfig
import io.seqera.wave.tower.client.TowerClient
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
@@ -58,7 +58,7 @@ class JwtMonitor implements Runnable {
private JwtConfig jwtConfig
@Inject
- private TokenConfig tokenConfig
+ private ContainerRequestConfig tokenConfig
@PostConstruct
private init() {
diff --git a/src/main/groovy/io/seqera/wave/configuration/TokenConfig.groovy b/src/main/groovy/io/seqera/wave/tower/client/DescribeWorkflowResponse.groovy
similarity index 54%
rename from src/main/groovy/io/seqera/wave/configuration/TokenConfig.groovy
rename to src/main/groovy/io/seqera/wave/tower/client/DescribeWorkflowResponse.groovy
index 8331d8d58..c0d06fcbf 100644
--- a/src/main/groovy/io/seqera/wave/configuration/TokenConfig.groovy
+++ b/src/main/groovy/io/seqera/wave/tower/client/DescribeWorkflowResponse.groovy
@@ -16,36 +16,23 @@
* along with this program. If not, see .
*/
-package io.seqera.wave.configuration
+package io.seqera.wave.tower.client
-import java.time.Duration
-import io.micronaut.core.annotation.Nullable
+import groovy.transform.CompileStatic
+import groovy.transform.EqualsAndHashCode
+import groovy.transform.ToString
+import io.seqera.wave.encoder.MoshiSerializable
-import io.micronaut.context.annotation.ConfigurationProperties
-import io.micronaut.core.bind.annotation.Bindable
/**
- * Configuration to be used by a TokenService
- *
- * @author : jorge
+ * Model a Platform workflow response
*
+ * @author Paolo Di Tommaso
*/
-@ConfigurationProperties('wave.tokens')
-interface TokenConfig {
-
- Cache getCache()
-
- @ConfigurationProperties('cache')
- interface Cache {
-
- @Bindable(defaultValue = "1h")
- @Nullable
- Duration getDuration()
-
- @Deprecated
- @Bindable(defaultValue = "10000")
- @Nullable
- int getMaxSize()
+@EqualsAndHashCode
+@CompileStatic
+@ToString(includePackage = false, includeNames = true)
+class DescribeWorkflowResponse implements MoshiSerializable {
- }
+ Workflow workflow
}
diff --git a/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy b/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy
index 9a1e3ff68..8bd9cd238 100644
--- a/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy
+++ b/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy
@@ -56,6 +56,8 @@ class TowerClient {
@Value('${wave.pairing.cache.long.duration:24h}')
private Duration cacheLongDuration
+ @Value('${wave.pairing.cache.workflow.duration:10m}')
+ private Duration cacheWorkflowDuration
protected CompletableFuture getAsync(URI uri, String endpoint, @Nullable JwtAuth authorization, Class type) {
assert uri, "Missing uri argument"
@@ -149,6 +151,21 @@ class TowerClient {
return URI.create(uri)
}
+ DescribeWorkflowResponse describeWorkflow(String endpoint, JwtAuth authorization, Long workspaceId, String workflowId) {
+ final uri = workflowDescribeEndpoint(endpoint,workspaceId,workflowId)
+ final k = RegHelper.sipHash(uri, authorization.key, workspaceId, workflowId)
+ // NOTE: it assumes the workflow definition cannot change for the specified 'workflowId'
+ // and therefore the *long* expiration cached is used
+ return get0(uri, endpoint, authorization, DescribeWorkflowResponse.class, k, cacheWorkflowDuration) as DescribeWorkflowResponse
+ }
+
+ protected static URI workflowDescribeEndpoint(String endpoint, Long workspaceId, String workflowId) {
+ def uri = "${checkEndpoint(endpoint)}/workflow/${workflowId}"
+ if( workspaceId!=null )
+ uri += "?workspaceId=$workspaceId"
+ return URI.create(uri)
+ }
+
/** Only for testing - do not use */
protected void invalidateCache() {
cache.invalidateAll()
diff --git a/src/main/groovy/io/seqera/wave/tower/client/Workflow.groovy b/src/main/groovy/io/seqera/wave/tower/client/Workflow.groovy
new file mode 100644
index 000000000..a9ac5f49b
--- /dev/null
+++ b/src/main/groovy/io/seqera/wave/tower/client/Workflow.groovy
@@ -0,0 +1,58 @@
+/*
+ * Wave, containers provisioning service
+ * Copyright (c) 2023-2024, Seqera Labs
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package io.seqera.wave.tower.client
+
+import java.time.OffsetDateTime
+
+import groovy.transform.CompileStatic
+import groovy.transform.EqualsAndHashCode
+import groovy.transform.ToString
+import io.seqera.wave.encoder.MoshiSerializable
+
+/**
+ * Model a Platform workflow run
+ *
+ * @author Paolo Di Tommaso
+ */
+@EqualsAndHashCode
+@CompileStatic
+@ToString(includePackage = false, includeNames = true)
+class Workflow implements MoshiSerializable {
+
+ enum WorkflowStatus {
+ SUBMITTED,
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ CANCELLED,
+ UNKNOWN
+ }
+
+ String id
+ OffsetDateTime submit
+ OffsetDateTime start
+ OffsetDateTime complete
+ OffsetDateTime dateCreated
+ OffsetDateTime lastUpdated
+ String runName
+ String sessionId
+ String workDir
+ String launchId
+ WorkflowStatus status
+}
diff --git a/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy b/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy
index 5a8dbcd95..a0b535f96 100644
--- a/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy
+++ b/src/main/groovy/io/seqera/wave/tower/client/cache/ClientCache.groovy
@@ -31,8 +31,10 @@ import io.seqera.wave.store.cache.AbstractTieredCache
import io.seqera.wave.store.cache.L2TieredCache
import io.seqera.wave.tower.User
import io.seqera.wave.tower.client.CredentialsDescription
+import io.seqera.wave.tower.client.DescribeWorkflowResponse
import io.seqera.wave.tower.client.GetCredentialsKeysResponse
import io.seqera.wave.tower.client.ListCredentialsResponse
+import io.seqera.wave.tower.client.Workflow
import io.seqera.wave.tower.client.GetUserInfoResponse
import io.seqera.wave.tower.compute.ComputeEnv
import io.seqera.wave.tower.compute.DescribeWorkflowLaunchResponse
@@ -86,9 +88,10 @@ class ClientCache extends AbstractTieredCache {
.withSubtype(GetUserInfoResponse.class, GetUserInfoResponse.simpleName)
.withSubtype(User.class, User.simpleName)
.withSubtype(WorkflowLaunch.class, WorkflowLaunch.simpleName)
+ .withSubtype(DescribeWorkflowResponse.class, DescribeWorkflowResponse.simpleName)
+ .withSubtype(Workflow.class, Workflow.simpleName)
// add legacy classes
.withSubtype(GetUserInfoResponse.class, 'UserInfoResponse')
.withSubtype(WorkflowLaunch.class, 'WorkflowLaunchResponse')
-
}
}
diff --git a/src/test/groovy/io/seqera/wave/service/request/ContainerRequestRangeTest.groovy b/src/test/groovy/io/seqera/wave/service/request/ContainerRequestRangeTest.groovy
new file mode 100644
index 000000000..1325134e0
--- /dev/null
+++ b/src/test/groovy/io/seqera/wave/service/request/ContainerRequestRangeTest.groovy
@@ -0,0 +1,62 @@
+/*
+ * Wave, containers provisioning service
+ * Copyright (c) 2023-2024, Seqera Labs
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package io.seqera.wave.service.request
+
+import spock.lang.Specification
+
+import java.time.Duration
+import java.time.Instant
+
+import io.micronaut.test.extensions.spock.annotation.MicronautTest
+import jakarta.inject.Inject
+
+/**
+ *
+ * @author Paolo Di Tommaso
+ */
+@MicronautTest
+class ContainerRequestRangeTest extends Specification {
+
+ @Inject
+ ContainerRequestRange range
+
+ def 'should return an empty list when no entries are avail' () {
+ expect:
+ range.getEntriesUntil(Instant.now(), 10) == []
+ }
+
+ def 'should add and retrieve some values' () {
+ given:
+ def now = Instant.now()
+ and:
+ def e1 = new ContainerRequestRange.Entry('cr-1', 'wf-1',now)
+ def e2 = new ContainerRequestRange.Entry('cr-2', 'wf-2',now)
+ def e3 = new ContainerRequestRange.Entry('cr-3', 'wf-3',now)
+ and:
+ range.add(e1, now- Duration.ofSeconds(2))
+ range.add(e2, now- Duration.ofSeconds(1))
+ range.add(e3, now+ Duration.ofMillis(600))
+
+ expect:
+ range.getEntriesUntil(now, 10) == [e1, e2]
+ and:
+ range.getEntriesUntil(now.plusSeconds(1), 10) == [e3]
+ }
+
+}
diff --git a/src/test/groovy/io/seqera/wave/service/request/ContainerRequestServiceImplTest.groovy b/src/test/groovy/io/seqera/wave/service/request/ContainerRequestServiceImplTest.groovy
index 51b95c601..32e2abd70 100644
--- a/src/test/groovy/io/seqera/wave/service/request/ContainerRequestServiceImplTest.groovy
+++ b/src/test/groovy/io/seqera/wave/service/request/ContainerRequestServiceImplTest.groovy
@@ -21,7 +21,7 @@ package io.seqera.wave.service.request
import spock.lang.Specification
import io.micronaut.test.extensions.spock.annotation.MicronautTest
-import io.seqera.wave.configuration.TokenConfig
+import io.seqera.wave.configuration.ContainerRequestConfig
import io.seqera.wave.tower.PlatformId
import io.seqera.wave.tower.User
import jakarta.inject.Inject
@@ -33,7 +33,7 @@ import jakarta.inject.Inject
class ContainerRequestServiceImplTest extends Specification {
@Inject
- private TokenConfig config
+ private ContainerRequestConfig config
@Inject
private ContainerRequestStoreImpl requestStore
diff --git a/src/test/groovy/io/seqera/wave/tower/client/DescribeWorkflowResponseTest.groovy b/src/test/groovy/io/seqera/wave/tower/client/DescribeWorkflowResponseTest.groovy
new file mode 100644
index 000000000..517dd0199
--- /dev/null
+++ b/src/test/groovy/io/seqera/wave/tower/client/DescribeWorkflowResponseTest.groovy
@@ -0,0 +1,53 @@
+/*
+ * Wave, containers provisioning service
+ * Copyright (c) 2023-2024, Seqera Labs
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package io.seqera.wave.tower.client
+
+import spock.lang.Specification
+
+import java.time.OffsetDateTime
+
+import io.seqera.wave.util.JacksonHelper
+
+/**
+ *
+ * @author Paolo Di Tommaso
+ */
+class DescribeWorkflowResponseTest extends Specification {
+
+ def 'should deserialize workflow' () {
+ given:
+ def PAYLOAD = '{"workflow":{"id":"1ApI9mt8QUZROT","submit":"2024-12-26T11:54:01.659781+01:00","start":"2024-12-26T11:54:26.386791+01:00","complete":null,"dateCreated":"2024-12-26T11:54:01.668816+01:00","lastUpdated":"2024-12-26T11:54:26.387903+01:00","runName":"angry_curie","sessionId":"8bbfd641-1992-40c1-899b-8d90d0eb86c3","profile":"standard","workDir":"/some/work","commitId":"e16e068d7e0d23cea3f520e7f3ec9d8fc5f75dd0","userName":"paolo-ditommaso","scriptId":"c86562f1d8e81f868ab5e1b02ccf0143","revision":"master","commandLine":"nextflow run \'https://github.com/pditommaso/nf-sleep\' -name angry_curie -params-file \'http://localhost:8000/api/ephemeral/vn1YShW5g3WXlm7aWPn2TA.yaml\' -with-tower \'http://localhost:8000/api\'","projectName":"pditommaso/nf-sleep","scriptName":"main.nf","launchId":"1UXIbhO3N0yZqxGtxPPHM1","status":"RUNNING","requiresAttention":false,"configFiles":["/Users/pditommaso/.nextflow/assets/pditommaso/nf-sleep/nextflow.config","/Users/pditommaso/Projects/nf-tower-cloud/tower-backend/work/nf-1ApI9mt8QUZROT.config"],"params":{"forks":1,"exit":0,"times":1,"cmd":"echo \'Hello (timeout 200)\'","timeout":200},"configText":"process {\\n container = \'quay.io/nextflow/bash\'\\n}\\n\\ntimeline {\\n enabled = true\\n file = \'timeline-1ApI9mt8QUZROT.html\'\\n}\\n\\nwave {\\n enabled = true\\n endpoint = \'https://reg.ngrok.io\'\\n}\\n\\ndocker {\\n enabled = true\\n envWhitelist = \'AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY\'\\n}\\n\\nparams {\\n timeout = 200\\n}\\n\\nrunName = \'angry_curie\'\\nworkDir = \'/Users/pditommaso/Projects/nf-tower-cloud/tower-backend/work\'\\n\\ntower {\\n enabled = true\\n endpoint = \'http://localhost:8000/api\'\\n}\\n","manifest":{"nextflowVersion":null,"defaultBranch":null,"version":null,"homePage":null,"gitmodules":null,"description":null,"name":null,"mainScript":"main.nf","author":null},"nextflow":{"version":"24.10.3","build":"5933","timestamp":"2024-12-16T15:34:00Z"},"stats":null,"errorMessage":null,"errorReport":null,"deleted":null,"peakLoadCpus":null,"peakLoadTasks":null,"peakLoadMemory":null,"projectDir":"/Users/pditommaso/.nextflow/assets/pditommaso/nf-sleep","homeDir":"/Users/pditommaso","container":"quay.io/nextflow/bash","repository":"https://github.com/pditommaso/nf-sleep","containerEngine":"docker","scriptFile":"/Users/pditommaso/.nextflow/assets/pditommaso/nf-sleep/main.nf","launchDir":"/Users/pditommaso/Projects/nf-tower-cloud/tower-backend/work","duration":null,"exitStatus":null,"resume":false,"success":null,"logFile":null,"outFile":null,"operationId":null,"ownerId":1},"progress":{"workflowProgress":{"cpus":0,"cpuTime":0,"cpuLoad":0,"memoryRss":0,"memoryReq":0,"readBytes":0,"writeBytes":0,"volCtxSwitch":0,"invCtxSwitch":0,"cost":null,"loadTasks":0,"loadCpus":1,"loadMemory":0,"peakCpus":1,"peakTasks":1,"peakMemory":0,"executors":["local"],"dateCreated":"2024-12-26T11:54:35.038805+01:00","lastUpdated":"2024-12-26T11:54:35.038806+01:00","running":1,"cached":0,"failed":0,"pending":0,"submitted":0,"succeeded":0,"memoryEfficiency":0.0,"cpuEfficiency":0.0},"processesProgress":[{"process":"foo","cpus":0,"cpuTime":0,"cpuLoad":0,"memoryRss":0,"memoryReq":0,"readBytes":0,"writeBytes":0,"volCtxSwitch":0,"invCtxSwitch":0,"loadTasks":0,"loadCpus":1,"loadMemory":0,"peakCpus":1,"peakTasks":1,"peakMemory":0,"dateCreated":"2024-12-26T11:55:36.201251+01:00","lastUpdated":"2024-12-26T11:55:36.201252+01:00","running":1,"cached":0,"failed":0,"pending":0,"submitted":0,"succeeded":0,"memoryEfficiency":0.0,"cpuEfficiency":0.0}]},"platform":{"id":"local-platform","name":"Local Launch Platform for testing"}}'
+
+ when:
+ def resp = JacksonHelper.fromJson(PAYLOAD,DescribeWorkflowResponse)
+ then:
+ resp.workflow
+ and:
+ resp.workflow.id == '1ApI9mt8QUZROT'
+ resp.workflow.submit == OffsetDateTime.parse('2024-12-26T11:54:01.659781+01:00')
+ resp.workflow.start == OffsetDateTime.parse('2024-12-26T11:54:26.386791+01:00')
+ resp.workflow.dateCreated == OffsetDateTime.parse('2024-12-26T11:54:01.668816+01:00')
+ resp.workflow.lastUpdated == OffsetDateTime.parse('2024-12-26T11:54:26.387903+01:00')
+ resp.workflow.runName == 'angry_curie'
+ resp.workflow.sessionId == '8bbfd641-1992-40c1-899b-8d90d0eb86c3'
+ resp.workflow.launchId == '1UXIbhO3N0yZqxGtxPPHM1'
+ resp.workflow.workDir == '/some/work'
+ }
+
+}
diff --git a/src/test/groovy/io/seqera/wave/tower/client/TowerClientTest.groovy b/src/test/groovy/io/seqera/wave/tower/client/TowerClientTest.groovy
index b3006d07a..dfb3aeb8a 100644
--- a/src/test/groovy/io/seqera/wave/tower/client/TowerClientTest.groovy
+++ b/src/test/groovy/io/seqera/wave/tower/client/TowerClientTest.groovy
@@ -150,4 +150,15 @@ class TowerClientTest extends Specification{
'http://foo.com' | null | 'abc' | 'http://foo.com/workflow/abc/launch'
'http://foo.com' | 12345 | 'abc' | 'http://foo.com/workflow/abc/launch?workspaceId=12345'
}
+
+ @Unroll
+ def 'should get workflow describe endpoint' () {
+ expect:
+ TowerClient.workflowDescribeEndpoint(ENDPOINT, WORKSPACE, WORKFLOW) == new URI(EXPECTED)
+
+ where:
+ ENDPOINT | WORKSPACE | WORKFLOW | EXPECTED
+ 'http://foo.com' | null | 'abc' | 'http://foo.com/workflow/abc'
+ 'http://foo.com' | 12345 | 'abc' | 'http://foo.com/workflow/abc?workspaceId=12345'
+ }
}
diff --git a/src/test/groovy/io/seqera/wave/tower/client/cache/ClientCacheTest.groovy b/src/test/groovy/io/seqera/wave/tower/client/cache/ClientCacheTest.groovy
index 6c915a0c7..25486318e 100644
--- a/src/test/groovy/io/seqera/wave/tower/client/cache/ClientCacheTest.groovy
+++ b/src/test/groovy/io/seqera/wave/tower/client/cache/ClientCacheTest.groovy
@@ -22,15 +22,20 @@ import spock.lang.Shared
import spock.lang.Specification
import java.time.Duration
+import java.time.Instant
+import java.time.OffsetDateTime
+import java.time.ZoneOffset
import io.micronaut.context.ApplicationContext
import io.seqera.fixtures.redis.RedisTestContainer
import io.seqera.wave.store.cache.RedisL2TieredCache
import io.seqera.wave.tower.User
import io.seqera.wave.tower.client.CredentialsDescription
+import io.seqera.wave.tower.client.DescribeWorkflowResponse
import io.seqera.wave.tower.client.GetCredentialsKeysResponse
import io.seqera.wave.tower.client.GetUserInfoResponse
import io.seqera.wave.tower.client.ListCredentialsResponse
+import io.seqera.wave.tower.client.Workflow
import io.seqera.wave.tower.compute.ComputeEnv
import io.seqera.wave.tower.compute.DescribeWorkflowLaunchResponse
import io.seqera.wave.tower.compute.WorkflowLaunch
@@ -98,7 +103,7 @@ class ClientCacheTest extends Specification implements RedisTestContainer {
cache2.get(k) == resp
}
- def 'should cache describe workflow response' () {
+ def 'should cache describe workflow launch response' () {
given:
def TTL = Duration.ofSeconds(1)
def store = applicationContext.getBean(RedisL2TieredCache)
@@ -115,6 +120,27 @@ class ClientCacheTest extends Specification implements RedisTestContainer {
cache2.get(k) == resp
}
+ def 'should cache describe workflow response' () {
+ given:
+ def TTL = Duration.ofSeconds(1)
+ def store = applicationContext.getBean(RedisL2TieredCache)
+ def cache1 = new ClientCache(store)
+ def cache2 = new ClientCache(store)
+ and:
+ def k = UUID.randomUUID().toString()
+ def now = Instant.now()
+ def submit = OffsetDateTime.ofInstant(now.minusSeconds(500), ZoneOffset.UTC);
+ def start = OffsetDateTime.ofInstant(now.minusSeconds(400), ZoneOffset.UTC);
+ def done = OffsetDateTime.ofInstant(now.minusSeconds(100), ZoneOffset.UTC);
+ def workflow = new Workflow(id:'wf-123', submit: submit, start: start, complete: done, status: Workflow.WorkflowStatus.SUCCEEDED)
+ def resp = new DescribeWorkflowResponse(workflow: workflow)
+
+ when:
+ cache1.put(k, resp, TTL)
+ then:
+ cache2.get(k) == resp
+ }
+
def 'should de-serialize legacy UserInfoResponse' () {
given:
def LEGACY = '{"expiresAt":1735599710424,"value":{"@type":"UserInfoResponse","user":{"email":"foo@bar.com","id":37,"userName":"foo"}}}'
@@ -130,5 +156,4 @@ class ClientCacheTest extends Specification implements RedisTestContainer {
def resp = entry.value as GetUserInfoResponse
resp.user == new User(id:37, email: "foo@bar.com", userName: "foo")
}
-
}