From ecd36866574770f840d2561b397ae9c861999015 Mon Sep 17 00:00:00 2001 From: "pratik.pandey1" Date: Thu, 18 Dec 2025 13:42:36 +0100 Subject: [PATCH 1/3] fix: handle uncompressed transaction payloads beyond 2GB --- .../event/TransactionPayloadEventData.java | 12 +- ...ansactionPayloadEventDataDeserializer.java | 53 +++--- .../TransactionPayloadIntegrationTest.java | 179 ++++++++++++++++++ 3 files changed, 216 insertions(+), 28 deletions(-) create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/TransactionPayloadIntegrationTest.java diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java index f5014cc8..573b7712 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java @@ -4,8 +4,8 @@ public class TransactionPayloadEventData implements EventData { - private int payloadSize; - private int uncompressedSize; + private long payloadSize; + private long uncompressedSize; private int compressionType; private byte[] payload; private ArrayList uncompressedEvents; @@ -18,19 +18,19 @@ public void setUncompressedEvents(ArrayList uncompressedEvents) { this.uncompressedEvents = uncompressedEvents; } - public int getPayloadSize() { + public long getPayloadSize() { return payloadSize; } - public void setPayloadSize(int payloadSize) { + public void setPayloadSize(long payloadSize) { this.payloadSize = payloadSize; } - public int getUncompressedSize() { + public long getUncompressedSize() { return uncompressedSize; } - public void setUncompressedSize(int uncompressedSize) { + public void setUncompressedSize(long uncompressedSize) { this.uncompressedSize = uncompressedSize; } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java index a8e84876..4c70fc8b 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java @@ -15,18 +15,18 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization; -import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdInputStream; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; /** * @author Somesh Malviya * @author Debjeet Sarkar + * @author Pratik Pandey */ public class TransactionPayloadEventDataDeserializer implements EventDataDeserializer { public static final int OTW_PAYLOAD_HEADER_END_MARK = 0; @@ -49,14 +49,15 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) if (fieldType == OTW_PAYLOAD_HEADER_END_MARK) { break; } - // Read the size of the field + // Read the size of the field (use readPackedLong to support large field sizes) if (inputStream.available() >= 1) { - fieldLen = inputStream.readPackedInteger(); + long fieldLenLong = inputStream.readPackedLong(); + fieldLen = (int) fieldLenLong; } switch (fieldType) { case OTW_PAYLOAD_SIZE_FIELD: // Fetch the payload size - eventData.setPayloadSize(inputStream.readPackedInteger()); + eventData.setPayloadSize(inputStream.readPackedLong()); break; case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD: // Fetch the compression type @@ -64,7 +65,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) break; case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD: // Fetch the uncompressed size - eventData.setUncompressedSize(inputStream.readPackedInteger()); + eventData.setUncompressedSize(inputStream.readPackedLong()); break; default: // Ignore unrecognized field @@ -76,27 +77,35 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) // Default the uncompressed to the payload size eventData.setUncompressedSize(eventData.getPayloadSize()); } - // set the payload to the rest of the input buffer - eventData.setPayload(inputStream.read(eventData.getPayloadSize())); + // Validate compressed payload size doesn't exceed Java array limit + // The compressed payload must fit in a byte array, but the uncompressed size can exceed 2GB + // since we use streaming decompression + long payloadSize = eventData.getPayloadSize(); + // Read the compressed payload into memory + eventData.setPayload(inputStream.read((int) payloadSize)); - // Decompress the payload - byte[] src = eventData.getPayload(); - byte[] dst = ByteBuffer.allocate(eventData.getUncompressedSize()).array(); - Zstd.decompressByteArray(dst, 0, dst.length, src, 0, src.length); + // Use streaming decompression to handle uncompressed sizes up to 4GB + // This avoids hitting Java's 2GB array limit by processing events as they're decompressed + ArrayList decompressedEvents = getDecompressedEvents(eventData); - // Read and store events from decompressed byte array into input stream + eventData.setUncompressedEvents(decompressedEvents); + + return eventData; + } + + private static ArrayList getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException { ArrayList decompressedEvents = new ArrayList<>(); EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer(); - ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst); - Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); - while(internalEvent != null) { - decompressedEvents.add(internalEvent); - internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); - } + try (ZstdInputStream zstdInputStream = new ZstdInputStream(new java.io.ByteArrayInputStream(eventData.getPayload()))) { + ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(zstdInputStream); - eventData.setUncompressedEvents(decompressedEvents); - - return eventData; + Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); + while(internalEvent != null) { + decompressedEvents.add(internalEvent); + internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); + } + } + return decompressedEvents; } } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/TransactionPayloadIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/TransactionPayloadIntegrationTest.java new file mode 100644 index 00000000..a4afd19a --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/TransactionPayloadIntegrationTest.java @@ -0,0 +1,179 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import org.testng.SkipException; +import org.testng.annotations.Test; + +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import static org.testng.Assert.*; + +/** + * @author Pratik Pandey + * Integration test for TRANSACTION_PAYLOAD event deserialization. + * Requires MySQL 8.0.20+ with binlog_transaction_compression enabled. + * Run Using: MYSQL_VERSION=8.0 mvn test -Dtest=TransactionPayloadIntegrationTest + */ +public class TransactionPayloadIntegrationTest extends AbstractIntegrationTest { + + @Override + protected MysqlOnetimeServerOptions getOptions() { + MysqlOnetimeServerOptions options = super.getOptions(); + // Enable transaction compression (requires MySQL 8.0.20+) + options.extraParams = "--binlog-transaction-compression=ON --binlog-transaction-compression-level-zstd=3"; + return options; + } + + @Test + public void testVeryLargeTransactionNear2GB() throws Exception { + // This test simulates a transaction with uncompressed size approaching 2GB + // to validate streaming decompression handles the upper limits correctly + + // Expected behavior: + // - Uncompressed size can exceed 2GB (uses streaming decompression) + // - Compressed size must stay under 2GB (Java array limit) + + if (!mysqlVersion.atLeast(8, 0)) { + throw new SkipException("Transaction compression requires MySQL 8.0.20+"); + } + + CapturingEventListener capturingEventListener = new CapturingEventListener(); + client.registerEventListener(capturingEventListener); + client.unregisterEventListener(eventListener); + client.registerEventListener(eventListener); + + try { + // Create table with large BLOB column to generate big transactions + master.execute(new BinaryLogClientIntegrationTest.Callback() { + public void execute(Statement statement) throws SQLException { + statement.execute("drop table if exists very_large_txn_test"); + // LONGTEXT can store up to 4GB, perfect for our test + statement.execute("create table very_large_txn_test (" + + "id int primary key, " + + "data1 LONGTEXT, " + + "data2 LONGTEXT, " + + "data3 LONGTEXT)"); + } + }); + eventListener.waitForAtLeast(EventType.QUERY, 2, BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT); + eventListener.reset(); + + // Generate large repeating data that compresses well + // We want uncompressed to be ~2-3GB but compressed to stay under 2GB + final StringBuilder largeChunk = new StringBuilder(); + final String pattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz!@#$%^&*()_+REPEATING_PATTERN_"; + + // Create a 1MB chunk of repeating data (compresses very well) + int targetChunkSize = 1024 * 1024; // 1MB target + int iterations = targetChunkSize / pattern.length(); // Calculate exact iterations needed + for (int i = 0; i < iterations; i++) { + largeChunk.append(pattern); + } + final String chunk = largeChunk.toString(); + + System.out.println("Starting very large transaction test..."); + System.out.println("Chunk size: " + chunk.length() + " bytes"); + + long startTime = System.currentTimeMillis(); + + // Insert rows with large data + // With 3 columns of 1MB each per row, and good compression: + // - ~700 rows = ~2GB uncompressed + // - Should compress to ~200-500MB depending on compression ratio + final int numRows = 800; + System.out.println("Inserting " + numRows + " rows with ~3MB each..."); + + master.execute(new BinaryLogClientIntegrationTest.Callback() { + public void execute(Statement statement) throws SQLException { + statement.execute("BEGIN"); + for (int i = 0; i < numRows; i++) { + // Use prepared statement to handle large data efficiently + String sql = String.format( + "insert into very_large_txn_test values(%d, '%s', '%s', '%s')", + i, chunk, chunk, chunk + ); + statement.execute(sql); + + if (i % 50 == 0 && i > 0) { + System.out.println(" Inserted " + i + " rows (" + + ((long) i * 3 * chunk.length() / (1024 * 1024)) + " MB uncompressed)..."); + } + } + System.out.println("Committing transaction..."); + statement.execute("COMMIT"); + } + }); + + long insertTime = System.currentTimeMillis() - startTime; + System.out.println("Transaction committed in " + (insertTime / 1000) + " seconds"); + + // Wait for transaction payload event (give it more time for large transactions) + long largeTimeout = BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT * 1000; // 30 seconds + eventListener.waitFor(EventType.TRANSACTION_PAYLOAD, 1, largeTimeout); + + // Verify the large payload was handled correctly + List payloadEvents = + capturingEventListener.getEvents(TransactionPayloadEventData.class); + + assertTrue(payloadEvents.size() > 0, "Should have captured TRANSACTION_PAYLOAD event"); + + TransactionPayloadEventData payloadEventData = payloadEvents.get(0); + assertNotNull(payloadEventData, "TRANSACTION_PAYLOAD event data should not be null"); + + long uncompressedSize = payloadEventData.getUncompressedSize(); + long compressedSize = payloadEventData.getPayloadSize(); + + // Validate sizes + assertTrue(compressedSize > 0, "Compressed size should be > 0"); + assertTrue(compressedSize < Integer.MAX_VALUE, + "Compressed size must be < 2GB (Java array limit): " + compressedSize); + assertTrue(uncompressedSize > 1024L * 1024 * 1024, + "Uncompressed size should be > 1GB: " + (uncompressedSize / (1024 * 1024)) + " MB"); + + // Verify compression ratio + double compressionRatio = (double) uncompressedSize / compressedSize; + assertTrue(compressionRatio > 2.0, + "Should have good compression ratio (>2x) for repetitive data, got: " + + String.format("%.2fx", compressionRatio)); + + // Verify all events were decompressed successfully via streaming + List uncompressedEvents = payloadEventData.getUncompressedEvents(); + assertNotNull(uncompressedEvents, "Should have uncompressed events"); + assertFalse(uncompressedEvents.isEmpty(), "Should have decompressed events successfully"); + + // Count WriteRowsEventData events + int writeRowsCount = 0; + for (Event event : uncompressedEvents) { + if (event.getData() instanceof WriteRowsEventData) { + writeRowsCount++; + } + } + assertTrue(writeRowsCount > 0, "Should have WriteRowsEventData in the payload"); + + long totalTime = System.currentTimeMillis() - startTime; + + System.out.println("\n=== Very Large Transaction Test Results ==="); + System.out.printf("Rows inserted: %d%n", numRows); + System.out.printf("Estimated uncompressed data: ~%d MB%n", + ((long) numRows * 3 * chunk.length()) / (1024 * 1024)); + System.out.printf("Actual uncompressed size: %d MB%n", + uncompressedSize / (1024 * 1024)); + System.out.printf("Compressed size: %.2f MB (%.1f%% of limit)%n", + compressedSize / (1024.0 * 1024.0), + (compressedSize * 100.0 / Integer.MAX_VALUE)); + System.out.printf("Compression ratio: %.2fx%n", compressionRatio); + System.out.printf("Events decompressed: %d%n", uncompressedEvents.size()); + System.out.printf("Write events: %d%n", writeRowsCount); + System.out.printf("Total time: %.1f seconds%n", totalTime / 1000.0); + System.out.println("===========================================\n"); + + } finally { + client.unregisterEventListener(capturingEventListener); + } + } +} From b709b9fc65609d4fec157edaa0bc854db5fdd3fe Mon Sep 17 00:00:00 2001 From: "pratik.pandey1" Date: Fri, 19 Dec 2025 12:15:39 +0100 Subject: [PATCH 2/3] fix: review comments --- .../TransactionPayloadEventDataDeserializer.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java index 4c70fc8b..618b5054 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java @@ -51,13 +51,12 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) } // Read the size of the field (use readPackedLong to support large field sizes) if (inputStream.available() >= 1) { - long fieldLenLong = inputStream.readPackedLong(); - fieldLen = (int) fieldLenLong; + fieldLen = inputStream.readPackedInt(); } switch (fieldType) { case OTW_PAYLOAD_SIZE_FIELD: // Fetch the payload size - eventData.setPayloadSize(inputStream.readPackedLong()); + eventData.setPayloadSize(inputStream.readPackedInteger()); break; case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD: // Fetch the compression type @@ -77,12 +76,8 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) // Default the uncompressed to the payload size eventData.setUncompressedSize(eventData.getPayloadSize()); } - // Validate compressed payload size doesn't exceed Java array limit - // The compressed payload must fit in a byte array, but the uncompressed size can exceed 2GB - // since we use streaming decompression - long payloadSize = eventData.getPayloadSize(); - // Read the compressed payload into memory - eventData.setPayload(inputStream.read((int) payloadSize)); + + eventData.setPayload(inputStream.read(eventData.getPayloadSize())); // Use streaming decompression to handle uncompressed sizes up to 4GB // This avoids hitting Java's 2GB array limit by processing events as they're decompressed From 370abccae6c1efd53573fa5e8bf9a80c9af09027 Mon Sep 17 00:00:00 2001 From: "pratik.pandey1" Date: Mon, 22 Dec 2025 11:26:23 +0100 Subject: [PATCH 3/3] fix: review comments --- .../mysql/binlog/event/TransactionPayloadEventData.java | 6 +++--- .../TransactionPayloadEventDataDeserializer.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java index 573b7712..dd0911a6 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/TransactionPayloadEventData.java @@ -4,7 +4,7 @@ public class TransactionPayloadEventData implements EventData { - private long payloadSize; + private int payloadSize; private long uncompressedSize; private int compressionType; private byte[] payload; @@ -18,11 +18,11 @@ public void setUncompressedEvents(ArrayList uncompressedEvents) { this.uncompressedEvents = uncompressedEvents; } - public long getPayloadSize() { + public int getPayloadSize() { return payloadSize; } - public void setPayloadSize(long payloadSize) { + public void setPayloadSize(int payloadSize) { this.payloadSize = payloadSize; } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java index 618b5054..5bbbd47f 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java @@ -51,7 +51,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) } // Read the size of the field (use readPackedLong to support large field sizes) if (inputStream.available() >= 1) { - fieldLen = inputStream.readPackedInt(); + fieldLen = inputStream.readPackedInteger(); } switch (fieldType) { case OTW_PAYLOAD_SIZE_FIELD: