diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/xml/StaxEventItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/xml/StaxEventItemWriter.java index 1bb4a0d833..286134d48b 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/xml/StaxEventItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/xml/StaxEventItemWriter.java @@ -133,16 +133,6 @@ public class StaxEventItemWriter extends AbstractItemStreamItemWriter // TRUE private boolean overwriteOutput = true; - // file channel - private @Nullable FileChannel channel; - - // wrapper for XML event writer that swallows StartDocument and EndDocument - // events - private @Nullable XMLEventWriter eventWriter; - - // XML event writer - private @Nullable XMLEventWriter delegateEventWriter; - // current count of processed records private long currentRecordCount = 0; @@ -152,8 +142,6 @@ public class StaxEventItemWriter extends AbstractItemStreamItemWriter private @Nullable StaxWriterCallback footerCallback; - private @Nullable Writer bufferedWriter; - private boolean transactional = true; private boolean forceSync; @@ -164,6 +152,8 @@ public class StaxEventItemWriter extends AbstractItemStreamItemWriter private boolean initialized = false; + private @Nullable OutputState state; + // List holding the QName of elements that were opened in the header callback, but not // closed private List unclosedHeaderCallbackElements = Collections.emptyList(); @@ -432,7 +422,7 @@ public void open(ExecutionContext executionContext) { try { if (headerCallback != null) { UnclosedElementCollectingEventWriter headerCallbackWriter = new UnclosedElementCollectingEventWriter( - delegateEventWriter); + state.getDelegateEventWriter()); headerCallback.write(headerCallbackWriter); unclosedHeaderCallbackElements = headerCallbackWriter.getUnclosedElements(); } @@ -449,72 +439,10 @@ public void open(ExecutionContext executionContext) { /** * Helper method for opening output source at given file position */ - @SuppressWarnings("DataFlowIssue") private void open(long position) { - - File file; - FileOutputStream os; - FileChannel fileChannel; - - try { - file = resource.getFile(); - FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput); - Assert.state(resource.exists(), "Output resource must exist"); - os = new FileOutputStream(file, true); - fileChannel = os.getChannel(); - channel = os.getChannel(); - setPosition(position); - } - catch (IOException ioe) { - throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", ioe); - } - - XMLOutputFactory outputFactory = createXmlOutputFactory(); - - if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) { - // If the current XMLOutputFactory implementation is supplied by - // Woodstox >= 3.2.9 we want to disable its - // automatic end element feature (see: - // https://jira.codehaus.org/browse/WSTX-165) per - // https://jira.spring.io/browse/BATCH-761). - outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE); - } - if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) { - // On restart we don't write the root element so we have to disable - // structural validation (see: - // https://jira.spring.io/browse/BATCH-1681). - outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE); - } - - try { - if (transactional) { - TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(fileChannel, - this::closeStream); - - writer.setEncoding(encoding); - writer.setForceSync(forceSync); - bufferedWriter = writer; - } - else { - bufferedWriter = new BufferedWriter(new OutputStreamWriter(os, encoding)); - } - delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter); - eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter); - initNamespaceContext(delegateEventWriter); - if (!restarted) { - startDocument(delegateEventWriter); - if (forceSync) { - fileChannel.force(false); - } - } - } - catch (UnsupportedEncodingException e) { - throw new ItemStreamException( - "Unable to write to file resource: [" + resource + "] with encoding=[" + encoding + "]", e); - } - catch (XMLStreamException | IOException xse) { - throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", xse); - } + Assert.notNull(resource, "Output resource must be set"); + state = new OutputState(); + state.open(position, restarted); } /** @@ -556,7 +484,7 @@ protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationErr */ @SuppressWarnings("DataFlowIssue") protected Result createStaxResult() { - return StaxUtils.createStaxResult(eventWriter); + return state.createStaxResult(); } /** @@ -658,26 +586,6 @@ protected void startDocument(XMLEventWriter writer) throws XMLStreamException { } - /** - * Writes the EndDocument tag manually. - * @param writer XML event writer - * @throws XMLStreamException thrown if error occurs. - */ - @SuppressWarnings("DataFlowIssue") - protected void endDocument(XMLEventWriter writer) throws XMLStreamException { - - // writer.writeEndDocument(); <- this doesn't work after restart - // we need to write end tag of the root element manually - - String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" : getRootTagNamespacePrefix() + ":"; - try { - bufferedWriter.write(""); - } - catch (IOException ioe) { - throw new XMLStreamException("Unable to close file resource: [" + resource + "]", ioe); - } - } - /** * Flush and close the output source. * @@ -688,52 +596,9 @@ protected void endDocument(XMLEventWriter writer) throws XMLStreamException { public void close() { super.close(); - XMLEventFactory factory = createXmlEventFactory(); try { - delegateEventWriter.add(factory.createCharacters("")); - } - catch (XMLStreamException e) { - log.error(e); - } - - try { - if (footerCallback != null) { - XMLEventWriter footerCallbackWriter = delegateEventWriter; - if (restarted && !unclosedHeaderCallbackElements.isEmpty()) { - footerCallbackWriter = new UnopenedElementClosingEventWriter(delegateEventWriter, bufferedWriter, - unclosedHeaderCallbackElements); - } - footerCallback.write(footerCallbackWriter); - } - delegateEventWriter.flush(); - endDocument(delegateEventWriter); - } - catch (IOException e) { - throw new ItemStreamException("Failed to write footer items", e); - } - catch (XMLStreamException e) { - throw new ItemStreamException("Failed to write end document tag", e); - } - finally { - - try { - delegateEventWriter.close(); - } - catch (XMLStreamException e) { - log.error("Unable to close file resource: [" + resource + "] " + e); - } - finally { - try { - bufferedWriter.close(); - } - catch (IOException e) { - log.error("Unable to close file resource: [" + resource + "] " + e); - } - finally { - if (!transactional) { - closeStream(); - } - } + if (state != null) { + state.close(footerCallback, unclosedHeaderCallbackElements); } if (currentRecordCount == 0 && shouldDeleteIfEmpty) { try { @@ -744,17 +609,9 @@ public void close() { } } } - - this.initialized = false; - } - - @SuppressWarnings("DataFlowIssue") - private void closeStream() { - try { - channel.close(); - } - catch (IOException ioe) { - log.error("Unable to close file resource: [" + resource + "] " + ioe); + finally { + state = null; + initialized = false; } } @@ -768,7 +625,7 @@ private void closeStream() { @Override public void write(Chunk items) throws XmlMappingException, IOException { - if (!this.initialized) { + if (!initialized || state == null || !state.isInitialized()) { throw new WriterNotOpenException("Writer must be open before it can be written to"); } @@ -780,15 +637,8 @@ public void write(Chunk items) throws XmlMappingException, IOExcept Result result = createStaxResult(); marshaller.marshal(object, result); } - try { - eventWriter.flush(); - if (forceSync) { - channel.force(false); - } - } - catch (XMLStreamException | IOException e) { - throw new WriteFailedException("Failed to flush the events", e); - } + + state.flush(); } /** @@ -819,35 +669,260 @@ public void update(ExecutionContext executionContext) { */ @SuppressWarnings("DataFlowIssue") private long getPosition() { - long position; + if (state == null) { + return 0; + } + return state.position(); + } - try { - eventWriter.flush(); - position = channel.position(); - if (bufferedWriter instanceof TransactionAwareBufferedWriter transactionAwareBufferedWriter) { - position += transactionAwareBufferedWriter.getBufferSize(); + protected class OutputState { + + private @Nullable FileOutputStream os; + + // file channel + private @Nullable FileChannel channel; + + private @Nullable Writer bufferedWriter; + + // wrapper for XML event writer that swallows StartDocument and EndDocument + // events + private @Nullable XMLEventWriter eventWriter; + + // XML event writer + private @Nullable XMLEventWriter delegateEventWriter; + + private boolean initialized = false; + + private boolean restarted = false; + + public boolean isInitialized() { + return initialized; + } + + @SuppressWarnings("DataFlowIssue") + public void open(long position, boolean restart) { + + restarted = restart; + + File file; + FileOutputStream osLocal; + FileChannel fileChannel; + + try { + file = resource.getFile(); + FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput); + Assert.state(resource.exists(), "Output resource must exist"); + osLocal = new FileOutputStream(file, true); + fileChannel = osLocal.getChannel(); + os = osLocal; + channel = fileChannel; + setPosition(position); } + catch (IOException ioe) { + throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", ioe); + } + + XMLOutputFactory outputFactory = createXmlOutputFactory(); + + if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) { + // If the current XMLOutputFactory implementation is supplied by + // Woodstox >= 3.2.9 we want to disable its + // automatic end element feature (see: + // https://jira.codehaus.org/browse/WSTX-165) per + // https://jira.spring.io/browse/BATCH-761). + outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE); + } + if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) { + // On restart we don't write the root element so we have to disable + // structural validation (see: + // https://jira.spring.io/browse/BATCH-1681). + outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE); + } + + try { + if (transactional) { + TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(fileChannel, + this::closeStream); + + writer.setEncoding(encoding); + writer.setForceSync(forceSync); + bufferedWriter = writer; + } + else { + bufferedWriter = new BufferedWriter(new OutputStreamWriter(osLocal, encoding)); + } + delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter); + eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter); + initNamespaceContext(delegateEventWriter); + if (!restarted) { + startDocument(delegateEventWriter); + if (forceSync) { + fileChannel.force(false); + } + } + } + catch (UnsupportedEncodingException e) { + throw new ItemStreamException( + "Unable to write to file resource: [" + resource + "] with encoding=[" + encoding + "]", e); + } + catch (XMLStreamException | IOException xse) { + throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", xse); + } + + initialized = true; } - catch (Exception e) { - throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", e); + + public XMLEventWriter getDelegateEventWriter() { + Assert.state(delegateEventWriter != null, "Delegate event writer has not been initialized"); + return delegateEventWriter; } - return position; - } + @SuppressWarnings("DataFlowIssue") + public Result createStaxResult() { + return StaxUtils.createStaxResult(eventWriter); + } - /** - * Set the file channel position. - * @param newPosition new file channel position - */ - @SuppressWarnings("DataFlowIssue") - private void setPosition(long newPosition) { - try { - channel.truncate(newPosition); - channel.position(newPosition); + @SuppressWarnings("DataFlowIssue") + public void flush() { + try { + eventWriter.flush(); + if (forceSync) { + channel.force(false); + } + } + catch (XMLStreamException | IOException e) { + throw new WriteFailedException("Failed to flush the events", e); + } + } + + @SuppressWarnings("DataFlowIssue") + public long position() { + long position; + + try { + eventWriter.flush(); + position = channel.position(); + if (bufferedWriter instanceof TransactionAwareBufferedWriter transactionAwareBufferedWriter) { + position += transactionAwareBufferedWriter.getBufferSize(); + } + } + catch (Exception e) { + throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", e); + } + + return position; + } + + @SuppressWarnings("DataFlowIssue") + public void close(@Nullable StaxWriterCallback footerCallback, List unclosedHeaderCallbackElements) { + + XMLEventFactory factory = createXmlEventFactory(); + try { + delegateEventWriter.add(factory.createCharacters("")); + } + catch (XMLStreamException e) { + log.error(e); + } + + try { + if (footerCallback != null) { + XMLEventWriter footerCallbackWriter = delegateEventWriter; + if (restarted && !unclosedHeaderCallbackElements.isEmpty()) { + footerCallbackWriter = new UnopenedElementClosingEventWriter(delegateEventWriter, + bufferedWriter, unclosedHeaderCallbackElements); + } + footerCallback.write(footerCallbackWriter); + } + delegateEventWriter.flush(); + endDocument(); + } + catch (IOException e) { + throw new ItemStreamException("Failed to write footer items", e); + } + catch (XMLStreamException e) { + throw new ItemStreamException("Failed to write end document tag", e); + } + finally { + + try { + delegateEventWriter.close(); + } + catch (XMLStreamException e) { + log.error("Unable to close file resource: [" + resource + "] " + e); + } + finally { + try { + bufferedWriter.close(); + } + catch (IOException e) { + log.error("Unable to close file resource: [" + resource + "] " + e); + } + finally { + if (!transactional) { + closeStream(); + } + } + } + } + + initialized = false; + } + + @SuppressWarnings("DataFlowIssue") + private void closeStream() { + try { + channel.close(); + } + catch (IOException ioe) { + log.error("Unable to close file resource: [" + resource + "] " + ioe); + } + finally { + try { + if (os != null) { + os.close(); + } + } + catch (IOException ioe) { + log.error("Unable to close file resource: [" + resource + "] " + ioe); + } + } } - catch (IOException e) { - throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", e); + + /** + * Writes the EndDocument tag manually. + * @throws XMLStreamException thrown if error occurs. + */ + @SuppressWarnings("DataFlowIssue") + private void endDocument() throws XMLStreamException { + + // writer.writeEndDocument(); <- this doesn't work after restart + // we need to write end tag of the root element manually + + String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" + : getRootTagNamespacePrefix() + ":"; + try { + bufferedWriter.write(""); + } + catch (IOException ioe) { + throw new XMLStreamException("Unable to close file resource: [" + resource + "]", ioe); + } } + + /** + * Set the file channel position. + * @param newPosition new file channel position + */ + @SuppressWarnings("DataFlowIssue") + private void setPosition(long newPosition) { + try { + channel.truncate(newPosition); + channel.position(newPosition); + } + catch (IOException e) { + throw new ItemStreamException("Unable to write to file resource: [" + resource + "]", e); + } + } + } } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/TransactionalStaxEventItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/TransactionalStaxEventItemWriterTests.java index a2582e7805..5f2fb56065 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/TransactionalStaxEventItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/TransactionalStaxEventItemWriterTests.java @@ -18,6 +18,10 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; import javax.xml.stream.XMLEventFactory; import javax.xml.stream.XMLStreamException; import javax.xml.transform.Result; @@ -39,7 +43,9 @@ import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -235,4 +241,89 @@ private StaxEventItemWriter createItemWriter() throws Exception { return source; } + @Test + void shouldWriteThreeSeparateFilesWhenMultipleOpenCloseAndResourceSwitchInSingleTransaction() throws Exception { + WritableResource r1 = new FileSystemResource(File.createTempFile("stax-tx-rot-1", ".xml")); + WritableResource r2 = new FileSystemResource(File.createTempFile("stax-tx-rot-2", ".xml")); + WritableResource r3 = new FileSystemResource(File.createTempFile("stax-tx-rot-3", ".xml")); + + assertDoesNotThrow( + () -> new TransactionTemplate(transactionManager).execute((TransactionCallback) status -> { + try { + writer.setResource(r1); + writer.open(new ExecutionContext()); + writer.write(items); + writer.close(); + + writer.setResource(r2); + writer.open(new ExecutionContext()); + writer.write(items); + writer.close(); + + writer.setResource(r3); + writer.open(new ExecutionContext()); + writer.write(items); + writer.close(); + + return null; + } + catch (Exception e) { + throw new RuntimeException(e); + } + })); + } + + @Test + void shouldCloseAllFileChannelsAfterTransaction() throws Exception { + WritableResource r1 = new FileSystemResource(File.createTempFile("stax-tx-leak-1", ".xml")); + WritableResource r2 = new FileSystemResource(File.createTempFile("stax-tx-leak-2", ".xml")); + WritableResource r3 = new FileSystemResource(File.createTempFile("stax-tx-leak-3", ".xml")); + + List opened = new ArrayList<>(); + + try { + new TransactionTemplate(transactionManager).execute((TransactionCallback) status -> { + try { + writer.setResource(r1); + writer.open(new ExecutionContext()); + opened.add(extractChannelFromStaxWriter(writer)); + writer.write(items); + writer.close(); + + writer.setResource(r2); + writer.open(new ExecutionContext()); + opened.add(extractChannelFromStaxWriter(writer)); + writer.write(items); + writer.close(); + + writer.setResource(r3); + writer.open(new ExecutionContext()); + opened.add(extractChannelFromStaxWriter(writer)); + writer.write(items); + writer.close(); + } + catch (Exception ignored) { + } + return null; + }); + } + catch (Exception ignored) { + } + + assertEquals(3, opened.size(), "Expected 3 opened channels"); + for (FileChannel ch : opened) { + assertFalse(ch.isOpen(), "FileChannel should be closed after transaction"); + } + } + + private static FileChannel extractChannelFromStaxWriter(StaxEventItemWriter w) throws Exception { + Field stateField = StaxEventItemWriter.class.getDeclaredField("state"); + stateField.setAccessible(true); + Object state = stateField.get(w); + + Field channelField = state.getClass().getDeclaredField("channel"); + channelField.setAccessible(true); + return (FileChannel) channelField.get(state); + } + } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/builder/StaxEventItemWriterBuilderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/builder/StaxEventItemWriterBuilderTests.java index e2c0a76181..d364d3fa9f 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/builder/StaxEventItemWriterBuilderTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/xml/builder/StaxEventItemWriterBuilderTests.java @@ -128,7 +128,8 @@ void testTransactional() { staxEventItemWriter.open(executionContext); - Object writer = ReflectionTestUtils.getField(staxEventItemWriter, "bufferedWriter"); + Object state = ReflectionTestUtils.getField(staxEventItemWriter, "state"); + Object writer = ReflectionTestUtils.getField(state, "bufferedWriter"); assertTrue(writer instanceof TransactionAwareBufferedWriter);