ob_all_virtual_ls_info.cpp 6.6 KB
Newer Older
W
wangzelin.wzl 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#include "observer/virtual_table/ob_all_virtual_ls_info.h"
#include "storage/tx_storage/ob_ls_service.h"

using namespace oceanbase::common;
using namespace oceanbase::storage;
namespace oceanbase
{
namespace observer
{

// Default constructor: every member starts in its "empty" state --
// a default-constructed address, the invalid LS id sentinel and a
// default-constructed LS iterator guard.
ObAllVirtualLSInfo::ObAllVirtualLSInfo()
  : ObVirtualTableScannerIterator(),
    addr_(),
    ls_id_(share::ObLSID::INVALID_LS_ID),
    ls_iter_guard_()
{}

// Destructor: releases everything through this class's own reset().
ObAllVirtualLSInfo::~ObAllVirtualLSInfo()
{
  ObAllVirtualLSInfo::reset();
}

// Returns the iterator to its initial state: resets the multi-tenant
// operator part, then the cached server address, then the base scanner
// iterator.
void ObAllVirtualLSInfo::reset()
{
  // NOTE: cross-tenant resources must be released by ObMultiTenantOperator,
  // so this call has to come first.
  omt::ObMultiTenantOperator::reset();
  addr_.reset();
  ObVirtualTableScannerIterator::reset();
}

// Drops the per-tenant iteration state: the LS iterator guard is reset and
// the remembered LS id goes back to the invalid sentinel.
// NOTE(review): presumably invoked by ObMultiTenantOperator when it leaves a
// tenant -- confirm against the operator's implementation.
void ObAllVirtualLSInfo::release_last_tenant()
{
  ls_iter_guard_.reset();
  ls_id_ = share::ObLSID::INVALID_LS_ID;
}

// Fetches the next output row by delegating to execute().
//
// @param row  set by execute() on success.
// @return     whatever execute() returns, unchanged.
//
// OB_ITER_END is not logged as a warning: the other iteration paths in this
// file (next_ls_info_ and process_curr_tenant) treat it as a non-error
// result and stay silent, so this wrapper does the same.
// NOTE(review): assumes execute() reports end-of-scan with OB_ITER_END --
// confirm against ObMultiTenantOperator::execute.
int ObAllVirtualLSInfo::inner_get_next_row(ObNewRow *&row)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(execute(row))) {
    if (OB_ITER_END != ret) {
      SERVER_LOG(WARN, "execute fail", K(ret));
    }
  }
  return ret;
}

// Decides whether the given tenant should be scanned.
//
// @param tenant_id  candidate tenant.
// @return true when the effective tenant is the sys tenant, or when
//         tenant_id is the effective tenant itself; false otherwise.
bool ObAllVirtualLSInfo::is_need_process(uint64_t tenant_id)
{
  return is_sys_tenant(effective_tenant_id_) || tenant_id == effective_tenant_id_;
}

O
obdev 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
// Advances the LS iterator until one LS yields its info successfully.
//
// @param ls_info  filled by ObLS::get_ls_info() for the LS that was accepted.
// @return OB_SUCCESS when ls_info was filled, otherwise the error code from
//         the iterator's get_next() (OB_ITER_END included).
//
// A NULL LS or a failing get_ls_info() is logged and turned into OB_EAGAIN,
// which makes the loop move on to the next LS. ls_id_ is updated as soon as
// a non-NULL LS is obtained, before get_ls_info() is attempted.
int ObAllVirtualLSInfo::next_ls_info_(ObLSVTInfo &ls_info)
{
  int ret = OB_SUCCESS;
  ObLS *ls = nullptr;
  for (;;) {
    if (OB_FAIL(ls_iter_guard_->get_next(ls))) {
      if (OB_ITER_END != ret) {
        SERVER_LOG(WARN, "get_next_ls failed", K(ret));
      }
    } else if (NULL == ls) {
      SERVER_LOG(WARN, "ls shouldn't NULL here", K(ls));
      // skip it and try the next ls
      ret = OB_EAGAIN;
    } else {
      ls_id_ = ls->get_ls_id().id();
      if (OB_FAIL(ls->get_ls_info(ls_info))) {
        SERVER_LOG(WARN, "get ls info failed", K(ret), KPC(ls));
        // skip it and try the next ls
        ret = OB_EAGAIN;
      }
    }
    // Only OB_EAGAIN keeps the loop going; everything else is final.
    if (OB_EAGAIN != ret) {
      break;
    }
  }
  return ret;
}

W
wangzelin.wzl 已提交
90 91 92 93 94 95 96 97 98 99
// Builds one output row for the next LS while iterating the current tenant.
//
// @param row  on success, set to point at cur_row_ (owned by this iterator).
// @return OB_SUCCESS when a row was produced;
//         OB_NOT_INIT when allocator_ is NULL;
//         OB_ERR_UNEXPECTED when the address cannot be formatted or an
//         unknown column id is requested;
//         otherwise the error from get_ls_iter(), next_ls_info_() (including
//         OB_ITER_END) or the *_to_string helpers.
//
// The LS iterator is obtained only when ls_iter_guard_ does not hold one
// yet. Cells are filled in the order given by output_column_ids_.
int ObAllVirtualLSInfo::process_curr_tenant(ObNewRow *&row)
{
  int ret = OB_SUCCESS;
  ObLSVTInfo ls_info;
  if (NULL == allocator_) {
    ret = OB_NOT_INIT;
    SERVER_LOG(WARN, "allocator_ shouldn't be NULL", K(allocator_), K(ret));
  } else if (FALSE_IT(start_to_read_ = true)) {
    // FALSE_IT only carries the assignment inside the else-if chain;
    // this branch body is intentionally empty.
  } else if (ls_iter_guard_.get_ptr() == nullptr && OB_FAIL(MTL(ObLSService*)->get_ls_iter(ls_iter_guard_, ObLSGetMod::OBSERVER_MOD))) {
    SERVER_LOG(WARN, "get_ls_iter fail", K(ret));
  } else if (OB_FAIL(next_ls_info_(ls_info))) {
    // OB_ITER_END is returned to the caller without a warning.
    if (OB_ITER_END != ret) {
      SERVER_LOG(WARN, "get next_ls_info failed", K(ret));
    }
  } else {
    const int64_t col_count = output_column_ids_.count();
    for (int64_t i = 0; OB_SUCC(ret) && i < col_count; ++i) {
      uint64_t col_id = output_column_ids_.at(i);
      switch (col_id) {
        case OB_APP_MIN_COLUMN_ID:
          // svr_ip
          if (addr_.ip_to_string(ip_buf_, sizeof(ip_buf_))) {
            cur_row_.cells_[i].set_varchar(ip_buf_);
            cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
          } else {
            ret = OB_ERR_UNEXPECTED;
            SERVER_LOG(WARN, "fail to execute ip_to_string", K(ret));
          }
          break;
        case OB_APP_MIN_COLUMN_ID + 1:
          // svr_port
          cur_row_.cells_[i].set_int(addr_.get_port());
          break;
        case OB_APP_MIN_COLUMN_ID + 2:
          // tenant_id
          cur_row_.cells_[i].set_int(MTL_ID());
          break;
        case OB_APP_MIN_COLUMN_ID + 3:
          // ls_id (recorded by next_ls_info_)
          cur_row_.cells_[i].set_int(ls_id_);
          break;
        case OB_APP_MIN_COLUMN_ID + 4: {
          // replica_type
          if (OB_FAIL(replica_type_to_string(ls_info.replica_type_,
                                             replica_type_name_,
                                             sizeof(replica_type_name_)))) {
            SERVER_LOG(WARN, "get replica type name failed", K(ret), K(ls_info.replica_type_));
          } else {
            // force termination before handing the buffer to set_varchar
            replica_type_name_[MAX_REPLICA_TYPE_LENGTH - 1] = '\0';
            cur_row_.cells_[i].set_varchar(replica_type_name_);
            cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
          }
          break;
        }
        case OB_APP_MIN_COLUMN_ID + 5: {
          // ls_state
          if (OB_FAIL(role_to_string(ls_info.ls_state_,
                                     state_name_,
                                     sizeof(state_name_)))) {
            // log the value that was actually converted
            SERVER_LOG(WARN, "get state role name failed", K(ret), K(ls_info.ls_state_));
          } else {
            // force termination before handing the buffer to set_varchar
            state_name_[MAX_LS_STATE_LENGTH - 1] = '\0';
            cur_row_.cells_[i].set_varchar(state_name_);
            cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
          }
          break;
        }
        case OB_APP_MIN_COLUMN_ID + 6:
          // tablet_count
          cur_row_.cells_[i].set_int(ls_info.tablet_count_);
          break;
        case OB_APP_MIN_COLUMN_ID + 7:
          // weak_read_timestamp (negative values are reported as 0)
          cur_row_.cells_[i].set_uint64(ls_info.weak_read_timestamp_ < 0 ? 0 : ls_info.weak_read_timestamp_);
          break;
        case OB_APP_MIN_COLUMN_ID + 8:
          // need_rebuild
          cur_row_.cells_[i].set_varchar(ls_info.need_rebuild_ ? "YES" : "NO");
          cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
          break;
        case OB_APP_MIN_COLUMN_ID + 9:
          //TODO: SCN
          // clog_checkpoint_ts (negative values are reported as 0)
          cur_row_.cells_[i].set_uint64(ls_info.checkpoint_ts_ < 0 ? 0 : ls_info.checkpoint_ts_);
          break;
        case OB_APP_MIN_COLUMN_ID + 10:
          // clog_checkpoint_lsn (negative values are reported as 0)
          cur_row_.cells_[i].set_uint64(ls_info.checkpoint_lsn_ < 0 ? 0 : ls_info.checkpoint_lsn_);
          break;
        case OB_APP_MIN_COLUMN_ID + 11:
          // migrate_status
          cur_row_.cells_[i].set_int(ls_info.migrate_status_);
          break;
        default:
          ret = OB_ERR_UNEXPECTED;
          SERVER_LOG(WARN, "invalid col_id", K(ret), K(col_id));
          break;
      }
    }
  }
  if (OB_SUCC(ret)) {
    row = &cur_row_;
  }
  return ret;
}

} // observer
} // oceanbase