Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* A string pool like {@link String#intern()}, but more flexible as we can create multiple instances
* and use them in difference places, where {@link String#intern()} is global.
* <p>
* We use {@link WeakReference} so when there are no actual reference to the String, it will be GCed
* to reduce memory pressure.
* <p>
* The difference between {@link WeakObjectPool} is that, we also need to use {@link WeakReference}
* as key, not only value, because the key(a String) is exactly what we want to deduplicate.
*/
@InterfaceAudience.Private
public class FastStringPool {

private static final class WeakKey extends WeakReference<String> {

private final int hash;

WeakKey(String referent, ReferenceQueue<String> queue) {
super(Preconditions.checkNotNull(referent), queue);
// must calculate it here, as later the reference may be GCed
this.hash = referent.hashCode();
}

@Override
public int hashCode() {
return hash;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof WeakKey)) {
return false;
}

String a = this.get();
String b = ((WeakKey) obj).get();
// In ConcurrentHashMap, we will always compare references(like entry.key == key) before
// calling actual equals method, so this will not cause problems for clean up. And in normal
// intern path, the reference will never be null, so there is no problem too.
if (a == null || b == null) {
return false;
}
return a.equals(b);
}
}

private final ConcurrentHashMap<WeakKey, WeakReference<String>> map = new ConcurrentHashMap<>();

private final ReferenceQueue<String> refQueue = new ReferenceQueue<>();

private final Lock cleanupLock = new ReentrantLock();

// only call cleanup every 256 times
private static final int CLEANUP_MASK = 0xFF;
private final AtomicInteger counter = new AtomicInteger();

public String intern(String s) {
Preconditions.checkNotNull(s);
maybeCleanup();

WeakKey lookupKey = new WeakKey(s, null);
WeakReference<String> ref = map.get(lookupKey);
if (ref != null) {
String v = ref.get();
if (v != null) {
return v;
}
}

WeakKey storeKey = new WeakKey(s, refQueue);
WeakReference<String> storeVal = new WeakReference<>(s);
// Used to store the return value. The return value of compute method is a WeakReference, the
// value of the WeakReference may be GCed just before we get the it for returning.
MutableObject<String> ret = new MutableObject<>();

map.compute(storeKey, (k, prevVal) -> {
if (prevVal == null) {
ret.setValue(s);
return storeVal;
} else {
String prevRef = prevVal.get();
if (prevRef != null) {
ret.setValue(prevRef);
return prevVal;
} else {
ret.setValue(s);
return storeVal;
}
}
});
assert ret.get() != null;
return ret.get();
}

private void cleanup() {
if (!cleanupLock.tryLock()) {
// a cleanup task is ongoing, give up
return;
}
try {
for (;;) {
WeakKey k = (WeakKey) refQueue.poll();
if (k == null) {
return;
}
map.remove(k);
}
} finally {
cleanupLock.unlock();
}
}

private void maybeCleanup() {
if ((counter.incrementAndGet() & CLEANUP_MASK) != 0) {
return;
}
cleanup();
}

public int size() {
// size method is not on critical path, so always call cleanup here to reduce memory pressure
cleanup();
return map.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(SmallTests.TAG)
public class TestFastStringPool {

private static final Logger LOG = LoggerFactory.getLogger(TestFastStringPool.class);

@Test
public void testMultiThread() throws InterruptedException {
FastStringPool pool = new FastStringPool();
List<String> list1 = new ArrayList<>();
List<String> list2 = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list1.add("list1-" + i);
list2.add("list2-" + i);
}
Map<String, String> interned1 = new HashMap<>();
Map<String, String> interned2 = new HashMap<>();
AtomicBoolean failed = new AtomicBoolean(false);
int numThreads = 10;
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread(() -> {
ThreadLocalRandom rand = ThreadLocalRandom.current();
for (int j = 0; j < 1000000; j++) {
List<String> list;
Map<String, String> interned;
if (rand.nextBoolean()) {
list = list1;
interned = interned1;
} else {
list = list2;
interned = interned2;
}
// create a new reference
String k = new String(list.get(rand.nextInt(list.size())));
String v = pool.intern(k);
synchronized (interned) {
String prev = interned.get(k);
if (prev != null) {
// should always return the same reference
if (prev != v) {
failed.set(true);
String msg = "reference not equal, intern failed on string " + k;
LOG.error(msg);
throw new AssertionError(msg);
}
} else {
interned.put(k, v);
}
}
}
}));
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
LOG.info("interned1 size {}, interned2 size {}, pool size {}", interned1.size(),
interned2.size(), pool.size());
assertEquals(interned1.size() + interned2.size(), pool.size());
interned1.clear();
list1.clear();
LOG.info("clear interned1");
// wait for at most 30 times
for (int i = 0; i < 30; i++) {
// invoke gc manually
LOG.info("trigger GC");
System.gc();
Thread.sleep(1000);
// should have cleaned up all the references for list1
if (interned2.size() == pool.size()) {
return;
}
}
fail("should only have list2 strings in pool, expected pool size " + interned2.size()
+ ", but got " + pool.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.FilePathStringPool;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FastStringPool;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -29,17 +29,22 @@
*/
@InterfaceAudience.Private
public class BlockCacheKey implements HeapSize, java.io.Serializable {
private static final long serialVersionUID = -5199992013113130535L; // Changed due to format
// change
// Changed due to format change
private static final long serialVersionUID = -5199992013113130535L;

private static final FastStringPool HFILE_NAME_POOL = new FastStringPool();

private static final FastStringPool REGION_NAME_POOL = new FastStringPool();

private static final FastStringPool CF_NAME_POOL = new FastStringPool();

// New compressed format using integer file ID (when codec is available)
private final int hfileNameId;

private transient final FilePathStringPool stringPool;
private final String hfileName;

private final int regionId;
private final String regionName;

private final int cfId;
private final String cfName;

private final long offset;

Expand Down Expand Up @@ -91,11 +96,10 @@ public BlockCacheKey(String hfileName, String cfName, String regionName, long of
this.isPrimaryReplicaBlock = isPrimaryReplica;
this.offset = offset;
this.blockType = blockType;
this.stringPool = FilePathStringPool.getInstance();
// Use string pool for file, region and cf values
this.hfileNameId = stringPool.encode(hfileName);
this.regionId = (regionName != null) ? stringPool.encode(regionName) : -1;
this.cfId = (cfName != null) ? stringPool.encode(cfName) : -1;
this.hfileName = HFILE_NAME_POOL.intern(hfileName);
this.regionName = (regionName != null) ? REGION_NAME_POOL.intern(regionName) : null;
this.cfName = (cfName != null) ? CF_NAME_POOL.intern(cfName) : null;
this.archived = archived;
}

Expand All @@ -116,7 +120,7 @@ public BlockCacheKey(Path hfilePath, long offset, boolean isPrimaryReplica, Bloc

@Override
public int hashCode() {
return hfileNameId * 127 + (int) (offset ^ (offset >>> 32));
return hfileName.hashCode() * 127 + (int) (offset ^ (offset >>> 32));
}

@Override
Expand Down Expand Up @@ -152,23 +156,23 @@ public long heapSize() {
* @return The file name
*/
public String getHfileName() {
return stringPool.decode(hfileNameId);
return hfileName;
}

/**
* Returns the region name portion of this cache key.
* @return The region name
*/
public String getRegionName() {
return stringPool.decode(regionId);
return regionName;
}

/**
* Returns the column family name portion of this cache key.
* @return The column family name
*/
public String getCfName() {
return stringPool.decode(cfId);
return cfName;
}

public boolean isPrimary() {
Expand Down
Loading