Leveldb source code analysis -- writing data

2022-06-24 17:22:00 Xiao Lin Gang

Principle

Recall how an LSM tree handles a write:

  • Append the change to the write-ahead log (WAL) file on disk;
  • Apply the change to the in-memory MemTable.
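
The two steps above can be sketched with a toy store. This is only an illustration under stated assumptions: `ToyStore`, `ToyRecord`, and `Replay` are hypothetical names, not part of leveldb, and a `std::vector` stands in for the on-disk log. The point it shows is the ordering: the record reaches the log before the in-memory table is touched, so the table can always be rebuilt from the log.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record type: one logged key/value pair.
using ToyRecord = std::pair<std::string, std::string>;

// Hypothetical toy store; a vector stands in for the on-disk WAL.
class ToyStore {
 public:
  // Log first, then update the in-memory table.
  void Put(const std::string& key, const std::string& value) {
    assert(!key.empty());
    wal_.emplace_back(key, value);  // step 1: append to the log
    mem_[key] = value;              // step 2: apply to the in-memory table
  }

  // Rebuild the in-memory table by replaying the log in order,
  // which is what recovery after a crash would do.
  static std::map<std::string, std::string> Replay(
      const std::vector<ToyRecord>& wal) {
    std::map<std::string, std::string> mem;
    for (const ToyRecord& rec : wal) {
      mem[rec.first] = rec.second;
    }
    return mem;
  }

  const std::vector<ToyRecord>& wal() const { return wal_; }
  const std::map<std::string, std::string>& mem() const { return mem_; }

 private:
  std::vector<ToyRecord> wal_;
  std::map<std::string, std::string> mem_;
};
```

Replaying `wal()` after any sequence of `Put` calls yields the same contents as `mem()`, which is the property the log exists to provide.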

The write call

// Public interface for writing a single key/value pair
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Batch writes

WriteBatch packages a group of modifications so that they are applied atomically.

Its main job is to serialize the operations into a single buffer, rep_. The layout is:

sequence (fixed 64-bit) | count (fixed 32-bit) | record | record ...
record := kTypeValue + key + value  |  kTypeDeletion + key

// Append one Put record to rep_: a type tag followed by the length-prefixed key and value
void WriteBatch::Put(const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}
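
To make the byte layout concrete, here is a minimal standalone sketch of the same encoding. `ToyBatch`, `PutVarint32`, and `PutLengthPrefixed` are hypothetical stand-ins written for this example and do not reuse leveldb's headers. It assumes the layout described above: a 12-byte header (8-byte sequence, then a 4-byte little-endian count) followed by records whose keys and values carry a varint32 length prefix.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Tag values chosen to match leveldb's ValueType (deletion = 0, value = 1).
enum ToyValueType : char { kToyDeletion = 0x0, kToyValue = 0x1 };

constexpr std::size_t kHeaderSize = 12;  // 8-byte sequence + 4-byte count

// Append v as a varint: 7 bits per byte, high bit set on all but the last.
void PutVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Append the length of s as a varint32, then the bytes of s.
void PutLengthPrefixed(std::string* dst, const std::string& s) {
  PutVarint32(dst, static_cast<uint32_t>(s.size()));
  dst->append(s);
}

// Hypothetical miniature of WriteBatch.
class ToyBatch {
 public:
  ToyBatch() : rep_(kHeaderSize, '\0') {}

  void Put(const std::string& key, const std::string& value) {
    SetCount(Count() + 1);
    rep_.push_back(static_cast<char>(kToyValue));
    PutLengthPrefixed(&rep_, key);
    PutLengthPrefixed(&rep_, value);
  }

  void Delete(const std::string& key) {
    SetCount(Count() + 1);
    rep_.push_back(static_cast<char>(kToyDeletion));
    PutLengthPrefixed(&rep_, key);
  }

  // Decode the little-endian count stored at bytes 8..11.
  uint32_t Count() const {
    assert(rep_.size() >= kHeaderSize);
    uint32_t n = 0;
    for (int i = 0; i < 4; ++i) {
      n |= static_cast<uint32_t>(static_cast<unsigned char>(rep_[8 + i]))
           << (8 * i);
    }
    return n;
  }

  const std::string& rep() const { return rep_; }

 private:
  // Store the count little-endian at bytes 8..11.
  void SetCount(uint32_t n) {
    for (int i = 0; i < 4; ++i) {
      rep_[8 + i] = static_cast<char>((n >> (8 * i)) & 0xff);
    }
  }

  std::string rep_;
};
```

Because the whole batch is one contiguous string, it can be handed to the log writer as a single record, which is what makes the batch atomic on recovery.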

Writing data

The WAL file and the MemTable are shared by every thread, so concurrent writers must be serialized with a mutex. Appending to the WAL is disk I/O and is slow, so holding the mutex for the whole write would badly limit write concurrency.

leveldb addresses this by batching concurrent writes:

Each write is split into two stages to shorten the time the lock is held. In the preparation stage, a writer takes the lock and appends its batch to a queue of pending writers. It then checks whether it is at the head of that queue. If it is not, it waits on its condition variable, which releases the lock while it sleeps. If it is at the head, it acts as the leader: it merges as many queued batches as it can into one group, releases the lock, appends the group to the WAL and applies it to the MemTable, then reacquires the lock and marks the merged writers as done.

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  // Wait until another thread has completed this write,
  // or until this writer reaches the head of the queue
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  // The leader of an earlier group already wrote this batch on our behalf
  if (w.done) {
    return w.status;
  }

  // Make sure the MemTable has room; this may throttle or stall the write
  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    // Merge as many queued batches as possible into a single WriteBatch
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    {
      // Release the lock so other threads can enqueue writes during the I/O.
      // This writer stays at the head of the queue, so no other thread
      // appends to the log concurrently.
      mutex_.Unlock();
      // Append the merged batch to the WAL
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      // Apply the merged batch to the MemTable
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    // Publish the new last sequence number
    versions_->SetLastSequence(last_sequence);
  }

  // Pop every writer covered by this group; mark the others done and wake them
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // If writers remain, wake the new head so it can lead the next group
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
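
The call to BuildBatchGroup in the function above is where the head writer gathers the queued batches. The following is a minimal sketch of that merging step, not the leveldb implementation: `ToyWriter` and `BuildGroup` are hypothetical names, payloads are plain strings, and the size cap `max_bytes` is a parameter chosen for illustration. It assumes two stopping rules: stop before the group would exceed the cap, and stop before a writer that requests sync when the leader does not, so that a sync request is never silently downgraded.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical stand-in for a queued writer: a payload and a sync flag.
struct ToyWriter {
  std::string batch;
  bool sync;
};

// Merge the payloads of writers at the front of `queue` into one string.
// The leader (queue.front()) is always included. Merging stops before a
// writer that would push the group past `max_bytes`, or that wants sync
// when the leader does not. *last is set to the final writer included,
// mirroring the last_writer out-parameter in DBImpl::Write.
std::string BuildGroup(const std::deque<ToyWriter*>& queue,
                       std::size_t max_bytes, ToyWriter** last) {
  assert(!queue.empty());
  ToyWriter* first = queue.front();
  std::string group = first->batch;
  *last = first;
  for (std::size_t i = 1; i < queue.size(); ++i) {
    ToyWriter* w = queue[i];
    if (w->sync && !first->sync) {
      break;  // do not fold a sync write into a non-sync group
    }
    if (group.size() + w->batch.size() > max_bytes) {
      break;  // keep the group bounded
    }
    group += w->batch;
    *last = w;
  }
  return group;
}
```

Bounding the group keeps one large merged write from delaying a small one for too long, while `*last` tells the caller how far to pop the queue once the group has been written.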