提交 2b95dc15 编写于 作者: I Igor Canadi

Revert "Fix bad merge of D16791 and D16767"

This reverts commit 839c8ecf.
上级 5ba028c1
...@@ -4543,43 +4543,16 @@ TEST(DBTest, TransactionLogIterator) { ...@@ -4543,43 +4543,16 @@ TEST(DBTest, TransactionLogIterator) {
{ {
auto iter = OpenTransactionLogIter(0); auto iter = OpenTransactionLogIter(0);
ExpectRecords(3, iter); ExpectRecords(3, iter);
assert(!iter->IsObsolete()); }
iter->Next(); Reopen(&options);
assert(!iter->Valid()); env_->SleepForMicroseconds(2 * 1000 * 1000);{
assert(!iter->IsObsolete());
assert(iter->status().ok());
Reopen(&options);
env_->SleepForMicroseconds(2 * 1000 * 1000);
Put("key4", DummyString(1024)); Put("key4", DummyString(1024));
Put("key5", DummyString(1024)); Put("key5", DummyString(1024));
Put("key6", DummyString(1024)); Put("key6", DummyString(1024));
iter->Next();
assert(!iter->Valid());
assert(iter->IsObsolete());
assert(iter->status().ok());
} }
{ {
auto iter = OpenTransactionLogIter(0); auto iter = OpenTransactionLogIter(0);
ExpectRecords(6, iter); ExpectRecords(6, iter);
assert(!iter->IsObsolete());
iter->Next();
assert(!iter->Valid());
assert(!iter->IsObsolete());
assert(iter->status().ok());
Put("key7", DummyString(1024));
iter->Next();
assert(iter->Valid());
assert(iter->status().ok());
dbfull()->Flush(FlushOptions());
Put("key8", DummyString(1024));
iter->Next();
assert(!iter->Valid());
assert(iter->IsObsolete());
assert(iter->status().ok());
} }
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
......
...@@ -21,7 +21,6 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( ...@@ -21,7 +21,6 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
files_(std::move(files)), files_(std::move(files)),
started_(false), started_(false),
isValid_(false), isValid_(false),
is_obsolete_(false),
currentFileIndex_(0), currentFileIndex_(0),
currentBatchSeq_(0), currentBatchSeq_(0),
currentLastSeq_(0), currentLastSeq_(0),
...@@ -70,15 +69,14 @@ bool TransactionLogIteratorImpl::Valid() { ...@@ -70,15 +69,14 @@ bool TransactionLogIteratorImpl::Valid() {
return started_ && isValid_; return started_ && isValid_;
} }
bool TransactionLogIteratorImpl::RestrictedRead(Slice* record, bool TransactionLogIteratorImpl::RestrictedRead(
std::string* scratch) { Slice* record,
bool ret = currentLogReader_->ReadRecord(record, scratch); std::string* scratch) {
// Don't read if no more complete entries to read from logs
if (!reporter_.last_status.ok()) { if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) {
currentStatus_ = reporter_.last_status; return false;
} }
return currentLogReader_->ReadRecord(record, scratch);
return ret;
} }
void TransactionLogIteratorImpl::SeekToStartSequence( void TransactionLogIteratorImpl::SeekToStartSequence(
...@@ -88,7 +86,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence( ...@@ -88,7 +86,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
Slice record; Slice record;
started_ = false; started_ = false;
isValid_ = false; isValid_ = false;
is_obsolete_ = false;
if (files_->size() <= startFileIndex) { if (files_->size() <= startFileIndex) {
return; return;
} }
...@@ -97,18 +94,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence( ...@@ -97,18 +94,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
currentStatus_ = s; currentStatus_ = s;
return; return;
} }
auto latest_seq_num = dbimpl_->GetLatestSequenceNumber();
if (startingSequenceNumber_ > latest_seq_num) {
if (strict) {
currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
"seek to required sequence number");
reporter_.Info(currentStatus_.ToString().c_str());
} else {
// isValid_ is false;
return;
}
}
while (RestrictedRead(&record, &scratch)) { while (RestrictedRead(&record, &scratch)) {
if (record.size() < 12) { if (record.size() < 12) {
reporter_.Corruption( reporter_.Corruption(
...@@ -138,11 +123,11 @@ void TransactionLogIteratorImpl::SeekToStartSequence( ...@@ -138,11 +123,11 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
// only file. Otherwise log the error and let the iterator return next entry // only file. Otherwise log the error and let the iterator return next entry
// If strict is set, we want to seek exactly till the start sequence and it // If strict is set, we want to seek exactly till the start sequence and it
// should have been present in the file we scanned above // should have been present in the file we scanned above
if (strict || files_->size() == 1) { if (strict) {
currentStatus_ = Status::Corruption("Gap in sequence number. Could not " currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
"seek to required sequence number"); "seek to required sequence number");
reporter_.Info(currentStatus_.ToString().c_str()); reporter_.Info(currentStatus_.ToString().c_str());
} else { } else if (files_->size() != 1) {
currentStatus_ = Status::Corruption("Start sequence was not found, " currentStatus_ = Status::Corruption("Start sequence was not found, "
"skipping to the next available"); "skipping to the next available");
reporter_.Info(currentStatus_.ToString().c_str()); reporter_.Info(currentStatus_.ToString().c_str());
...@@ -164,30 +149,11 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { ...@@ -164,30 +149,11 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
// Runs every time until we can seek to the start sequence // Runs every time until we can seek to the start sequence
return SeekToStartSequence(); return SeekToStartSequence();
} }
while(true) {
is_obsolete_ = false;
auto latest_seq_num = dbimpl_->GetLatestSequenceNumber();
if (currentLastSeq_ >= latest_seq_num) {
isValid_ = false;
return;
}
bool first = true;
while (currentFileIndex_ < files_->size()) {
if (!first) {
Status status =OpenLogReader(files_->at(currentFileIndex_).get());
if (!status.ok()) {
isValid_ = false;
currentStatus_ = status;
return;
}
}
first = false;
assert(currentLogReader_); assert(currentLogReader_);
if (currentLogReader_->IsEOF()) { if (currentLogReader_->IsEOF()) {
currentLogReader_->UnmarkEOF(); currentLogReader_->UnmarkEOF();
} }
while (RestrictedRead(&record, &scratch)) { while (RestrictedRead(&record, &scratch)) {
if (record.size() < 12) { if (record.size() < 12) {
reporter_.Corruption( reporter_.Corruption(
...@@ -205,14 +171,26 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { ...@@ -205,14 +171,26 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
return; return;
} }
} }
// Open the next file // Open the next file
++currentFileIndex_; if (currentFileIndex_ < files_->size() - 1) {
++currentFileIndex_;
Status status =OpenLogReader(files_->at(currentFileIndex_).get());
if (!status.ok()) {
isValid_ = false;
currentStatus_ = status;
return;
}
} else {
isValid_ = false;
if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) {
currentStatus_ = Status::OK();
} else {
currentStatus_ = Status::Corruption("NO MORE DATA LEFT");
}
return;
}
} }
// Read all the files but cannot find next record expected.
// TODO(sdong): support to auto fetch new log files from DB and continue.
isValid_ = false;
is_obsolete_ = true;
} }
bool TransactionLogIteratorImpl::IsBatchExpected( bool TransactionLogIteratorImpl::IsBatchExpected(
......
...@@ -19,9 +19,7 @@ namespace rocksdb { ...@@ -19,9 +19,7 @@ namespace rocksdb {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Env* env; Env* env;
Logger* info_log; Logger* info_log;
Status last_status;
virtual void Corruption(size_t bytes, const Status& s) { virtual void Corruption(size_t bytes, const Status& s) {
last_status = s;
Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str()); Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
} }
virtual void Info(const char* s) { virtual void Info(const char* s) {
...@@ -76,8 +74,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { ...@@ -76,8 +74,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
virtual bool Valid(); virtual bool Valid();
virtual bool IsObsolete() override { return is_obsolete_; }
virtual void Next(); virtual void Next();
virtual Status status(); virtual Status status();
...@@ -93,7 +89,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { ...@@ -93,7 +89,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
std::unique_ptr<VectorLogPtr> files_; std::unique_ptr<VectorLogPtr> files_;
bool started_; bool started_;
bool isValid_; // not valid when it starts of. bool isValid_; // not valid when it starts of.
bool is_obsolete_;
Status currentStatus_; Status currentStatus_;
size_t currentFileIndex_; size_t currentFileIndex_;
std::unique_ptr<WriteBatch> currentBatch_; std::unique_ptr<WriteBatch> currentBatch_;
......
...@@ -73,12 +73,6 @@ class TransactionLogIterator { ...@@ -73,12 +73,6 @@ class TransactionLogIterator {
// Can read data from a valid iterator. // Can read data from a valid iterator.
virtual bool Valid() = 0; virtual bool Valid() = 0;
// IsObsolete() returns true if new log files were created. This usually
// means that the user needs to close the current iterator and create a new
// one to get the newest updates. It should happen only when mem tables are
// flushed.
virtual bool IsObsolete() = 0;
// Moves the iterator to the next WriteBatch. // Moves the iterator to the next WriteBatch.
// REQUIRES: Valid() to be true. // REQUIRES: Valid() to be true.
virtual void Next() = 0; virtual void Next() = 0;
......
...@@ -67,21 +67,8 @@ static void ReplicationThreadBody(void* arg) { ...@@ -67,21 +67,8 @@ static void ReplicationThreadBody(void* arg) {
} }
} }
fprintf(stderr, "Refreshing iterator\n"); fprintf(stderr, "Refreshing iterator\n");
for (; !iter->IsObsolete(); iter->Next()) { for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
if (!iter->Valid()) {
if (t->stop.Acquire_Load() == nullptr) {
return;
}
// need to wait for new rows.
continue;
}
BatchResult res = iter->GetBatch(); BatchResult res = iter->GetBatch();
if (!iter->status().ok()) {
fprintf(stderr, "Corruption reported when reading seq no. b/w %ld",
static_cast<uint64_t>(currentSeqNum));
exit(1);
}
if (res.sequence != currentSeqNum) { if (res.sequence != currentSeqNum) {
fprintf(stderr, fprintf(stderr,
"Missed a seq no. b/w %ld and %ld\n", "Missed a seq no. b/w %ld and %ld\n",
...@@ -89,8 +76,6 @@ static void ReplicationThreadBody(void* arg) { ...@@ -89,8 +76,6 @@ static void ReplicationThreadBody(void* arg) {
(long)res.sequence); (long)res.sequence);
exit(1); exit(1);
} }
t->no_read++;
currentSeqNum++;
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册