reader_routine.cpp 2.6 KB
Newer Older
W
init  
wangyunlai.wyl 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase Migration Service LogProxy is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#include <unistd.h>
#include "common/log.h"
#include "common/common.h"
#include "common/config.h"
#include "common/counter.h"
#include "oblogreader/oblogreader.h"
#include "oblogreader/reader_routine.h"

namespace oceanbase {
namespace logproxy {

static Config& _s_config = Config::instance();

F
Fankux 已提交
26 27
ReaderRoutine::ReaderRoutine(ObLogReader& reader, OblogAccess& oblog, BlockingQueue<ILogRecord*>& q)
    : Thread("ReaderRoutine"), _reader(reader), _oblog(oblog), _queue(q)
W
init  
wangyunlai.wyl 已提交
28 29 30 31
{}

int ReaderRoutine::init(const OblogConfig& config)
{
32 33 34
  std::map<std::string, std::string> configs;
  config.generate_configs(configs);
  return _oblog.init(configs, config.start_timestamp.val());
W
init  
wangyunlai.wyl 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48
}

void ReaderRoutine::stop()
{
  if (is_run()) {
    Thread::stop();
    _oblog.stop();
    _queue.clear([this](ILogRecord* record) { _oblog.release(record); });
  }
}

void ReaderRoutine::run()
{
  if (_oblog.start() != OMS_OK) {
F
Fankux 已提交
49
    OMS_ERROR << "Failed to start ReaderRoutine";
W
init  
wangyunlai.wyl 已提交
50 51 52
    return;
  }

F
Fankux 已提交
53 54 55
  Counter& counter = Counter::instance();
  Timer stage_tm;

W
init  
wangyunlai.wyl 已提交
56
  while (is_run()) {
F
Fankux 已提交
57
    stage_tm.reset();
W
init  
wangyunlai.wyl 已提交
58 59
    ILogRecord* record = nullptr;
    int ret = _oblog.fetch(record, _s_config.read_timeout_us.val());
F
Fankux 已提交
60 61
    int64_t fetch_us = stage_tm.elapsed();

W
init  
wangyunlai.wyl 已提交
62 63 64 65 66 67 68 69 70 71
    if (ret == OB_TIMEOUT && record == nullptr) {
      OMS_INFO << "fetch liboblog timeout, nothing incoming...";
      continue;
    }
    if (ret != OB_SUCCESS || record == nullptr) {
      OMS_WARN << "fetch liboblog " << (ret == OB_SUCCESS ? "nullptr" : "failed") << ", ignore...";
      ::usleep(_s_config.read_fail_interval_us.val());
      continue;
    }

F
Fankux 已提交
72
    stage_tm.reset();
W
init  
wangyunlai.wyl 已提交
73 74 75
    while (!_queue.offer(record, _s_config.read_timeout_us.val())) {
      OMS_WARN << "reader transfer queue full(" << _queue.size(false) << "), retry...";
    }
F
Fankux 已提交
76
    int64_t offer_us = stage_tm.elapsed();
W
init  
wangyunlai.wyl 已提交
77

F
Fankux 已提交
78 79 80 81
    counter.count_key(Counter::READER_FETCH_US, fetch_us);
    counter.count_key(Counter::READER_OFFER_US, offer_us);
    counter.count_read_io(record->getRealSize());
    counter.count_read(1);
W
init  
wangyunlai.wyl 已提交
82 83
  }

F
Fankux 已提交
84
  _reader.stop();
W
init  
wangyunlai.wyl 已提交
85 86 87 88
}

}  // namespace logproxy
}  // namespace oceanbase