Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions api/src/org/labkey/api/data/DbScope.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Test;
import org.labkey.api.action.ApiUsageException;
import org.labkey.api.audit.TransactionAuditProvider;
import org.labkey.api.cache.Cache;
import org.labkey.api.data.ConnectionWrapper.Closer;
Expand Down Expand Up @@ -2166,8 +2167,7 @@ public static void closeAllConnectionsForCurrentThread()
}
try
{
LOG.warn("Forcing close of still-pending transaction object. Current stack is ", new Throwable());
LOG.warn("Forcing close of still-pending transaction object started at ", t._creation);
LOG.warn("Forcing close of still-pending transaction object started at {}. Current stack is ", t._creation, new Throwable());
t.close();
}
catch (ConnectionAlreadyReleasedException ignored)
Expand Down Expand Up @@ -2385,12 +2385,17 @@ public void run(TransactionImpl transaction)
// Copy to avoid ConcurrentModificationExceptions, need to retain original order from LinkedHashMap
List<Runnable> tasks = new ArrayList<>(getRunnables(transaction).keySet());

for (Runnable task : tasks)
try
{
task.run();
for (Runnable task : tasks)
{
task.run();
}
}
finally
{
transaction.closeCaches();
}

transaction.closeCaches();
}

public <T extends Runnable> T add(TransactionImpl transaction, T task)
Expand Down Expand Up @@ -2715,16 +2720,17 @@ public void commit()
conn.commit();
conn.setAutoCommit(true);
LOG.debug("setAutoCommit(true)");
if (null != _closeOnClose)
try { _closeOnClose.close(); } catch (Exception ignore) {}
}
finally
{
if (null != _closeOnClose)
try { _closeOnClose.close(); } catch (Exception ignore) {}
if (null != conn)
conn.internalClose();
}

popCurrentTransaction();
// Make sure to pop whether we successfully committed or not
popCurrentTransaction();
}

CommitTaskOption.POSTCOMMIT.run(this);
}
Expand Down Expand Up @@ -3164,6 +3170,24 @@ public void testAutoCommitFailure()
closeAllConnectionsForCurrentThread();
}

@Test
public void tesCommitTaskFailure()
{
String message = "Expected failure";
try (Transaction t = getLabKeyScope().ensureTransaction())
{
t.addCommitTask(() -> { throw new ApiUsageException("Expected failure"); }, CommitTaskOption.PRECOMMIT);
t.commit();
fail("Shouldn't have gotten here, expected ApiUsageException");
}
catch (ApiUsageException e)
{
assertEquals("Bad message", message, e.getMessage());
}
assertFalse(getLabKeyScope().isTransactionActive());
closeAllConnectionsForCurrentThread();
}

@Test
public void testLockReleasedException()
{
Expand Down
13 changes: 12 additions & 1 deletion api/src/org/labkey/api/data/SimpleFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,19 @@ public static DatabaseIdentifier getAliasForColumnFilter(SqlDialect dialect, Col

public static class InClause extends MultiValuedFilterClause
{
private InClauseGenerator _tempTableGenerator = null;

public InClause(FieldKey fieldKey, Collection<?> params)
{
this(fieldKey, params, false, false);
}

public InClause(FieldKey fieldKey, Collection<?> params, InClauseGenerator tempTableGenerator)
{
this(fieldKey, params, false, false);
_tempTableGenerator = tempTableGenerator;
}

public InClause(FieldKey fieldKey, Collection<?> params, boolean urlClause)
{
this(fieldKey, params, urlClause, false);
Expand Down Expand Up @@ -837,7 +845,10 @@ public SQLFragment toSQLFragment(Map<FieldKey, ? extends ColumnInfo> columnMap,
in.appendIdentifier(alias);

// Dialect may want to generate database-specific SQL, especially for very large IN clauses
dialect.appendInClauseSql(in, convertedParams);
if (null == _tempTableGenerator)
dialect.appendInClauseSql(in, convertedParams);
else
dialect.appendInClauseSqlWithCustomInClauseGenerator(in, convertedParams, _tempTableGenerator);

if (isIncludeNull())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,11 @@ public String addReselect(SQLFragment sql, ColumnInfo column, @Nullable String p
@Override
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params)
{
return appendInClauseSql(sql, params, _tempTableInClauseGenerator);
return appendInClauseSqlWithCustomInClauseGenerator(sql, params, _tempTableInClauseGenerator);
}

@Override
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
public SQLFragment appendInClauseSqlWithCustomInClauseGenerator(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
{
if (params.size() >= TEMPTABLE_GENERATOR_MINSIZE)
{
Expand Down
6 changes: 3 additions & 3 deletions api/src/org/labkey/api/data/dialect/SqlDialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,11 @@ protected Set<String> getJdbcKeywords(SqlExecutor executor) throws SQLException,
// Most callers should use this method
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params)
{
return appendInClauseSql(sql, params, null);
return appendInClauseSqlWithCustomInClauseGenerator(sql, params, null);
}

// Use in cases where the default temp schema won't do, e.g., you need to apply a large IN clause in an external data source
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
// Use only in cases where the default temp-table generator won't do, e.g., you need to apply a large IN clause in an external data source
public SQLFragment appendInClauseSqlWithCustomInClauseGenerator(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
{
return DEFAULT_GENERATOR.appendInClauseSql(sql, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default void beforeMigration(){}
DbScope getTargetScope();
@NotNull Set<String> getSkipSchemas();
Predicate<String> getColumnNameFilter();
@Nullable TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler);
@Nullable TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler, @Nullable MigrationTableHandler tableHandler);
default void copyAttachments(DbSchema sourceSchema, DbSchema targetSchema, MigrationSchemaHandler schemaHandler){}
default void afterMigration(){}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ default void migrate(DatabaseMigrationConfiguration configuration)

// By default, no-op implementations
default void registerSchemaHandler(MigrationSchemaHandler schemaHandler) {}
default void registerTableHandler(MigrationTableHandler tableHandler) {}
default void registerMigrationFilter(MigrationFilter filter) {}

default @Nullable MigrationFilter getMigrationFilter(String propertyName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Predicate<String> getColumnNameFilter()
}

@Override
public TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler)
public TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler, @Nullable MigrationTableHandler tableHandler)
{
return null;
}
Expand Down
43 changes: 32 additions & 11 deletions api/src/org/labkey/api/migration/DefaultMigrationSchemaHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.labkey.api.migration;

import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.attachments.AttachmentService;
Expand Down Expand Up @@ -27,8 +28,11 @@
import org.labkey.api.query.FieldKey;
import org.labkey.api.query.SchemaKey;
import org.labkey.api.query.TableSorter;
import org.labkey.api.util.ConfigurationException;
import org.labkey.api.util.GUID;
import org.labkey.api.util.JobRunner;
import org.labkey.api.util.StringUtilsLabKey;
import org.labkey.api.util.logging.LogHelper;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -38,10 +42,13 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class DefaultMigrationSchemaHandler implements MigrationSchemaHandler
{
private static final Logger LOG = LogHelper.getLogger(DefaultMigrationSchemaHandler.class, "Migration shutdown status");

private final DbSchema _schema;

public DefaultMigrationSchemaHandler(DbSchema schema)
Expand Down Expand Up @@ -77,7 +84,7 @@ public List<TableInfo> getTablesToCopy()

if (!allTables.isEmpty())
{
DatabaseMigrationService.LOG.info("These tables were removed by TableSorter: {}", allTables);
LOG.info("These tables were removed by TableSorter: {}", allTables);
}

return sortedTables.stream()
Expand Down Expand Up @@ -250,12 +257,13 @@ public void copyAttachments(DatabaseMigrationConfiguration configuration, DbSche
Collection<String> entityIds = new SqlSelector(targetSchema, sql).getCollection(String.class);
SQLFragment selectParents = new SQLFragment("Parent");
// This query against the source database is likely to contain a large IN clause, so use an alternative InClauseGenerator
sourceSchema.getSqlDialect().appendInClauseSql(selectParents, entityIds, getTempTableInClauseGenerator(sourceSchema.getScope()));
sourceSchema.getSqlDialect().appendInClauseSqlWithCustomInClauseGenerator(selectParents, entityIds, getTempTableInClauseGenerator(sourceSchema.getScope()));
copyAttachments(configuration, sourceSchema, new SQLClause(selectParents), type);
}

// TODO: fail if type.getSelectParentEntityIdsSql() returns null?
// TODO: throw if some registered AttachmentType is not seen
else
{
throw new ConfigurationException("AttachmentType \"" + type.getUniqueName() + "\" is not configured to find parent EntityIds!");
}
});
}

Expand All @@ -267,33 +275,46 @@ protected InClauseGenerator getTempTableInClauseGenerator(DbScope sourceScope)
}

private static final Set<AttachmentType> SEEN = new HashSet<>();
private static final JobRunner ATTACHMENT_JOB_RUNNER = new JobRunner("Attachment JobRunner", 1);

// Copy all core.Documents rows that match the provided filter clause
protected void copyAttachments(DatabaseMigrationConfiguration configuration, DbSchema sourceSchema, FilterClause filterClause, AttachmentType... type)
protected final void copyAttachments(DatabaseMigrationConfiguration configuration, DbSchema sourceSchema, FilterClause filterClause, AttachmentType... type)
{
SEEN.addAll(Arrays.asList(type));
String additionalMessage = " associated with " + Arrays.stream(type).map(t -> t.getClass().getSimpleName()).collect(Collectors.joining(", "));
TableInfo sourceDocumentsTable = sourceSchema.getScope().getSchema("core", DbSchemaType.Migration).getTable("Documents");
TableInfo targetDocumentsTable = CoreSchema.getInstance().getTableInfoDocuments();
DatabaseMigrationService.get().copySourceTableToTargetTable(configuration, sourceDocumentsTable, targetDocumentsTable, DbSchemaType.Module, false, additionalMessage, new DefaultMigrationSchemaHandler(CoreSchema.getInstance().getSchema())

// Queue up the core.Documents transfers and let them run in the background
ATTACHMENT_JOB_RUNNER.execute(() -> DatabaseMigrationService.get().copySourceTableToTargetTable(configuration, sourceDocumentsTable, targetDocumentsTable, DbSchemaType.Module, false, additionalMessage, new DefaultMigrationSchemaHandler(CoreSchema.getInstance().getSchema())
{
@Override
public FilterClause getTableFilterClause(TableInfo sourceTable, Set<GUID> containers)
{
return filterClause;
}
});
}));
}

public static void logUnseenAttachmentTypes()
// Global (not schema- or configuration-specific) cleanup
public static void afterMigration() throws InterruptedException
{
// Report any unseen attachment types
Set<AttachmentType> unseen = new HashSet<>(AttachmentService.get().getAttachmentTypes());
unseen.removeAll(SEEN);

if (unseen.isEmpty())
DatabaseMigrationService.LOG.info("All AttachmentTypes have been seen");
LOG.info("All AttachmentTypes have been seen");
else
throw new ConfigurationException("These AttachmentTypes have not been seen: " + unseen.stream().map(type -> type.getClass().getSimpleName()).collect(Collectors.joining(", ")));

// Shut down the attachment JobRunner
LOG.info("Waiting for core.Documents background transfer to complete");
ATTACHMENT_JOB_RUNNER.shutdown();
if (ATTACHMENT_JOB_RUNNER.awaitTermination(1, TimeUnit.HOURS))
LOG.info("core.Documents background transfer is complete");
else
DatabaseMigrationService.LOG.info("These AttachmentTypes have not been seen: {}", unseen.stream().map(type -> type.getClass().getSimpleName()).collect(Collectors.joining(", ")));
LOG.error("core.Documents background transfer did not complete after one hour! Giving up.");
}

@Override
Expand Down
18 changes: 18 additions & 0 deletions api/src/org/labkey/api/migration/MigrationTableHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.labkey.api.migration;

import org.labkey.api.data.SimpleFilter;
import org.labkey.api.data.TableInfo;
import org.labkey.api.util.GUID;

import java.util.Set;

/**
* Rarely needed, this interface lets a module filter the rows of another module's table. The specific use case: LabBook
* needs to filter the compliance.SignedSnapshots table of snapshots associated with Notebooks that are excluded by a
* NotebookFilter.
*/
public interface MigrationTableHandler
{
TableInfo getTableInfo();
void adjustFilter(TableInfo sourceTable, SimpleFilter filter, Set<GUID> containers);
}
26 changes: 14 additions & 12 deletions api/src/org/labkey/api/util/JobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.labkey.api.data.DbScope;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -51,7 +53,7 @@ public class JobRunner implements Executor
private static final JobRunner _defaultJobRunner = new JobRunner("Default", 1);

private final ScheduledThreadPoolExecutor _executor;
private final HashMap<Future, Job> _jobs = new HashMap<>();
private final Map<Future<?>, Job> _jobs = new HashMap<>();


public JobRunner(String name, int max)
Expand All @@ -77,11 +79,6 @@ public void shutdownPre()
{
_executor.shutdown();
}

@Override
public void shutdownStarted()
{
}
});
}

Expand Down Expand Up @@ -111,11 +108,16 @@ public void shutdown()
_executor.shutdown();
}

public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException
{
return _executor.awaitTermination(timeout, unit);
}

/**
* This will schedule the runnable to execute immediately, with no delay
*/
@Override
public void execute(Runnable command)
public void execute(@NotNull Runnable command)
{
execute(command, 0);
}
Expand All @@ -132,7 +134,7 @@ public void execute(Runnable command, long delay)
{
synchronized (_jobs)
{
Future task = _executor.schedule(command, delay, TimeUnit.MILLISECONDS);
Future<?> task = _executor.schedule(command, delay, TimeUnit.MILLISECONDS);
if (command instanceof Job job)
{
job._task = task;
Expand All @@ -141,7 +143,7 @@ public void execute(Runnable command, long delay)
}
}

public Future submit(Runnable run)
public Future<?> submit(Runnable run)
{
if (run instanceof Job)
{
Expand Down Expand Up @@ -221,13 +223,13 @@ protected void afterExecute(Runnable r, Throwable t)
}
else
{
if (r instanceof Future)
if (r instanceof Future<?> f)
{
if (null == t)
{
try
{
((Future)r).get();
f.get();
}
catch (ExecutionException x)
{
Expand Down Expand Up @@ -277,7 +279,7 @@ static class JobThreadFactory implements ThreadFactory
}

@Override
public Thread newThread(Runnable r)
public Thread newThread(@NotNull Runnable r)
{
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
Expand Down
Loading