Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions lib/pmem/ARTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ uint64_t ARTree::GetLeafCount() {

void ARTree::Put(const char *key, // copy value from std::string
char *value) {
// printKey(key);
persistent_ptr<ValueWrapper> valPrstPtr =
tree->findValueInNode(tree->treeRoot->rootNode, key, false);
if (valPrstPtr == nullptr)
Expand All @@ -272,27 +271,50 @@ void ARTree::Put(const char *key, int32_t keyBytes, const char *value,
* TODO: decrement value std::atomic<int> refCounter in parent and free it if

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo can be removed? Also the one in line 378?

* counter==0 When freeing - call decrementParent recurenty.
* */
void ARTree::decrementParent(persistent_ptr<Node> node) {}
void ARTree::decrementParent(persistent_ptr<Node256> node, const char *key) {
size_t keyCalc = key[tree->treeRoot->keySize - node->depth - 1];
std::bitset<8> x(keyCalc);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like x is not used for anything

node->children[keyCalc] = nullptr;
node->refCounter--;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see refCounter is atomic, shouldn't we initialize it when loading an existing tree?

if (node->refCounter == 0) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be an compare and swap instruction? Otherwise we can have a situation in which two threads decrement refCounter in parallel before checking for zero in next line. In this situation, none will decrement the parent

if (node->depth == 0) {
return;
}

decrementParent(node->parent, key);
if (node->parent && node->parent->refCounter == 0) {
persistent_ptr<Node256> nodeParent = node->parent;
pmemobj_free(nodeParent->children[0].raw_ptr());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nodeParent is declared above, but not initialized. How can this work? Do I miss something?

}
}
}

void ARTree::removeFromParent(persistent_ptr<ValueWrapper> valPrstPtr) {
// @TODO: cleaning parent node not implemented
decrementParent(valPrstPtr->parent);
void ARTree::removeFromTree(persistent_ptr<NodeLeafCompressed> compressed,
const char *key) {
persistent_ptr<Node256> current = compressed->parent;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name "current" is confusing, maybe this variable should be called "parent"?

decrementParent(current, key);
if (current->refCounter == 0) {
pmemobj_free(current->children[0].raw_ptr());
}
}

void ARTree::Remove(const char *key) {
persistent_ptr<ValueWrapper> valPrstPtr =
tree->findValueInNode(tree->treeRoot->rootNode, key, false);

if (valPrstPtr == nullptr || (valPrstPtr->location == EMPTY))
throw OperationFailedException(Status(KEY_NOT_FOUND));

try {
removeFromParent(valPrstPtr);
removeFromTree(valPrstPtr->parent, key);
if (_isLocationReservedNotPublished(valPrstPtr)) {
valPrstPtr->location = EMPTY;
pmemobj_cancel(tree->_pm_pool.get_handle(), valPrstPtr->actionValue,
1);
delete valPrstPtr->actionValue;
// TODO: transaction should be used
// removal of Node Compressed not needed - it is one bigger node
// Instead decrement counter in parent and free child when 0
// remove ValueWrapper
pmemobj_free(valPrstPtr.raw_ptr());
} else if (valPrstPtr->location == DISK) {
// TODO: jradtke need to confirm if no extra action required here

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that comment should be updated, for sure extra actions are needed, right?

Expand Down Expand Up @@ -371,7 +393,6 @@ void TreeImpl::allocateFullLevels(persistent_ptr<Node> node,
_pm_pool.get_handle(), &(actionsArray[actionsCounter]),
NODE_SIZE[node->type] * sizeof(NodeLeafCompressed), VALUE);
#endif

if (nodeLeafCompressed_new == nullptr) {
DAQ_CRITICAL("reserve nodeLeafCompressed failed actionsCounter=" +
std::to_string(actionsCounter) + " with " +
Expand Down Expand Up @@ -438,8 +459,7 @@ TreeImpl::findValueInNode(persistent_ptr<Node> current, const char *_key,
if (valPrstPtr == nullptr) {
DAQ_CRITICAL("reserve ValueWrapper failed with " +
std::string(strerror(errno)));
throw OperationFailedException(
Status(PMEM_ALLOCATION_ERROR));
throw OperationFailedException(Status(PMEM_ALLOCATION_ERROR));
}
// std::cout << "valueWrapper off="
// << (*nodeLeafCompressed->child.raw_ptr()).off
Expand All @@ -454,14 +474,13 @@ TreeImpl::findValueInNode(persistent_ptr<Node> current, const char *_key,
* parallel on the same key. This is still not thread-safe.
*/
valPrstPtr->location = EMPTY;
valPrstPtr->parent = nodeLeafCompressed;
nodeLeafCompressed->child = valPrstPtr;
}
DAQ_DEBUG("findValueInNode: Found");
return nodeLeafCompressed->child;
}
keyCalc = key[treeRoot->keySize - current->depth - 1];
std::bitset<8> x(keyCalc);
DAQ_DEBUG("findValueInNode: keyCalc=" + x.to_string());
if (current->type == TYPE256) { // TYPE256
node256 = current;
if (!allocate && node256->children[keyCalc]) {
Expand Down Expand Up @@ -496,14 +515,15 @@ TreeImpl::findValueInNode(persistent_ptr<Node> current, const char *_key,
return nullptr;
}

/*void ARTree::printKey(const char *key) {
void ARTree::printKey(const char *key) {
std::mutex localMutex;
std::lock_guard<std::mutex> lock(localMutex);
std::cout << "printing key" << std::endl;
for (int i = 0; i < KEY_SIZE; i++) {
std::cout << std::bitset<8>(key[KEY_SIZE - i - 1]) << std::endl;
for (int i = 0; i < tree->treeRoot->keySize; i++) {
std::cout << std::bitset<8>(key[tree->treeRoot->keySize - i - 1])
<< std::endl;
}
}*/
}

/*
* Allocate value in ARTree for given key.
Expand Down
10 changes: 6 additions & 4 deletions lib/pmem/ARTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct locationWrapper {
int value;
};
class Node;

class NodeLeafCompressed;
struct ValueWrapper {
explicit ValueWrapper()
: actionValue(nullptr), actionUpdate(nullptr), location(EMPTY) {}
Expand All @@ -95,7 +95,8 @@ struct ValueWrapper {
v<locationWrapper> locationVolatile;
struct pobj_action *actionValue;
struct pobj_action *actionUpdate;
persistent_ptr<Node> parent; // pointer to parent, needed for removal
persistent_ptr<NodeLeafCompressed>
parent; // pointer to parent, needed for removal
};

class Node {
Expand Down Expand Up @@ -176,8 +177,9 @@ class ARTree : public DaqDB::RTreeEngine {
void AllocateIOVForKey(const char *key, uint64_t **ptr, size_t size) final;
void UpdateValueWrapper(const char *key, uint64_t *ptr, size_t size) final;
void printKey(const char *key);
void decrementParent(persistent_ptr<Node> node);
void removeFromParent(persistent_ptr<ValueWrapper> valPrstPtr);
void decrementParent(persistent_ptr<Node256> node, const char *key);
void removeFromTree(persistent_ptr<NodeLeafCompressed> current,
const char *key);

private:
inline bool
Expand Down