diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 04ad8287ec2f..5884811fa9bb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -2076,7 +2076,16 @@ public ContainerCommandResponseProto readBlock(
     }
     try {
       readBlockImpl(request, blockFile, kvContainer, streamObserver, false);
-      // TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
+      // Account the bytes served by this ReadBlock in the container metrics.
+      final ReadBlockRequestProto readBlock = request.getReadBlock();
+      final long bytesRead;
+      if (readBlock.hasLength() && readBlock.getLength() > 0) {
+        bytesRead = readBlock.getLength();
+      } else {
+        // No explicit length in the request: only then pay for a block
+        // metadata lookup and fall back to the full block size.
+        final BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
+        bytesRead = getBlockManager().getBlock(kvContainer, blockID).getSize();
+      }
+      metrics.incContainerBytesStats(Type.ReadBlock, bytesRead);
     } catch (StorageContainerException ex) {
       responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ioe) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 6afee1c5d77f..f427575e8188 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -30,6 +30,8 @@
 import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.verifyAllDataChecksumsMatch;
 import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
 import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
+import static org.apache.ozone.test.MetricsAsserts.assertCounter;
+import static org.apache.ozone.test.MetricsAsserts.getMetrics;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,6 +39,7 @@
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -53,6 +56,7 @@
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
@@ -68,11 +72,13 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -80,10 +86,16 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.hdds.utils.io.RandomAccessFileChannel;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -94,6 +106,7 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
@@ -106,6 +119,7 @@
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -938,4 +952,134 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
 
     return kvHandler;
   }
+
+  /** Bundles a handler with the (mock) volume set and container set backing it. */
+  private static class HandlerWithVolumeSet {
+    private final KeyValueHandler handler;
+    private final MutableVolumeSet volumeSet;
+    private final ContainerSet containerSet;
+
+    HandlerWithVolumeSet(KeyValueHandler handler, MutableVolumeSet volumeSet, ContainerSet containerSet) {
+      this.handler = handler;
+      this.volumeSet = volumeSet;
+      this.containerSet = containerSet;
+    }
+
+    KeyValueHandler getHandler() {
+      return handler;
+    }
+
+    MutableVolumeSet getVolumeSet() {
+      return volumeSet;
+    }
+
+    ContainerSet getContainerSet() {
+      return containerSet;
+    }
+  }
+
+  private HandlerWithVolumeSet createKeyValueHandlerWithVolumeSet(Path path) throws IOException {
+    ContainerMetrics.remove();
+    final ContainerSet containerSet = newContainerSet();
+    final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
+
+    HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf)
+        .clusterID(CLUSTER_ID).datanodeUuid(DATANODE_UUID)
+        .volumeSet(volumeSet)
+        .build();
+    hddsVolume.format(CLUSTER_ID);
+    hddsVolume.createWorkingDir(CLUSTER_ID, null);
+    hddsVolume.createTmpDirs(CLUSTER_ID);
+    when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(hddsVolume));
+
+    final KeyValueHandler kvHandler = ContainerTestUtils.getKeyValueHandler(conf,
+        DATANODE_UUID, containerSet, volumeSet);
+    kvHandler.setClusterID(CLUSTER_ID);
+    hddsVolume.getVolumeInfoStats().unregister();
+    hddsVolume.getVolumeIOStats().unregister();
+
+    ContainerController controller = new ContainerController(containerSet,
+        Collections.singletonMap(ContainerType.KeyValueContainer, kvHandler));
+    OnDemandContainerScanner onDemandScanner = new OnDemandContainerScanner(
+        conf.getObject(ContainerScannerConfiguration.class), controller);
+    containerSet.registerOnDemandScanner(onDemandScanner);
+
+    return new HandlerWithVolumeSet(kvHandler, volumeSet, containerSet);
+  }
+
+  @Test
+  public void testReadBlockMetrics() throws Exception {
+    Path testDir = Files.createTempDirectory("testReadBlockMetrics");
+    try {
+      conf.set(OZONE_SCM_CONTAINER_LAYOUT_KEY, ContainerLayoutVersion.FILE_PER_BLOCK.name());
+      HandlerWithVolumeSet handlerWithVolume = createKeyValueHandlerWithVolumeSet(testDir);
+      KeyValueHandler kvHandler = handlerWithVolume.getHandler();
+      MutableVolumeSet volumeSet = handlerWithVolume.getVolumeSet();
+      ContainerSet containerSet = handlerWithVolume.getContainerSet();
+
+      long containerID = ContainerTestHelper.getTestContainerID();
+      KeyValueContainerData containerData = new KeyValueContainerData(
+          containerID, ContainerLayoutVersion.FILE_PER_BLOCK,
+          (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
+          DATANODE_UUID);
+      KeyValueContainer container = new KeyValueContainer(containerData, conf);
+      container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), CLUSTER_ID);
+      containerSet.addContainer(container);
+
+      BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+      BlockData blockData = new BlockData(blockID);
+      ChunkInfo chunkInfo = new ChunkInfo("chunk1", 0, 1024);
+      blockData.addChunk(chunkInfo.getProtoBufMessage());
+      kvHandler.getBlockManager().putBlock(container, blockData);
+
+      ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(1024));
+      kvHandler.getChunkManager().writeChunk(container, blockID, chunkInfo, data,
+          DispatcherContext.getHandleWriteChunk());
+
+      ContainerCommandRequestProto readBlockRequest =
+          ContainerCommandRequestProto.newBuilder()
+              .setCmdType(ContainerProtos.Type.ReadBlock)
+              .setContainerID(containerID)
+              .setDatanodeUuid(DATANODE_UUID)
+              .setReadBlock(ContainerProtos.ReadBlockRequestProto.newBuilder()
+                  .setBlockID(blockID.getDatanodeBlockIDProtobuf())
+                  .setOffset(0)
+                  .setLength(1024)
+                  .build())
+              .build();
+
+      final AtomicInteger responseCount = new AtomicInteger(0);
+
+      StreamObserver<ContainerCommandResponseProto> streamObserver =
+          new StreamObserver<ContainerCommandResponseProto>() {
+            @Override
+            public void onNext(ContainerCommandResponseProto response) {
+              assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+              responseCount.incrementAndGet();
+            }
+
+            @Override
+            public void onError(Throwable t) {
+              fail("ReadBlock failed", t);
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+          };
+
+      RandomAccessFileChannel blockFile = new RandomAccessFileChannel();
+      ContainerCommandResponseProto response = kvHandler.readBlock(
+          readBlockRequest, container, blockFile, streamObserver);
+
+      assertNull(response, "ReadBlock should return null on success");
+      assertTrue(responseCount.get() > 0, "Should receive at least one response");
+
+      MetricsRecordBuilder containerMetrics = getMetrics(
+          ContainerMetrics.STORAGE_CONTAINER_METRICS);
+      assertCounter("bytesReadBlock", 1024L, containerMetrics);
+    } finally {
+      FileUtils.deleteDirectory(testDir.toFile());
+      ContainerMetrics.remove();
+    }
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 2f64167e02cf..845ee1a3f9e2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -207,6 +207,7 @@ static void runTestClientServer(
     assertCounter("numReadChunk", 1L, containerMetrics);
     assertCounter("bytesWriteChunk", 1024L, containerMetrics);
     assertCounter("bytesReadChunk", 1024L, containerMetrics);
+    // bytesReadBlock is tested in TestKeyValueHandler.testReadBlockMetrics
 
     String sec = DFS_METRICS_PERCENTILES_INTERVALS + "s";
     Thread.sleep((DFS_METRICS_PERCENTILES_INTERVALS + 1) * 1000);