im.cc 2.5 KB
Newer Older
S
superjom 已提交
1
#include <glog/logging.h>
S
superjom 已提交
2
#include <ctime>
S
superjom 已提交
3

S
superjom 已提交
4
#include "visualdl/logic/im.h"
S
superjom 已提交
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

namespace visualdl {

/*
 * @num_samples: number of instances to sample
 * @size: counter of the records.
 * @returns: id of the instance to replace, if drop this instance, return -1.
 */
int ReserviorSample(int num_samples, int num_records) {
  if (num_records <= num_samples) {
    return num_records;
  }

  std::srand(std::time(0));
  float prob = static_cast<float>(std::rand()) / RAND_MAX;
  float receive_prob = static_cast<float>(num_samples) / num_records;
  if (prob < receive_prob) {
    int offset2replace = std::rand() % num_samples;
    return offset2replace;
  }
  return -1;
}

S
superjom 已提交
28 29
IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
  switch (type) {
S
superjom 已提交
30
    case StorageBase::Type::kMemory: {
S
superjom 已提交
31
      storage_.reset(new MemoryStorage);
S
superjom 已提交
32
    } break;
S
superjom 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
    default:
      CHECK(false) << "Unsupported storage kind " << type;
  }

  switch (mode) {
    case StorageBase::Mode::kRead:
      dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
      break;
    case StorageBase::Mode::kWrite:
      dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
      break;
    default:
      break;
  }
}

S
superjom 已提交
49
void IM::SetPersistDest(const std::string &path) {
S
superjom 已提交
50
  CHECK(storage_->mutable_data()->dir().empty())
S
superjom 已提交
51
      << "duplicate set storage's path";
S
superjom 已提交
52
  storage_->mutable_data()->set_dir(path);
S
superjom 已提交
53 54
}

S
superjom 已提交
55
storage::Tablet *IM::AddTablet(const std::string &tag, int num_samples) {
S
superjom 已提交
56
  auto tablet = storage_->NewTablet(tag, num_samples);
S
superjom 已提交
57 58 59
  return tablet;
}

S
superjom 已提交
60
void IM::AddRecord(const std::string &tag, const storage::Record &data) {
S
superjom 已提交
61 62
  auto *tablet = storage_->tablet(tag);
  CHECK(tablet) << "no tablet called " << tag;
S
superjom 已提交
63

S
superjom 已提交
64
  auto num_records = tablet->total_records();
S
superjom 已提交
65
  const auto num_samples = tablet->num_samples();
S
superjom 已提交
66 67 68 69 70

  int offset;
  // use reservoir sampling or not
  if (num_samples > 0) {
    offset = ReserviorSample(num_samples, num_records + 1);
S
superjom 已提交
71
    if (offset < 0) return;
S
superjom 已提交
72 73 74
  } else {
    offset = num_records;
  }
S
superjom 已提交
75 76 77

  storage::Record *record;
  if (offset >= num_records) {
S
superjom 已提交
78
    record = tablet->add_records();
S
superjom 已提交
79
  } else {
S
superjom 已提交
80
    record = tablet->mutable_records(offset);
S
superjom 已提交
81 82
  }

S
superjom 已提交
83
  *record = data;
S
superjom 已提交
84
  tablet->set_total_records(num_records + 1);
S
superjom 已提交
85 86
}

S
superjom 已提交
87
void IM::Clear() {
S
superjom 已提交
88
  auto *data = storage().mutable_data();
S
superjom 已提交
89 90 91 92 93
  data->clear_tablets();
  data->clear_dir();
  data->clear_timestamp();
}

S
superjom 已提交
94
void IM::PersistToDisk() {
S
superjom 已提交
95
  CHECK(!storage_->data().dir().empty()) << "path of storage should be set";
S
superjom 已提交
96
  // TODO make dir first
S
superjom 已提交
97
  // MakeDir(storage_.data().dir());
S
superjom 已提交
98
  storage_->PersistToDisk();
S
superjom 已提交
99 100
}

S
superjom 已提交
101
}  // namespace visualdl