Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class PostgresConfiguration {
public static final String SSL_MODE_DEFAULT_VALUE = "allow";
public static final String JOOQ_REACTIVE_TIMEOUT = "jooq.reactive.timeout";
public static final Duration JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE = Duration.ofSeconds(10);
public static final String ATTACHMENT_STORAGE_ENABLED = "attachment.storage.enabled";
public static final boolean ATTACHMENT_STORAGE_ENABLED_DEFAULT_VALUE = true;

public static class Credential {
private final String username;
Expand Down Expand Up @@ -95,6 +97,7 @@ public static class Builder {
private Optional<Integer> byPassRLSPoolMaxSize = Optional.empty();
private Optional<String> sslMode = Optional.empty();
private Optional<Duration> jooqReactiveTimeout = Optional.empty();
private Optional<Boolean> attachmentStorageEnabled = Optional.empty();

public Builder databaseName(String databaseName) {
this.databaseName = Optional.of(databaseName);
Expand Down Expand Up @@ -241,6 +244,16 @@ public Builder jooqReactiveTimeout(Optional<Duration> jooqReactiveTimeout) {
return this;
}

public Builder attachmentStorageEnabled(Optional<Boolean> attachmentStorageEnabled) {
this.attachmentStorageEnabled = attachmentStorageEnabled;
return this;
}

public Builder attachmentStorageEnabled(Boolean attachmentStorageEnabled) {
this.attachmentStorageEnabled = Optional.of(attachmentStorageEnabled);
return this;
}

public PostgresConfiguration build() {
Preconditions.checkArgument(username.isPresent() && !username.get().isBlank(), "You need to specify username");
Preconditions.checkArgument(password.isPresent() && !password.get().isBlank(), "You need to specify password");
Expand All @@ -251,18 +264,19 @@ public PostgresConfiguration build() {
}

return new PostgresConfiguration(host.orElse(HOST_DEFAULT_VALUE),
port.orElse(PORT_DEFAULT_VALUE),
databaseName.orElse(DATABASE_NAME_DEFAULT_VALUE),
databaseSchema.orElse(DATABASE_SCHEMA_DEFAULT_VALUE),
new Credential(username.get(), password.get()),
new Credential(byPassRLSUser.orElse(username.get()), byPassRLSPassword.orElse(password.get())),
port.orElse(PORT_DEFAULT_VALUE),
databaseName.orElse(DATABASE_NAME_DEFAULT_VALUE),
databaseSchema.orElse(DATABASE_SCHEMA_DEFAULT_VALUE),
new Credential(username.get(), password.get()),
new Credential(byPassRLSUser.orElse(username.get()), byPassRLSPassword.orElse(password.get())),
rowLevelSecurityEnabled.filter(rlsEnabled -> rlsEnabled).map(rlsEnabled -> RowLevelSecurity.ENABLED).orElse(RowLevelSecurity.DISABLED),
poolInitialSize.orElse(POOL_INITIAL_SIZE_DEFAULT_VALUE),
poolMaxSize.orElse(POOL_MAX_SIZE_DEFAULT_VALUE),
byPassRLSPoolInitialSize.orElse(BY_PASS_RLS_POOL_INITIAL_SIZE_DEFAULT_VALUE),
byPassRLSPoolMaxSize.orElse(BY_PASS_RLS_POOL_MAX_SIZE_DEFAULT_VALUE),
SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE)),
jooqReactiveTimeout.orElse(JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE));
poolInitialSize.orElse(POOL_INITIAL_SIZE_DEFAULT_VALUE),
poolMaxSize.orElse(POOL_MAX_SIZE_DEFAULT_VALUE),
byPassRLSPoolInitialSize.orElse(BY_PASS_RLS_POOL_INITIAL_SIZE_DEFAULT_VALUE),
byPassRLSPoolMaxSize.orElse(BY_PASS_RLS_POOL_MAX_SIZE_DEFAULT_VALUE),
SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE)),
jooqReactiveTimeout.orElse(JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE),
attachmentStorageEnabled.orElse(ATTACHMENT_STORAGE_ENABLED_DEFAULT_VALUE));
}
}

Expand All @@ -272,23 +286,24 @@ public static Builder builder() {

public static PostgresConfiguration from(Configuration propertiesConfiguration) {
return builder()
.databaseName(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_NAME)))
.databaseSchema(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_SCHEMA)))
.host(Optional.ofNullable(propertiesConfiguration.getString(HOST)))
.port(propertiesConfiguration.getInt(PORT, PORT_DEFAULT_VALUE))
.username(Optional.ofNullable(propertiesConfiguration.getString(USERNAME)))
.password(Optional.ofNullable(propertiesConfiguration.getString(PASSWORD)))
.byPassRLSUser(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_USERNAME)))
.byPassRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_PASSWORD)))
.rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false))
.poolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_INITIAL_SIZE, null)))
.poolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_MAX_SIZE, null)))
.byPassRLSPoolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_INITIAL_SIZE, null)))
.byPassRLSPoolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_MAX_SIZE, null)))
.sslMode(Optional.ofNullable(propertiesConfiguration.getString(SSL_MODE)))
.jooqReactiveTimeout(Optional.ofNullable(propertiesConfiguration.getString(JOOQ_REACTIVE_TIMEOUT))
.map(value -> DurationParser.parse(value, ChronoUnit.SECONDS)))
.build();
.databaseName(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_NAME)))
.databaseSchema(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_SCHEMA)))
.host(Optional.ofNullable(propertiesConfiguration.getString(HOST)))
.port(propertiesConfiguration.getInt(PORT, PORT_DEFAULT_VALUE))
.username(Optional.ofNullable(propertiesConfiguration.getString(USERNAME)))
.password(Optional.ofNullable(propertiesConfiguration.getString(PASSWORD)))
.byPassRLSUser(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_USERNAME)))
.byPassRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_PASSWORD)))
.rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false))
.poolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_INITIAL_SIZE, null)))
.poolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_MAX_SIZE, null)))
.byPassRLSPoolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_INITIAL_SIZE, null)))
.byPassRLSPoolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_MAX_SIZE, null)))
.sslMode(Optional.ofNullable(propertiesConfiguration.getString(SSL_MODE)))
.jooqReactiveTimeout(Optional.ofNullable(propertiesConfiguration.getString(JOOQ_REACTIVE_TIMEOUT))
.map(value -> DurationParser.parse(value, ChronoUnit.SECONDS)))
.attachmentStorageEnabled(propertiesConfiguration.getBoolean(ATTACHMENT_STORAGE_ENABLED, ATTACHMENT_STORAGE_ENABLED_DEFAULT_VALUE))
.build();
}

private final String host;
Expand All @@ -304,12 +319,13 @@ public static PostgresConfiguration from(Configuration propertiesConfiguration)
private final Integer byPassRLSPoolMaxSize;
private final SSLMode sslMode;
private final Duration jooqReactiveTimeout;
private final boolean attachmentStorageEnabled;

private PostgresConfiguration(String host, int port, String databaseName, String databaseSchema,
Credential defaultCredential, Credential byPassRLSCredential, RowLevelSecurity rowLevelSecurity,
Integer poolInitialSize, Integer poolMaxSize,
Integer byPassRLSPoolInitialSize, Integer byPassRLSPoolMaxSize,
SSLMode sslMode, Duration jooqReactiveTimeout) {
SSLMode sslMode, Duration jooqReactiveTimeout, boolean attachmentStorageEnabled) {
this.host = host;
this.port = port;
this.databaseName = databaseName;
Expand All @@ -323,6 +339,7 @@ private PostgresConfiguration(String host, int port, String databaseName, String
this.byPassRLSPoolMaxSize = byPassRLSPoolMaxSize;
this.sslMode = sslMode;
this.jooqReactiveTimeout = jooqReactiveTimeout;
this.attachmentStorageEnabled = attachmentStorageEnabled;
}

public String getHost() {
Expand Down Expand Up @@ -377,9 +394,13 @@ public Duration getJooqReactiveTimeout() {
return jooqReactiveTimeout;
}

public boolean isAttachmentStorageEnabled() {
return attachmentStorageEnabled;
}

@Override
public final int hashCode() {
return Objects.hash(host, port, databaseName, databaseSchema, defaultCredential, byPassRLSCredential, rowLevelSecurity, poolInitialSize, poolMaxSize, sslMode, jooqReactiveTimeout);
return Objects.hash(host, port, databaseName, databaseSchema, defaultCredential, byPassRLSCredential, rowLevelSecurity, poolInitialSize, poolMaxSize, sslMode, jooqReactiveTimeout, attachmentStorageEnabled);
}

@Override
Expand All @@ -388,17 +409,18 @@ public final boolean equals(Object o) {
PostgresConfiguration that = (PostgresConfiguration) o;

return Objects.equals(this.rowLevelSecurity, that.rowLevelSecurity)
&& Objects.equals(this.host, that.host)
&& Objects.equals(this.port, that.port)
&& Objects.equals(this.defaultCredential, that.defaultCredential)
&& Objects.equals(this.byPassRLSCredential, that.byPassRLSCredential)
&& Objects.equals(this.databaseName, that.databaseName)
&& Objects.equals(this.databaseSchema, that.databaseSchema)
&& Objects.equals(this.poolInitialSize, that.poolInitialSize)
&& Objects.equals(this.poolMaxSize, that.poolMaxSize)
&& Objects.equals(this.sslMode, that.sslMode)
&& Objects.equals(this.jooqReactiveTimeout, that.jooqReactiveTimeout);
&& Objects.equals(this.host, that.host)
&& Objects.equals(this.port, that.port)
&& Objects.equals(this.defaultCredential, that.defaultCredential)
&& Objects.equals(this.byPassRLSCredential, that.byPassRLSCredential)
&& Objects.equals(this.databaseName, that.databaseName)
&& Objects.equals(this.databaseSchema, that.databaseSchema)
&& Objects.equals(this.poolInitialSize, that.poolInitialSize)
&& Objects.equals(this.poolMaxSize, that.poolMaxSize)
&& Objects.equals(this.sslMode, that.sslMode)
&& Objects.equals(this.jooqReactiveTimeout, that.jooqReactiveTimeout)
&& Objects.equals(this.attachmentStorageEnabled, that.attachmentStorageEnabled);
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import jakarta.inject.Inject;

import org.apache.james.backends.postgres.PostgresConfiguration;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.core.Username;
import org.apache.james.events.Event;
Expand Down Expand Up @@ -63,20 +64,23 @@ public static class DeleteMessageListenerGroup extends Group {
private final PostgresMailboxMessageDAO.Factory mailboxMessageDAOFactory;
private final PostgresAttachmentDAO.Factory attachmentDAOFactory;
private final PostgresThreadDAO.Factory threadDAOFactory;
private final PostgresConfiguration postgresConfiguration;

@Inject
public DeleteMessageListener(BlobStore blobStore,
PostgresMailboxMessageDAO.Factory mailboxMessageDAOFactory,
PostgresMessageDAO.Factory messageDAOFactory,
PostgresAttachmentDAO.Factory attachmentDAOFactory,
PostgresThreadDAO.Factory threadDAOFactory,
PostgresConfiguration postgresConfiguration,
Set<DeletionCallback> deletionCallbackList) {
this.messageDAOFactory = messageDAOFactory;
this.mailboxMessageDAOFactory = mailboxMessageDAOFactory;
this.blobStore = blobStore;
this.deletionCallbackList = deletionCallbackList;
this.attachmentDAOFactory = attachmentDAOFactory;
this.threadDAOFactory = threadDAOFactory;
this.postgresConfiguration = postgresConfiguration;
}

@Override
Expand Down Expand Up @@ -109,9 +113,9 @@ private Mono<Void> handleMailboxDeletion(MailboxDeletion event) {
PostgresThreadDAO threadDAO = threadDAOFactory.create(event.getUsername().getDomainPart());

return postgresMailboxMessageDAO.deleteByMailboxId((PostgresMailboxId) event.getMailboxId())
.flatMap(msgId -> handleMessageDeletion(postgresMessageDAO, postgresMailboxMessageDAO, attachmentDAO, threadDAO, msgId, event.getMailboxId(), event.getMailboxPath().getUser()),
LOW_CONCURRENCY)
.then();
.flatMap(msgId -> handleMessageDeletion(postgresMessageDAO, postgresMailboxMessageDAO, attachmentDAO, threadDAO, msgId, event.getMailboxId(), event.getMailboxPath().getUser()),
LOW_CONCURRENCY)
.then();
}

private Mono<Void> handleMessageDeletion(Expunged event) {
Expand All @@ -121,11 +125,11 @@ private Mono<Void> handleMessageDeletion(Expunged event) {
PostgresThreadDAO threadDAO = threadDAOFactory.create(event.getUsername().getDomainPart());

return Flux.fromIterable(event.getExpunged()
.values())
.map(MessageMetaData::getMessageId)
.map(PostgresMessageId.class::cast)
.flatMap(msgId -> handleMessageDeletion(postgresMessageDAO, postgresMailboxMessageDAO, attachmentDAO, threadDAO, msgId, event.getMailboxId(), event.getMailboxPath().getUser()), LOW_CONCURRENCY)
.then();
.values())
.map(MessageMetaData::getMessageId)
.map(PostgresMessageId.class::cast)
.flatMap(msgId -> handleMessageDeletion(postgresMessageDAO, postgresMailboxMessageDAO, attachmentDAO, threadDAO, msgId, event.getMailboxId(), event.getMailboxPath().getUser()), LOW_CONCURRENCY)
.then();
}

private Mono<Void> handleMessageDeletion(PostgresMessageDAO postgresMessageDAO,
Expand All @@ -136,40 +140,47 @@ private Mono<Void> handleMessageDeletion(PostgresMessageDAO postgresMessageDAO,
MailboxId mailboxId,
Username owner) {
return Mono.just(messageId)
.filterWhen(msgId -> isUnreferenced(msgId, postgresMailboxMessageDAO))
.flatMap(msgId -> postgresMessageDAO.retrieveMessage(messageId)
.flatMap(executeDeletionCallbacks(mailboxId, owner))
.then(deleteBodyBlob(msgId, postgresMessageDAO))
.then(deleteAttachment(msgId, attachmentDAO))
.then(threadDAO.deleteSome(owner, msgId))
.then(postgresMessageDAO.deleteByMessageId(msgId)));
.filterWhen(msgId -> isUnreferenced(msgId, postgresMailboxMessageDAO))
.flatMap(msgId -> postgresMessageDAO.retrieveMessage(messageId)
.flatMap(executeDeletionCallbacks(mailboxId, owner))
.then(deleteBodyBlob(msgId, postgresMessageDAO))
.then(deleteAttachmentIfEnabled(msgId, attachmentDAO))
.then(threadDAO.deleteSome(owner, msgId))
.then(postgresMessageDAO.deleteByMessageId(msgId)));
}

private Function<MessageRepresentation, Mono<Void>> executeDeletionCallbacks(MailboxId mailboxId, Username owner) {
return messageRepresentation -> Flux.fromIterable(deletionCallbackList)
.concatMap(callback -> callback.forMessage(messageRepresentation, mailboxId, owner))
.then();
.concatMap(callback -> callback.forMessage(messageRepresentation, mailboxId, owner))
.then();
}

private Mono<Void> deleteBodyBlob(PostgresMessageId id, PostgresMessageDAO postgresMessageDAO) {
return postgresMessageDAO.getBodyBlobId(id)
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId))
.then());
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId))
.then());
}

private Mono<Boolean> isUnreferenced(PostgresMessageId id, PostgresMailboxMessageDAO postgresMailboxMessageDAO) {
return postgresMailboxMessageDAO.existsByMessageId(id)
.map(FunctionalUtils.negate());
.map(FunctionalUtils.negate());
}

private Mono<Void> deleteAttachment(PostgresMessageId messageId, PostgresAttachmentDAO attachmentDAO) {
return deleteAttachmentBlobs(messageId, attachmentDAO)
.then(attachmentDAO.deleteByMessageId(messageId));
.then(attachmentDAO.deleteByMessageId(messageId));
}

private Mono<Void> deleteAttachmentBlobs(PostgresMessageId messageId, PostgresAttachmentDAO attachmentDAO) {
return attachmentDAO.listBlobsByMessageId(messageId)
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)), ReactorUtils.DEFAULT_CONCURRENCY)
.then();
.flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)), ReactorUtils.DEFAULT_CONCURRENCY)
.then();
}
}

private Mono<Void> deleteAttachmentIfEnabled(PostgresMessageId messageId, PostgresAttachmentDAO attachmentDAO) {
if (postgresConfiguration.isAttachmentStorageEnabled()) {
return deleteAttachment(messageId, attachmentDAO);
}
return Mono.empty();
}
}
Loading