From 6d1d8354a108c0f898942dac80a081890eea26c1 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 17 Dec 2025 23:45:01 +0800 Subject: [PATCH] HBASE-29742 Compaction scan returns single cells instead of rows after 10MB --- .../hbase/mob/DefaultMobStoreCompactor.java | 5 +- .../hbase/regionserver/ScannerContext.java | 12 +++++ .../regionserver/compactions/Compactor.java | 49 ++++++++++++------- .../hbase/mob/FaultyMobStoreCompactor.java | 1 + 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index f0beea647611..87248e7bab64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -351,8 +351,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel .build(); throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; - long shippedCallSizeLimit = - (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); + long shippedCallSizeLimit = Math.min(compactScannerSizeLimit, + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize()); ExtendedCell mobCell = null; List committedMobWriterFileNames = new ArrayList<>(); @@ -557,6 +557,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { ((ShipperListener) writer).beforeShipped(); kvs.shipped(); + scannerContext.clearBlockSizeProgress(); bytesWrittenProgressForShippedCall = 0; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index c681e91c615e..e8e7b6788f25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.errorprone.annotations.RestrictedApi; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -266,6 +267,17 @@ void clearProgress() { progress.setFields(0, 0, 0, getBlockSizeProgress()); } + /** + * Clear away the block size progress. Mainly used in compaction, as we will use a single + * ScannerContext across all the compaction lifetime, and we will call Shipper.shipped to clear + * the block reference, so it is safe to clear the block size progress in compaction. + */ + @RestrictedApi(explanation = "Should only be called in Compactor", link = "", + allowedOnPath = ".*/org/apache/hadoop/hbase/.*/*Compactor.java|.*/src/test/.*") + public void clearBlockSizeProgress() { + progress.setBlockSize(0); + } + /** * Note that this is not a typical setter. This setter returns the {@link NextState} that was * passed in so that methods can be invoked against the new state. Furthermore, this pattern diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 069968294b84..22b2a54693c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -445,8 +445,14 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel throughputController.start(compactionName); Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null; - long shippedCallSizeLimit = - (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); + // when hitting the block size limit, i.e, compactScannerSizeLimit, we need to reset the block + // size progress otherwise the scanner will only return 1 cell in the next method because all + // the block scanned are still referenced, so we need to call shipped to release them. + // Usually compactScannerSizeLimit will be much greater than blockSize * fileSize, but anyway, + // let's use Math.min for safety. + // See HBASE-29742 + long shippedCallSizeLimit = Math.min(compactScannerSizeLimit, + (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize()); try { do { // InternalScanner is for CPs so we do not want to leak ExtendedCell to the interface, but @@ -488,25 +494,32 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel } } writer.appendAll(cells); - if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { - if (lastCleanCell != null) { - // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly. - // ShipperListener will do a clone of the last cells it refer, so need to set back - // sequence id before ShipperListener.beforeShipped - PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId); + if (bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { + if (shipper != null) { + if (lastCleanCell != null) { + // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly. + // ShipperListener will do a clone of the last cells it refer, so need to set back + // sequence id before ShipperListener.beforeShipped + PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId); + } + // Clone the cells that are in the writer so that they are freed of references, + // if they are holding any. + ((ShipperListener) writer).beforeShipped(); + // The SHARED block references, being read for compaction, will be kept in prevBlocks + // list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells + // being returned to client, we will call shipped() which can clear this list. Here by + // we are doing the similar thing. In between the compaction (after every N cells + // written with collective size of 'shippedCallSizeLimit') we will call shipped which + // may clear prevBlocks list. + shipper.shipped(); } - // Clone the cells that are in the writer so that they are freed of references, - // if they are holding any. - ((ShipperListener) writer).beforeShipped(); - // The SHARED block references, being read for compaction, will be kept in prevBlocks - // list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells - // being returned to client, we will call shipped() which can clear this list. Here by - // we are doing the similar thing. In between the compaction (after every N cells - // written with collective size of 'shippedCallSizeLimit') we will call shipped which - // may clear prevBlocks list. - shipper.shipped(); + // clear the block progress in ScannerContext, so we can reuse it. In normal rpc call, + // ScannerContext will be dropped after shipping, so we do not need to clear the block + // progress there + scannerContext.clearBlockSizeProgress(); bytesWrittenProgressForShippedCall = 0; } + if (lastCleanCell != null) { // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index 4dd5ad1156a5..b08a44c04e78 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -331,6 +331,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { ((ShipperListener) writer).beforeShipped(); kvs.shipped(); + scannerContext.clearBlockSizeProgress(); bytesWrittenProgressForShippedCall = 0; } }