// persistent_in_stream_without_local_copy.cpp
#include "oneflow/core/persistence/persistent_in_stream_without_local_copy.h"
W
willzhang4a58 已提交
2
#include "oneflow/core/job/job_desc.h"
W
willzhang4a58 已提交
3
#include "oneflow/core/thread/thread_pool.h"
4 5 6

namespace oneflow {

W
willzhang4a58 已提交
7 8 9 10 11 12 13 14
// File-local thread pool that runs the stand-by buffer refill tasks submitted
// by AsyncUpdateStandByBuffer(). It is constructed with the argument 1
// (presumably the number of worker threads -- confirm against ThreadPool).
static ThreadPool g_persistent_in_thread_pool(1);

// Releases both read buffers.
//
// The refill task queued by AsyncUpdateStandByBuffer() captures `this` and
// writes into the stand-by buffer, so the destructor first blocks until that
// task has published a non-zero ready count before any memory is freed.
PersistentInStreamWithoutLocalCopy::~PersistentInStreamWithoutLocalCopy() {
  WaitUntilStandByBufferReadyBytesNotEqualZero();
  delete[] buffer_;
  delete[] standby_buffer_;
}

15
int32_t PersistentInStreamWithoutLocalCopy::ReadLine(std::string* l) {
16
  if (IsEof()) { return -1; }
17
  l->clear();
18 19 20 21 22 23 24 25 26
  while (*cur_buf_begin_ != '\n') {
    if (cur_buf_begin_ == cur_buf_end_) {
      UpdateBuffer();
      if (cur_buf_begin_ == cur_buf_end_) {
        return 0;
      } else {
        continue;
      }
    }
27
    l->push_back(*cur_buf_begin_++);
28 29 30 31 32
  }
  ++cur_buf_begin_;
  return 0;
}

33
int32_t PersistentInStreamWithoutLocalCopy::Read(char* s, size_t n) {
34
  if (IsEof()) { return -1; }
C
chengtbf 已提交
35
  while (n) {
36
    if (cur_buf_begin_ == cur_buf_end_) { UpdateBuffer(); }
C
chengtbf 已提交
37
    CHECK_LT(cur_buf_begin_, cur_buf_end_);
W
willzhang4a58 已提交
38 39
    size_t copy_size = std::min<size_t>(cur_buf_end_ - cur_buf_begin_, n);
    memcpy(s, cur_buf_begin_, copy_size);
C
chengtbf 已提交
40 41 42
    s += copy_size;
    cur_buf_begin_ += copy_size;
    n -= copy_size;
43 44 45 46
  }
  return 0;
}

W
willzhang4a58 已提交
47 48 49
// Opens file_path through fs for random access and starts streaming it from
// byte position offset.
//
// fs        : file system used to open the file and query its size.
// file_path : path of the file to read.
// offset    : initial read position; must be strictly less than the file size
//             (enforced with CHECK_LT).
//
// Two buffers of persistence_buf_byte() + 1 bytes are allocated; the extra
// byte leaves room for the '\0' written at cur_buf_end_. The active buffer
// starts out empty and the first refill of the stand-by buffer is queued
// before the constructor returns.
PersistentInStreamWithoutLocalCopy::PersistentInStreamWithoutLocalCopy(fs::FileSystem* fs,
                                                                       const std::string& file_path,
                                                                       uint64_t offset) {
  // File handle and bounds.
  fs->NewRandomAccessFile(file_path, &file_);
  file_size_ = fs->GetFileSize(file_path);
  CHECK_LT(offset, file_size_);

  const auto buf_capacity = Global<JobDesc>::Get()->persistence_buf_byte() + 1;

  // Stand-by buffer: nothing has been prefetched yet.
  standby_buffer_ = new char[buf_capacity];
  standby_buffer_ready_bytes_ = 0;

  // Read position and end-of-file flag.
  cur_file_pos_ = offset;
  file_read_done_ = false;

  // Active buffer: empty, with the terminator already in place.
  buffer_ = new char[buf_capacity];
  cur_buf_end_ = buffer_;
  cur_buf_begin_ = buffer_;
  *cur_buf_end_ = '\0';

  // Must come last: the queued task uses the members initialised above.
  AsyncUpdateStandByBuffer();
}

64
void PersistentInStreamWithoutLocalCopy::UpdateBuffer() {
65
  CHECK_EQ(cur_buf_begin_, cur_buf_end_);
W
willzhang4a58 已提交
66 67 68 69 70
  WaitUntilStandByBufferReadyBytesNotEqualZero();
  if (standby_buffer_ready_bytes_ == -1) { return; }
  std::swap(standby_buffer_, buffer_);
  cur_buf_begin_ = buffer_;
  cur_buf_end_ = buffer_ + standby_buffer_ready_bytes_;
71
  *cur_buf_end_ = '\0';
W
willzhang4a58 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
  standby_buffer_ready_bytes_ = 0;
  AsyncUpdateStandByBuffer();
}

void PersistentInStreamWithoutLocalCopy::WaitUntilStandByBufferReadyBytesNotEqualZero() {
  std::unique_lock<std::mutex> lck(standby_buffer_ready_mtx_);
  standby_buffer_ready_cond_.wait(lck, [this]() { return standby_buffer_ready_bytes_ != 0; });
}

void PersistentInStreamWithoutLocalCopy::AsyncUpdateStandByBuffer() {
  g_persistent_in_thread_pool.AddWork([this]() {
    uint64_t n =
        std::min(Global<JobDesc>::Get()->persistence_buf_byte(), file_size_ - cur_file_pos_);
    if (n > 0) {
      file_->Read(cur_file_pos_, n, standby_buffer_);
      AddNForCurFilePos(n);
    }
    if (cur_file_pos_ == file_size_) { file_read_done_ = true; }
    std::unique_lock<std::mutex> lck(standby_buffer_ready_mtx_);
    if (n > 0) {
      standby_buffer_ready_bytes_ = n;
    } else {
      standby_buffer_ready_bytes_ = -1;
    }
    standby_buffer_ready_cond_.notify_all();
  });
}

// Reports end of stream: true only when the active buffer holds no unread
// bytes and file_read_done_ has been set.
bool PersistentInStreamWithoutLocalCopy::IsEof() const {
  const bool active_buffer_drained = (cur_buf_begin_ == cur_buf_end_);
  // file_read_done_ is only consulted once the active buffer is drained.
  return active_buffer_drained && file_read_done_;
}

}  // namespace oneflow