diff --git a/db/db_impl.cc b/db/db_impl.cc index 23c2434718e46805db1c06819fe9bf8dcb80f18b..f3e3b4d32107602576430a6dc1ad864a9b37f025 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1117,11 +1117,6 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, return s; } - if (wal_files->empty()) { - return Status::IOError(" NO WAL Files present in the db"); - } - // std::shared_ptr would have been useful here. - s = RetainProbableWalFiles(*wal_files, seq); if (!s.ok()) { return s; @@ -1133,8 +1128,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, seq, std::move(wal_files), &last_flushed_sequence_)); - iter->get()->Next(); - return iter->get()->status(); + return (*iter)->status(); } Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, diff --git a/db/db_test.cc b/db/db_test.cc index b59de240b29aff3a7fdf1dc78b6f1f92c63a9ce0..340590cdd754974088f73070e33d36b6858fcdb1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3563,7 +3563,8 @@ TEST(DBTest, TransactionLogIteratorJustEmptyFile) { DestroyAndReopen(&options); unique_ptr iter; Status status = dbfull()->GetUpdatesSince(0, &iter); - ASSERT_TRUE(!status.ok()); + // Check that an empty iterator is returned + ASSERT_TRUE(!iter->Valid()); } while (ChangeCompactOptions()); } @@ -3594,7 +3595,7 @@ TEST(DBTest, TransactionLogIteratorBatchOperations) { Reopen(&options); Put("key4", DummyString(1024)); auto iter = OpenTransactionLogIter(3); - ExpectRecords(1, iter); + ExpectRecords(2, iter); } while (ChangeCompactOptions()); } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 5ce89b3d46b4872a60b6ba49d13e252dbb8b3025..39ca38507e797eba5ab989a6ca8a1968dc7f45df 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -20,10 +20,11 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( currentFileIndex_(0), lastFlushedSequence_(lastFlushedSequence) { assert(startingSequenceNumber_ <= *lastFlushedSequence_); - assert(files_.get() != nullptr); + assert(files_ != nullptr); reporter_.env = options_->env; reporter_.info_log = options_->info_log.get(); + SeekToStartSequence(); // Seek till starting sequence } Status TransactionLogIteratorImpl::OpenLogFile( @@ -52,7 +53,7 @@ Status TransactionLogIteratorImpl::OpenLogFile( BatchResult TransactionLogIteratorImpl::GetBatch() { assert(isValid_); // cannot call in a non valid state. BatchResult result; - result.sequence = currentSequence_; + result.sequence = currentBatchSeq_; result.writeBatchPtr = std::move(currentBatch_); return result; } @@ -65,24 +66,21 @@ bool TransactionLogIteratorImpl::Valid() { return started_ && isValid_; } -void TransactionLogIteratorImpl::Next() { - LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get(); - -// First seek to the given seqNo. in the current file. - std::string scratch; - Slice record; - if (!started_) { - started_ = true; // this piece only runs onced. +void TransactionLogIteratorImpl::SeekToStartSequence() { + std::string scratch; + Slice record; isValid_ = false; if (startingSequenceNumber_ > *lastFlushedSequence_) { currentStatus_ = Status::IOError("Looking for a sequence, " - "which is not flushed yet."); + "which is not flushed yet."); + return; + } + if (files_->size() == 0) { return; } - Status s = OpenLogReader(currentLogFile); + Status s = OpenLogReader(files_->at(0).get()); if (!s.ok()) { currentStatus_ = s; - isValid_ = false; return; } while (currentLogReader_->ReadRecord(&record, &scratch)) { @@ -92,23 +90,39 @@ void TransactionLogIteratorImpl::Next() { continue; } UpdateCurrentWriteBatch(record); - if (currentSequence_ >= startingSequenceNumber_) { - assert(currentSequence_ <= *lastFlushedSequence_); + if (currentBatchSeq_ + currentBatchCount_ - 1 >= + startingSequenceNumber_) { + assert(currentBatchSeq_ <= *lastFlushedSequence_); isValid_ = true; - break; + started_ = true; // set started_ as we could seek till starting sequence + return; } else { isValid_ = false; } } - if (isValid_) { - // Done for this iteration - return; + // Could not find start sequence in first file. Normally this must be the + // only file. Otherwise log the error and let the iterator return next entry + if (files_->size() != 1) { + currentStatus_ = Status::Corruption("Start sequence was not found, " + "skipping to the next available"); + reporter_.Corruption(0, currentStatus_); + started_ = true; // Let Next find next available entry + Next(); } +} + +void TransactionLogIteratorImpl::Next() { + // TODO:Next() says that it requires Valid to be true but this is not true + // assert(Valid()); + std::string scratch; + Slice record; + isValid_ = false; + if (!started_) { // Runs every time until we can seek to the start sequence + return SeekToStartSequence(); } - bool openNextFile = true; - while(openNextFile) { + while(true) { assert(currentLogReader_); - if (currentSequence_ < *lastFlushedSequence_) { + if (currentBatchSeq_ < *lastFlushedSequence_) { if (currentLogReader_->IsEOF()) { currentLogReader_->UnmarkEOF(); } @@ -119,30 +133,28 @@ void TransactionLogIteratorImpl::Next() { continue; } else { UpdateCurrentWriteBatch(record); - openNextFile = false; - break; + return; } } } - if (openNextFile) { - if (currentFileIndex_ < files_.get()->size() - 1) { - ++currentFileIndex_; - Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get()); - if (!status.ok()) { - isValid_ = false; - currentStatus_ = status; - return; - } - } else { + // Open the next file + if (currentFileIndex_ < files_->size() - 1) { + ++currentFileIndex_; + Status status =OpenLogReader(files_->at(currentFileIndex_).get()); + if (!status.ok()) { isValid_ = false; - openNextFile = false; - if (currentSequence_ == *lastFlushedSequence_) { - currentStatus_ = Status::OK(); - } else { - currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); - } + currentStatus_ = status; + return; + } + } else { + isValid_ = false; + if (currentBatchSeq_ == *lastFlushedSequence_) { + currentStatus_ = Status::OK(); + } else { + currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); } + return; } } } @@ -150,7 +162,8 @@ void TransactionLogIteratorImpl::Next() { void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { WriteBatch* batch = new WriteBatch(); WriteBatchInternal::SetContents(batch, record); - currentSequence_ = WriteBatchInternal::Sequence(batch); + currentBatchSeq_ = WriteBatchInternal::Sequence(batch); + currentBatchCount_ = WriteBatchInternal::Count(batch); currentBatch_.reset(batch); isValid_ = true; currentStatus_ = Status::OK(); diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 9b4d7c9b58b48cd2b198f2a4e25d3ee7ad6f073d..6269dc30da50a48292194879898d66f695390bc1 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -88,9 +88,11 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { Status OpenLogFile(const LogFile* logFile, unique_ptr* file); LogReporter reporter_; SequenceNumber const * const lastFlushedSequence_; - // represents the sequence number being read currently. - SequenceNumber currentSequence_; + SequenceNumber currentBatchSeq_; // sequence number at start of current batch + uint64_t currentBatchCount_; // count in current batch + + void SeekToStartSequence(); void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); }; diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index 0a2be79c3153623663ef7b5811778c4a41c8be32..a7553cea8f0992f0ecc7803ea0f59647c6a8bd25 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -67,8 +67,8 @@ class TransactionLogIterator { // REQUIRES: Valid() to be true. virtual void Next() = 0; - // Return's ok if the iterator is valid. - // Return the Error when something has gone wrong. + // Returns ok if the iterator is valid. + // Returns the Error when something has gone wrong. virtual Status status() = 0; // If valid return's the current write_batch and the sequence number of the