Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,13 @@ public ContainerCommandResponseProto readBlock(
}
try {
readBlockImpl(request, blockFile, kvContainer, streamObserver, false);
// TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
final ReadBlockRequestProto readBlock = request.getReadBlock();
final BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
final BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
final long bytesRead = readBlock.hasLength() && readBlock.getLength() > 0
? readBlock.getLength()
: blockData.getSize();
metrics.incContainerBytesStats(Type.ReadBlock, bytesRead);
} catch (StorageContainerException ex) {
responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.verifyAllDataChecksumsMatch;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.apache.ozone.test.MetricsAsserts.assertCounter;
import static org.apache.ozone.test.MetricsAsserts.getMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -53,6 +56,7 @@
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
Expand All @@ -68,22 +72,30 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.utils.io.RandomAccessFileChannel;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
Expand All @@ -94,6 +106,7 @@
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
Expand All @@ -106,6 +119,7 @@
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -938,4 +952,134 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {

return kvHandler;
}

private static class HandlerWithVolumeSet {
private final KeyValueHandler handler;
private final MutableVolumeSet volumeSet;
private final ContainerSet containerSet;

HandlerWithVolumeSet(KeyValueHandler handler, MutableVolumeSet volumeSet, ContainerSet containerSet) {
this.handler = handler;
this.volumeSet = volumeSet;
this.containerSet = containerSet;
}

KeyValueHandler getHandler() {
return handler;
}

MutableVolumeSet getVolumeSet() {
return volumeSet;
}

ContainerSet getContainerSet() {
return containerSet;
}
}

private HandlerWithVolumeSet createKeyValueHandlerWithVolumeSet(Path path) throws IOException {
ContainerMetrics.remove();
final ContainerSet containerSet = newContainerSet();
final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);

HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf)
.clusterID(CLUSTER_ID).datanodeUuid(DATANODE_UUID)
.volumeSet(volumeSet)
.build();
hddsVolume.format(CLUSTER_ID);
hddsVolume.createWorkingDir(CLUSTER_ID, null);
hddsVolume.createTmpDirs(CLUSTER_ID);
when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(hddsVolume));

final KeyValueHandler kvHandler = ContainerTestUtils.getKeyValueHandler(conf,
DATANODE_UUID, containerSet, volumeSet);
kvHandler.setClusterID(CLUSTER_ID);
hddsVolume.getVolumeInfoStats().unregister();
hddsVolume.getVolumeIOStats().unregister();

ContainerController controller = new ContainerController(containerSet,
Collections.singletonMap(ContainerType.KeyValueContainer, kvHandler));
OnDemandContainerScanner onDemandScanner = new OnDemandContainerScanner(
conf.getObject(ContainerScannerConfiguration.class), controller);
containerSet.registerOnDemandScanner(onDemandScanner);

return new HandlerWithVolumeSet(kvHandler, volumeSet, containerSet);
}

@Test
public void testReadBlockMetrics() throws Exception {
Path testDir = Files.createTempDirectory("testReadBlockMetrics");
try {
conf.set(OZONE_SCM_CONTAINER_LAYOUT_KEY, ContainerLayoutVersion.FILE_PER_BLOCK.name());
HandlerWithVolumeSet handlerWithVolume = createKeyValueHandlerWithVolumeSet(testDir);
KeyValueHandler kvHandler = handlerWithVolume.getHandler();
MutableVolumeSet volumeSet = handlerWithVolume.getVolumeSet();
ContainerSet containerSet = handlerWithVolume.getContainerSet();

long containerID = ContainerTestHelper.getTestContainerID();
KeyValueContainerData containerData = new KeyValueContainerData(
containerID, ContainerLayoutVersion.FILE_PER_BLOCK,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
DATANODE_UUID);
KeyValueContainer container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), CLUSTER_ID);
containerSet.addContainer(container);

BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
BlockData blockData = new BlockData(blockID);
ChunkInfo chunkInfo = new ChunkInfo("chunk1", 0, 1024);
blockData.addChunk(chunkInfo.getProtoBufMessage());
kvHandler.getBlockManager().putBlock(container, blockData);

ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(1024));
kvHandler.getChunkManager().writeChunk(container, blockID, chunkInfo, data,
DispatcherContext.getHandleWriteChunk());

ContainerCommandRequestProto readBlockRequest =
ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.ReadBlock)
.setContainerID(containerID)
.setDatanodeUuid(DATANODE_UUID)
.setReadBlock(ContainerProtos.ReadBlockRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setOffset(0)
.setLength(1024)
.build())
.build();

final AtomicInteger responseCount = new AtomicInteger(0);

StreamObserver<ContainerCommandResponseProto> streamObserver =
new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(ContainerCommandResponseProto response) {
assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
responseCount.incrementAndGet();
}

@Override
public void onError(Throwable t) {
fail("ReadBlock failed", t);
}

@Override
public void onCompleted() {
}
};

RandomAccessFileChannel blockFile = new RandomAccessFileChannel();
ContainerCommandResponseProto response = kvHandler.readBlock(
readBlockRequest, container, blockFile, streamObserver);

assertNull(response, "ReadBlock should return null on success");
assertTrue(responseCount.get() > 0, "Should receive at least one response");

MetricsRecordBuilder containerMetrics = getMetrics(
ContainerMetrics.STORAGE_CONTAINER_METRICS);
assertCounter("bytesReadBlock", 1024L, containerMetrics);
} finally {
FileUtils.deleteDirectory(testDir.toFile());
ContainerMetrics.remove();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ static void runTestClientServer(
assertCounter("numReadChunk", 1L, containerMetrics);
assertCounter("bytesWriteChunk", 1024L, containerMetrics);
assertCounter("bytesReadChunk", 1024L, containerMetrics);
// bytesReadBlock is tested in TestKeyValueHandler.testReadBlockMetrics

String sec = DFS_METRICS_PERCENTILES_INTERVALS + "s";
Thread.sleep((DFS_METRICS_PERCENTILES_INTERVALS + 1) * 1000);
Expand Down