ob_log_binlog_record.cpp 10.0 KB
Newer Older
S
SanmuWangZJU 已提交
1 2 3 4 5 6 7 8 9 10
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 *
 * Binlog Record
 */

15
#define USING_LOG_PREFIX OBLOG_FORMATTER
S
SanmuWangZJU 已提交
16

17 18 19
#ifdef OB_USE_DRCMSG
#include <drcmsg/MD.h>                        // ITableMeta
#endif
S
SanmuWangZJU 已提交
20 21 22 23 24 25 26 27 28

#include "ob_log_binlog_record.h"
#include "ob_log_utils.h"
#include "ob_log_instance.h"                  // TCTX

using namespace oceanbase::common;

namespace oceanbase
{
W
wangzelin.wzl 已提交
29
namespace libobcdc
S
SanmuWangZJU 已提交
30 31 32
{

// Builds an empty record: all pointer members are null and the bookkeeping
// fields hold their default/invalid values. data_ is not created here; the
// subclasses in this file create it through construct_data_().
ObLogBR::ObLogBR()
  : ObLogResourceRecycleTask(ObLogResourceRecycleTask::BINLOG_RECORD_TASK)
  , data_(nullptr)
  , host_(nullptr)
  , stmt_task_(nullptr)
  , next_br_(nullptr)
  , valid_(true)
  , tenant_id_(OB_INVALID_TENANT_ID)
  , schema_version_(OB_INVALID_VERSION)
  , commit_version_(0)
  , row_index_(0)
  , part_trans_task_count_(0)
{
}

// Clears the record state and then releases the underlying binlog record.
// reset() has to run first: it still dereferences data_, which
// destruct_data_() destroys and sets to NULL.
ObLogBR::~ObLogBR()
{
  reset();

  destruct_data_();
}

void ObLogBR::construct_data_(const bool creating_binlog_record)
{
W
wangzelin.wzl 已提交
55
  data_ = DRCMessageFactory::createBinlogRecord(TCTX.drc_message_factory_binlog_record_type_, creating_binlog_record);
S
SanmuWangZJU 已提交
56 57

  if (OB_ISNULL(data_)) {
58
    OBLOG_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "DRCMessageFactory::createBinlogRecord fails");
S
SanmuWangZJU 已提交
59 60 61 62 63 64 65 66 67
  } else {
    // set user data pointer to the pointer hold the binlog record
    data_->setUserData(this);
  }
}

void ObLogBR::destruct_data_()
{
  if (NULL != data_) {
W
wangzelin.wzl 已提交
68
    DRCMessageFactory::destroy(data_);
S
SanmuWangZJU 已提交
69 70 71 72 73 74 75 76 77
    data_ = NULL;
  }
}

void ObLogBR::reset()
{
  if (NULL != data_) {
    data_->clear();

W
wangzelin.wzl 已提交
78
    // note reset all filed used by libobcdc, cause clear() may won't reset fields
S
SanmuWangZJU 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93

    // clear new/old column array
    data_->setNewColumn(NULL, 0);
    data_->setOldColumn(NULL, 0);

    // clear TableMeta and IDBMeta
    data_->setTableMeta(NULL);
    data_->setTbname(NULL);
    data_->setDBMeta(NULL);
    data_->setDbname(NULL);

    // set user data pointer to the pointer hold the binlog record
    data_->setUserData(this);
  }

W
wangzelin.wzl 已提交
94 95 96
  host_ = nullptr;
  stmt_task_ = nullptr;
  next_br_ = nullptr;
S
SanmuWangZJU 已提交
97 98
  valid_ = true;
  tenant_id_ = OB_INVALID_TENANT_ID;
W
wangzelin.wzl 已提交
99 100
  schema_version_ = OB_INVALID_VERSION;
  commit_version_ = 0;
S
SanmuWangZJU 已提交
101 102 103 104 105 106 107 108
  part_trans_task_count_ = 0;
}

// Attaches a table meta to the binlog record and sets the record's table
// name from table_meta->getName().
//
// @param table_meta  table meta to attach; must not be NULL
// @return OB_SUCCESS on success,
//         OB_NOT_INIT when the binlog record has not been created,
//         OB_INVALID_ARGUMENT when table_meta is NULL
int ObLogBR::set_table_meta(ITableMeta *table_meta)
{
  int ret = OB_SUCCESS;

  if (NULL == data_) {
    LOG_ERROR("IBinlogRecord has not been created");
    ret = OB_NOT_INIT;
  } else if (NULL == table_meta) {
    LOG_ERROR("invalid argument", K(table_meta));
    ret = OB_INVALID_ARGUMENT;
  } else {
    // keep the meta and the name on the record in sync
    data_->setTableMeta(table_meta);
    data_->setTbname(table_meta->getName());
  }

  return ret;
}

// Attaches a database meta to the binlog record and sets the record's
// database name from db_meta->getName().
//
// @param db_meta  database meta to attach; must not be NULL
// @return OB_SUCCESS on success,
//         OB_NOT_INIT when the binlog record has not been created,
//         OB_INVALID_ARGUMENT when db_meta is NULL
int ObLogBR::set_db_meta(IDBMeta *db_meta)
{
  int ret = OB_SUCCESS;

  if (NULL == data_) {
    LOG_ERROR("IBinlogRecord has not been created");
    ret = OB_NOT_INIT;
  } else if (NULL == db_meta) {
    LOG_ERROR("invalid argument", K(db_meta));
    ret = OB_INVALID_ARGUMENT;
  } else {
    // keep the meta and the name on the record in sync
    data_->setDBMeta(db_meta);
    data_->setDbname(db_meta->getName());
  }

  return ret;
}

int ObLogBR::init_data(const RecordType type,
    const uint64_t cluster_id,
W
wangzelin.wzl 已提交
142 143
    const int64_t tenant_id,
    const uint64_t row_index,
S
SanmuWangZJU 已提交
144 145 146
    const common::ObString &trace_id,
    const common::ObString &trace_info,
    const common::ObString &unique_id,
W
wangzelin.wzl 已提交
147
    const int64_t schema_version,
S
SanmuWangZJU 已提交
148 149 150 151 152 153 154
    const int64_t commit_version,
    const int64_t part_trans_task_count,
    const common::ObString *major_version_str)
{
  int ret = OB_SUCCESS;

  if (OB_ISNULL(data_)) {
W
wangzelin.wzl 已提交
155
    LOG_ERROR("IBinlogRecord has not been created");
S
SanmuWangZJU 已提交
156 157
    ret = OB_NOT_INIT;
  } else if (OB_UNLIKELY(EUNKNOWN == type)
158
      || OB_UNLIKELY(! is_valid_cluster_id_(cluster_id))
S
SanmuWangZJU 已提交
159
      || OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)
W
wangzelin.wzl 已提交
160 161 162
      || OB_UNLIKELY(OB_INVALID_TIMESTAMP == schema_version)
      || OB_UNLIKELY(commit_version <= 0)) {
    LOG_ERROR("invalid argument", K(type), K(cluster_id), K(tenant_id), K(schema_version), K(commit_version));
S
SanmuWangZJU 已提交
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
    ret = OB_INVALID_ARGUMENT;
  } else if (OB_FAIL(verify_part_trans_task_count_(type, part_trans_task_count))) {
    LOG_ERROR("verify_part_trans_task_count_ fail", KR(ret), K(type),
        "type", print_record_type(type), K(part_trans_task_count));
  } else {
    // set to invalid_data
    int src_category = SRC_NO;
    // NOTE: init checkpint to 0 (sec/microsecond)
    uint64_t checkpoint_sec = 0;
    uint64_t checkpoint_usec = 0;

    data_->setRecordType(type);
    data_->setSrcCategory(src_category);
    data_->setCheckpoint(checkpoint_sec, checkpoint_usec);
    data_->setId(0); // always set id to 0
    data_->setSrcType(SRC_OCEANBASE_1_0);  // for OB 1.0

    // means that two consecutive statements operate on different fields
    // set this field to true for performance
    data_->setFirstInLogevent(true);

    // treate cluster_id as thread_id
    // convert from 64 bit to 32 bit
    data_->setThreadId(static_cast<uint32_t>(cluster_id));
    // set trans commit timestamp (second)
W
wangzelin.wzl 已提交
188 189
    int64_t commit_version_usec = commit_version / NS_CONVERSION;
    data_->setTimestamp(commit_version_usec / 1000000);
S
SanmuWangZJU 已提交
190 191
    // set trans commit timestamp (microsecond)
    // note: combine getTimestamp() and getRecordUsec() as complete trans commit timestamp
W
wangzelin.wzl 已提交
192
    data_->setRecordUsec(static_cast<uint32_t>(commit_version_usec % 1000000));
S
SanmuWangZJU 已提交
193 194 195 196 197 198 199 200

    // won't use this field
    data_->putFilterRuleVal("0", 1);
    // set unique id to binlog record
    data_->putFilterRuleVal(unique_id.ptr(), unique_id.length());
    // set OBTraceID
    data_->putFilterRuleVal(trace_id.ptr(), trace_id.length());

201 202
    // set ObTraceInfo
    data_->setObTraceInfo(trace_info.ptr());
S
SanmuWangZJU 已提交
203 204 205 206

    // put major version(from int32_t to char*) to the forth field
    if (EBEGIN == type) {
      if (OB_ISNULL(major_version_str)) {
Z
zhjc1124 已提交
207
        ret = OB_ERR_UNEXPECTED;
S
SanmuWangZJU 已提交
208 209 210 211 212 213
        LOG_ERROR("major version str for EBEGIN statement should not be null!", KR(ret), K(cluster_id),
            K(type), K(trace_id));
      } else {
        data_->putFilterRuleVal(major_version_str->ptr(), major_version_str->length());
      }
    }
W
wangzelin.wzl 已提交
214 215 216 217 218 219 220 221

    set_next(NULL);
    valid_ = true;
    tenant_id_ = tenant_id;
    row_index_ = row_index;
    schema_version_ = schema_version;
    commit_version_ = commit_version;
    part_trans_task_count_ = part_trans_task_count;
S
SanmuWangZJU 已提交
222 223 224 225 226
  }

  return ret;
}

227 228 229 230 231 232 233 234
// A cluster id is accepted when is_valid_cluster_id() accepts it, or when it
// lies in the inclusive range [MIN_DRC_CLUSTER_ID, MAX_DRC_CLUSTER_ID].
bool ObLogBR::is_valid_cluster_id_(const uint64_t cluster_id) const
{
  return is_valid_cluster_id(cluster_id)
      || (cluster_id >= MIN_DRC_CLUSTER_ID && cluster_id <= MAX_DRC_CLUSTER_ID);
}

W
wangzelin.wzl 已提交
235
// Appends a placeholder old-column value to the given binlog record.
//
// A changed column gets COLUMN_VALUE_IS_EMPTY with origin REDO; an unchanged
// column gets COLUMN_VALUE_IS_NULL with origin PADDING (its value should not
// be used). The return value of putOld() is ignored.
//
// @param br          binlog record to append to; must not be NULL
// @param is_changed  whether the column was changed
// @return OB_SUCCESS, or OB_INVALID_ARGUMENT when br is NULL
int ObLogBR::put_old(IBinlogRecord *br, const bool is_changed)
{
  int ret = OB_SUCCESS;

  if (OB_ISNULL(br)) {
    LOG_ERROR("invalid argument", K(br));
    ret = OB_INVALID_ARGUMENT;
  } else {
    // DRC proto: start from the "unchanged column" marker ...
    const char *placeholder = COLUMN_VALUE_IS_NULL;
    VALUE_ORIGIN origin = VALUE_ORIGIN::PADDING;

    // ... and switch to the global empty-string marker for a changed column
    if (is_changed) {
      placeholder = COLUMN_VALUE_IS_EMPTY;
      origin = VALUE_ORIGIN::REDO;
    }

    // a NULL placeholder has length 0
    const int64_t placeholder_len = (NULL != placeholder) ? strlen(placeholder) : 0;

    (void)br->putOld(placeholder, static_cast<int>(placeholder_len), origin);
  }

  return ret;
}

258 259 260 261 262 263 264 265 266 267 268 269 270
// Appends a COLUMN_VALUE_IS_NULL placeholder with origin PADDING to either
// the new-column or the old-column list of br.
//
// @param br          binlog record to append to
// @param is_new_col  true: append to new columns; false: append to old columns
// @param reason      only used in the debug log
// @param col_idx     only used in the debug log
void ObLogBR::mark_value_populated_by_cdc(IBinlogRecord &br, const bool is_new_col, const char *reason, const int col_idx)
{
  if (! is_new_col) {
    // placeholder goes into old_cols
    (void) br.putOld(COLUMN_VALUE_IS_NULL, 0, VALUE_ORIGIN::PADDING);
  } else {
    // placeholder goes into new_cols
    (void) br.putNew(COLUMN_VALUE_IS_NULL, 0, VALUE_ORIGIN::PADDING);
  }

  LOG_DEBUG("mark_value_populated_by_cdc", K(is_new_col), K(col_idx), KCSTRING(reason));
}


S
SanmuWangZJU 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
int ObLogBR::get_record_type(int &record_type)
{
  int ret = OB_SUCCESS;
  record_type = 0;

  if (OB_ISNULL(data_)) {
    LOG_ERROR("data_ is null", K(data_));
    ret = OB_ERR_UNEXPECTED;
  } else {
    record_type = data_->recordType();
  }

  return ret;
}

int ObLogBR::setInsertRecordTypeForHBasePut(const RecordType type)
{
  int ret = OB_SUCCESS;

  if (OB_ISNULL(data_)) {
W
wangzelin.wzl 已提交
291
    LOG_ERROR("IBinlogRecord has not been created");
S
SanmuWangZJU 已提交
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
    ret = OB_NOT_INIT;
  } else if (OB_UNLIKELY(EINSERT != type)) {
    LOG_ERROR("invalid argument", "type", print_record_type(type));
  } else {
    data_->setRecordType(type);
  }

  return ret;
}

// Checks part_trans_task_count against the record type: for DDL, BEGIN and
// COMMIT records the count must be greater than 0; other record types are
// not checked.
//
// @return OB_SUCCESS when the count is acceptable,
//         OB_INVALID_ARGUMENT when type is EUNKNOWN,
//         OB_ERR_UNEXPECTED when a DDL/BEGIN/COMMIT record has a count <= 0
int ObLogBR::verify_part_trans_task_count_(const RecordType type,
    const int64_t part_trans_task_count)
{
  int ret = OB_SUCCESS;
  const bool needs_positive_count = (EDDL == type) || (EBEGIN == type) || (ECOMMIT == type);

  if (OB_UNLIKELY(EUNKNOWN == type)) {
    LOG_ERROR("invalid argument", K(type), "type", print_record_type(type));
    ret = OB_INVALID_ARGUMENT;
  } else if (needs_positive_count && OB_UNLIKELY(part_trans_task_count <= 0)) {
    LOG_ERROR("part_trans_task_count is not greater than 0", K(type),
        "type", print_record_type(type), K(part_trans_task_count));
    ret = OB_ERR_UNEXPECTED;
  }

  return ret;
}

// Unserialized binlog record (ObLogUnserilizedBR)
// Creates the underlying binlog record via construct_unserilized_data_() and
// then runs ObLogBR::reset(), which clears that record and sets the
// bookkeeping members to their default values.
ObLogUnserilizedBR::ObLogUnserilizedBR() : ObLogBR()
{
  construct_unserilized_data_();

  ObLogBR::reset();
}

// Nothing to release here: the ObLogBR destructor resets the record and
// destroys data_.
ObLogUnserilizedBR::~ObLogUnserilizedBR()
{
}

// Creates the underlying binlog record with creating_binlog_record = true.
void ObLogUnserilizedBR::construct_unserilized_data_()
{
  construct_data_(true /* creating_binlog_record */);
}

// Serialized binlog record (ObLogSerilizedBR)
// Creates the underlying binlog record via construct_serilized_data_() and
// then runs ObLogBR::reset(), which clears that record and sets the
// bookkeeping members to their default values.
ObLogSerilizedBR::ObLogSerilizedBR() : ObLogBR()
{
  construct_serilized_data_();

  ObLogBR::reset();
}

// Nothing to release here: the ObLogBR destructor resets the record and
// destroys data_.
ObLogSerilizedBR::~ObLogSerilizedBR()
{
}

// Creates the underlying binlog record with creating_binlog_record = false.
void ObLogSerilizedBR::construct_serilized_data_()
{
  construct_data_(false /* creating_binlog_record */);
}

W
wangzelin.wzl 已提交
362
} // end namespace libobcdc
S
SanmuWangZJU 已提交
363
} // end namespace oceanbase