查找的逻辑比较简单:给定一个键 x,判断它是否存在于 B+ 树中。实现时先找到该键可能所在的叶节点,再扫描一遍该叶节点的内容,确定键是否存在。重点在于如何找到叶节点:
从根节点开始,每次在内部节点中找到 x 所在子树对应的指针(即 value,实际是一个 page_id),并沿该指针进入下一层,直到到达叶节点。
循环时找内部节点中第一个比 x 大的键,取其左侧的值即可(k[0] 无效)。但这样无法覆盖没有任何键比 x 大(即 x 大于或等于所有键)的情况,所以要先将 next_page_id 初始化为最右侧的指针(value,而不是键),即 next_page_id = internal_page->ValueAt(internal_page->GetSize() - 1);
/**
 * Descend from the root page to the leaf page that may contain `key`.
 *
 * Each internal page visited on the way down is unpinned (with is_dirty = false)
 * once the next child page id has been read from it. The leaf page is returned
 * exactly as fetched; this function does not unpin it.
 *
 * @param key the key to locate
 * @return the Page whose data is the leaf node reached for `key`
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::FindLeaf(const KeyType &key) -> Page * {
  page_id_t current_id = root_page_id_;
  for (;;) {
    Page *fetched = buffer_pool_manager_->FetchPage(current_id);
    auto *node = reinterpret_cast<BPlusTreePage *>(fetched->GetData());
    if (node->IsLeafPage()) {
      return fetched;
    }

    auto *inner = static_cast<InternalPage *>(node);
    const int slot_count = inner->GetSize();

    // Default to the rightmost pointer: it is the one to follow when no key
    // stored in this page compares greater than `key`.
    int child_slot = slot_count - 1;

    // The key in slot 0 is ignored, so the scan starts at slot 1. On the first
    // key greater than `key`, follow the pointer immediately to its left.
    for (int slot = 1; slot < slot_count; ++slot) {
      if (comparator_(inner->KeyAt(slot), key) > 0) {
        child_slot = slot - 1;
        break;
      }
    }

    // Read the child id before releasing the page we read it from.
    current_id = inner->ValueAt(child_slot);
    buffer_pool_manager_->UnpinPage(inner->GetPageId(), false);
  }
}
INDEX_TEMPLATE_ARGUMENTS auto BPLUSTREE_TYPE::Insert(const KeyType &key, const ValueType &value, Transaction *transaction) -> bool { if (IsEmpty()) { Page *page = buffer_pool_manager_->NewPage(&root_page_id_); UpdateRootPageId(1); // 1 means inserting a new root page auto leaf_page = reinterpret_cast<LeafPage *>(page->GetData()); leaf_page->Init(root_page_id_,INVALID_PAGE_ID,leaf_max_size_); leaf_page->SetKeyValueAt(0,key,value); leaf_page->IncreaseSize(1); leaf_page->SetNextPageId(INVALID_PAGE_ID); buffer_pool_manager_->UnpinPage(root_page_id_,true); return true; }
Page *page = FindLeaf(key); auto leaf_page = reinterpret_cast<LeafPage *>(page->GetData()); for (int i = 0; i < leaf_page->GetSize() ; i++) { if (comparator_(leaf_page->KeyAt(i), key) == 0) { buffer_pool_manager_->UnpinPage(leaf_page->GetPageId(),false); return false; } }
leaf_page->Insert(key,value,comparator_); if (leaf_page->GetSize() < leaf_max_size_) { buffer_pool_manager_->UnpinPage(leaf_page->GetPageId(),false); return true; }
//leaf page is full , split it page_id_t new_page_id; Page *new_page = buffer_pool_manager_->NewPage(&new_page_id); auto new_leaf_page = reinterpret_cast<LeafPage *>(new_page->GetData()); new_leaf_page->Init(new_page_id,leaf_page->GetParentPageId(),leaf_max_size_); new_leaf_page->SetNextPageId(leaf_page->GetNextPageId()); leaf_page->SetNextPageId(new_page_id); //move half data to new leaf page leaf_page->MoveDataTo(new_leaf_page,(leaf_max_size_ + 1)/2);
// recursively insert while (true) { // if the old page is root , create a new parent internal page as root if (old_tree_page->IsRootPage()) { Page *new_page = buffer_pool_manager_->NewPage(&root_page_id_); auto new_root_page = reinterpret_cast<InternalPage *>(new_page->GetData()); new_root_page->Init(root_page_id_,INVALID_PAGE_ID,internal_max_size_); // split_key next is not used , just a placeholder new_root_page->SetKeyValueAt(0,split_key,old_tree_page->GetPageId()); new_root_page->SetKeyValueAt(1,split_key,new_tree_page->GetPageId()); new_root_page->IncreaseSize(2); old_tree_page->SetParentPageId(root_page_id_); new_tree_page->SetParentPageId(root_page_id_); UpdateRootPageId(); buffer_pool_manager_->UnpinPage(root_page_id_,true); break; }