ob_ddl_clog.cpp 12.0 KB
Newer Older
W
wangzelin.wzl 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX STORAGE

#include "ob_ddl_clog.h"
#include "storage/ddl/ob_ddl_struct.h"
#include "storage/tx_storage/ob_ls_handle.h"
#include "storage/meta_mem/ob_tablet_handle.h"
#include "storage/tx_storage/ob_ls_service.h"
20
#include "storage/ddl/ob_tablet_ddl_kv_mgr.h"
S
simonjoylet 已提交
21
#include "storage/tablet/ob_tablet.h"
W
wangzelin.wzl 已提交
22 23 24 25 26

namespace oceanbase
{

using namespace blocksstable;
27
using namespace share;
W
wangzelin.wzl 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68

namespace storage
{

ObDDLClogCbStatus::ObDDLClogCbStatus()
  : the_other_release_this_(false), state_(ObDDLClogState::STATE_INIT), ret_code_(OB_SUCCESS)
{
}

bool ObDDLClogCbStatus::try_set_release_flag()
{
  return ATOMIC_BCAS(&the_other_release_this_, false, true);
}

ObDDLClogCb::ObDDLClogCb()
  : status_()
{
}

int ObDDLClogCb::on_success()
{
  status_.set_state(STATE_SUCCESS);
  try_release();
  return OB_SUCCESS;
}

int ObDDLClogCb::on_failure()
{
  status_.set_state(STATE_FAILED);
  try_release();
  return OB_SUCCESS;
}

void ObDDLClogCb::try_release()
{
  if (status_.try_set_release_flag()) {
  } else {
    op_free(this);
  }
}

69 70 71 72 73
ObDDLStartClogCb::ObDDLStartClogCb()
  : is_inited_(false), status_(), lock_tid_(0), ddl_kv_mgr_handle_()
{
}

74 75 76 77 78
int ObDDLStartClogCb::init(const ObITable::TableKey &table_key,
                           const int64_t data_format_version,
                           const int64_t execution_id,
                           const uint32_t lock_tid,
                           ObDDLKvMgrHandle &ddl_kv_mgr_handle)
79 80 81 82 83
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    LOG_WARN("init twice", K(ret));
84 85
  } else if (OB_UNLIKELY(!table_key.is_valid() || execution_id < 0 || data_format_version < 0
          || 0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) {
86 87 88
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret));
  } else {
89 90 91
    table_key_ = table_key;
    data_format_version_ = data_format_version;
    execution_id_ = execution_id;
92 93 94 95 96 97 98 99 100 101
    lock_tid_ = lock_tid;
    ddl_kv_mgr_handle_ = ddl_kv_mgr_handle;
    is_inited_ = true;
  }
  return ret;
}

int ObDDLStartClogCb::on_success()
{
  int ret = OB_SUCCESS;
102
  const SCN &start_scn = __get_scn();
103 104 105
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
106 107 108
  } else if (OB_FAIL(ddl_kv_mgr_handle_.get_obj()->ddl_start_nolock(table_key_, start_scn, data_format_version_,
      execution_id_, SCN::min_scn()/*checkpoint_scn*/))) {
    LOG_WARN("failed to start ddl in cb", K(ret), K(table_key_), K(start_scn), K(execution_id_));
109
  }
110
  ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_);
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
  status_.set_ret_code(ret);
  status_.set_state(STATE_SUCCESS);
  try_release();
  return OB_SUCCESS; // force return success
}

int ObDDLStartClogCb::on_failure()
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else {
    ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_);
  }
  status_.set_state(STATE_FAILED);
  try_release();
  return OB_SUCCESS;
}

void ObDDLStartClogCb::try_release()
{
  if (status_.try_set_release_flag()) {
  } else {
    op_free(this);
  }
}

W
wangzelin.wzl 已提交
139 140
ObDDLMacroBlockClogCb::ObDDLMacroBlockClogCb()
  : is_inited_(false), status_(), ls_id_(), redo_info_(), macro_block_id_(),
141
    arena_("ddl_clog_cb", OB_MALLOC_BIG_BLOCK_SIZE), data_buffer_lock_(), is_data_buffer_freed_(false), lock_tid_(0), ddl_kv_mgr_handle_()
W
wangzelin.wzl 已提交
142 143 144 145 146 147
{

}

int ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id,
                                const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
148 149 150
                                const blocksstable::MacroBlockId &macro_block_id,
                                const uint32_t lock_tid,
                                ObDDLKvMgrHandle &ddl_kv_mgr_handle)
W
wangzelin.wzl 已提交
151 152 153 154 155
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    LOG_WARN("init twice", K(ret));
156
  } else if (OB_UNLIKELY(!ls_id.is_valid() || !redo_info.is_valid() || !macro_block_id.is_valid() || 0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) {
W
wangzelin.wzl 已提交
157 158 159 160 161 162 163 164 165 166 167 168
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(ls_id), K(redo_info), K(macro_block_id));
  } else {
    void *buf = nullptr;
    if (OB_ISNULL(buf = arena_.alloc(redo_info.data_buffer_.length()))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("allocate memory failed", K(ret), K(redo_info.data_buffer_.length()));
    } else {
      redo_info_.data_buffer_.assign(const_cast<char *>(redo_info.data_buffer_.ptr()), redo_info.data_buffer_.length());
      redo_info_.block_type_ = redo_info.block_type_;
      redo_info_.logic_id_ = redo_info.logic_id_;
      redo_info_.table_key_ = redo_info.table_key_;
O
obdev 已提交
169
      redo_info_.start_scn_ = redo_info.start_scn_;
W
wangzelin.wzl 已提交
170 171
      ls_id_ = ls_id;
      macro_block_id_ = macro_block_id;
172 173
      lock_tid_ = lock_tid;
      ddl_kv_mgr_handle_ = ddl_kv_mgr_handle;
W
wangzelin.wzl 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
    }
  }
  return ret;
}

void ObDDLMacroBlockClogCb::try_release()
{
  {
    ObSpinLockGuard data_buffer_guard(data_buffer_lock_);
    is_data_buffer_freed_ = true;
  }
  if (status_.try_set_release_flag()) {
  } else {
    op_free(this);
  }
}

int ObDDLMacroBlockClogCb::on_success()
{
  int ret = OB_SUCCESS;
  ObDDLMacroBlock macro_block;
  ObLSHandle ls_handle;
  ObTabletHandle tablet_handle;
  {
    ObSpinLockGuard data_buffer_guard(data_buffer_lock_);
    if (is_data_buffer_freed_) {
      LOG_INFO("data buffer is freed, do not need to callback");
    } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
      LOG_WARN("get ls handle failed", K(ret), K(ls_id_));
O
obdev 已提交
203
    } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, redo_info_.table_key_.get_tablet_id(), tablet_handle))) {
W
wangzelin.wzl 已提交
204 205 206 207 208 209
      LOG_WARN("get tablet handle failed", K(ret), K(redo_info_.table_key_));
    } else if (OB_FAIL(macro_block.block_handle_.set_block_id(macro_block_id_))) {
      LOG_WARN("set macro block id failed", K(ret), K(macro_block_id_));
    } else {
      macro_block.block_type_ = redo_info_.block_type_;
      macro_block.logic_id_ = redo_info_.logic_id_;
O
obdev 已提交
210
      macro_block.scn_ = __get_scn();
W
wangzelin.wzl 已提交
211 212
      macro_block.buf_ = redo_info_.data_buffer_.ptr();
      macro_block.size_ = redo_info_.data_buffer_.length();
O
obdev 已提交
213
      macro_block.ddl_start_scn_ = redo_info_.start_scn_;
W
wangzelin.wzl 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226
      if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block))) {
        LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle), K(macro_block));
      }
    }
  }
  status_.set_ret_code(ret);
  status_.set_state(STATE_SUCCESS);
  try_release();
  return OB_SUCCESS; // force return success
}

int ObDDLMacroBlockClogCb::on_failure()
{
227
  ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_);
W
wangzelin.wzl 已提交
228 229 230 231 232
  status_.set_state(STATE_FAILED);
  try_release();
  return OB_SUCCESS;
}

233
ObDDLCommitClogCb::ObDDLCommitClogCb()
234
  : is_inited_(false), status_(), ls_id_(), tablet_id_(), start_scn_(SCN::min_scn()), lock_tid_(0), ddl_kv_mgr_handle_()
235 236 237 238 239 240
{

}

int ObDDLCommitClogCb::init(const share::ObLSID &ls_id,
                            const common::ObTabletID &tablet_id,
241 242 243
                            const share::SCN &start_scn,
                            const uint32_t lock_tid,
                            ObDDLKvMgrHandle &ddl_kv_mgr_handle)
244 245 246 247 248
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    LOG_WARN("init twice", K(ret));
249 250
  } else if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || !start_scn.is_valid_and_not_min()
      || 0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) {
251
    ret = OB_INVALID_ARGUMENT;
252
    LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(start_scn), K(lock_tid));
253 254 255 256
  } else {
    ls_id_ = ls_id;
    tablet_id_ = tablet_id;
    start_scn_ = start_scn;
257 258
    lock_tid_ = lock_tid;
    ddl_kv_mgr_handle_ = ddl_kv_mgr_handle;
259 260 261 262 263 264 265 266
    is_inited_ = true;
  }
  return ret;
}

int ObDDLCommitClogCb::on_success()
{
  int ret = OB_SUCCESS;
267 268
  ddl_kv_mgr_handle_.get_obj()->set_commit_scn_nolock(__get_scn());
  ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_);
269 270 271 272 273 274 275 276
  status_.set_ret_code(ret);
  status_.set_state(STATE_SUCCESS);
  try_release();
  return OB_SUCCESS; // force return success
}

int ObDDLCommitClogCb::on_failure()
{
277 278
  int ret = OB_SUCCESS;
  ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_);
279 280 281 282 283 284 285 286 287 288 289 290 291 292
  status_.set_state(STATE_FAILED);
  try_release();
  return OB_SUCCESS;
}

void ObDDLCommitClogCb::try_release()
{
  if (status_.try_set_release_flag()) {
  } else {
    op_free(this);
  }
}


W
wangzelin.wzl 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
DEFINE_SERIALIZE(ObDDLClogHeader)
{
  int ret = OB_SUCCESS;
  int64_t tmp_pos = pos;

  if (OB_ISNULL(buf) || buf_len <= 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(buf_len));
  } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, tmp_pos, static_cast<int64_t>(ddl_clog_type_)))) {
    LOG_WARN("fail to serialize ObDDLClogHeader", K(ret));
  } else {
    pos = tmp_pos;
  }
  return ret;
}

DEFINE_DESERIALIZE(ObDDLClogHeader)
{
  int ret = OB_SUCCESS;
  int64_t tmp_pos = pos;

  int64_t log_type = 0;
  if (OB_ISNULL(buf) || data_len <= 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(data_len));
  } else if (OB_FAIL(serialization::decode_i64(buf, data_len, tmp_pos, &log_type))) {
    LOG_WARN("fail to deserialize ObDDLClogHeader", K(ret));
  } else {
    ddl_clog_type_ = static_cast<ObDDLClogType>(log_type);
    pos = tmp_pos;
  }

  return ret;
}

DEFINE_GET_SERIALIZE_SIZE(ObDDLClogHeader)
{
  int64_t size = 0;
  size += serialization::encoded_length_i64(static_cast<int64_t>(ddl_clog_type_));
  return size;
}

ObDDLStartLog::ObDDLStartLog()
336
  : table_key_(), data_format_version_(0), execution_id_(-1)
W
wangzelin.wzl 已提交
337 338 339
{
}

340
int ObDDLStartLog::init(const ObITable::TableKey &table_key, const int64_t data_format_version, const int64_t execution_id)
W
wangzelin.wzl 已提交
341 342
{
  int ret = OB_SUCCESS;
343
  if (OB_UNLIKELY(!table_key.is_valid() || execution_id < 0 || data_format_version_ < 0)) {
W
wangzelin.wzl 已提交
344
    ret = OB_INVALID_ARGUMENT;
345
    LOG_WARN("invalid argument", K(ret), K(table_key), K(execution_id), K(data_format_version));
W
wangzelin.wzl 已提交
346 347
  } else {
    table_key_ = table_key;
348
    data_format_version_ = data_format_version;
O
obdev 已提交
349
    execution_id_ = execution_id;
W
wangzelin.wzl 已提交
350 351 352 353
  }
  return ret;
}

354
OB_SERIALIZE_MEMBER(ObDDLStartLog, table_key_, data_format_version_, execution_id_);
W
wangzelin.wzl 已提交
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374

ObDDLRedoLog::ObDDLRedoLog()
  : redo_info_()
{
}

int ObDDLRedoLog::init(const blocksstable::ObDDLMacroBlockRedoInfo &redo_info)
{
  int ret = OB_SUCCESS;
  if (!redo_info.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(redo_info));
  } else {
    redo_info_ = redo_info;
  }
  return ret;
}

OB_SERIALIZE_MEMBER(ObDDLRedoLog, redo_info_);

S
simonjoylet 已提交
375
ObDDLCommitLog::ObDDLCommitLog()
376
  : table_key_(), start_scn_(SCN::min_scn())
W
wangzelin.wzl 已提交
377 378 379
{
}

S
simonjoylet 已提交
380 381
int ObDDLCommitLog::init(const ObITable::TableKey &table_key,
                         const SCN &start_scn)
W
wangzelin.wzl 已提交
382 383
{
  int ret = OB_SUCCESS;
384
  if (!table_key.is_valid() || !start_scn.is_valid_and_not_min()) {
W
wangzelin.wzl 已提交
385
    ret = OB_INVALID_ARGUMENT;
386
    LOG_WARN("invalid argument", K(ret), K(table_key), K(start_scn));
W
wangzelin.wzl 已提交
387 388
  } else {
    table_key_ = table_key;
O
obdev 已提交
389
    start_scn_ = start_scn;
W
wangzelin.wzl 已提交
390 391 392 393
  }
  return ret;
}

S
simonjoylet 已提交
394
OB_SERIALIZE_MEMBER(ObDDLCommitLog, table_key_, start_scn_);
W
wangzelin.wzl 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420


ObTabletSchemaVersionChangeLog::ObTabletSchemaVersionChangeLog()
  : tablet_id_(), schema_version_(-1)
{
}

int ObTabletSchemaVersionChangeLog::init(const ObTabletID &tablet_id, const int64_t schema_version)
{
  int ret = OB_SUCCESS;
  if (!tablet_id.is_valid() || schema_version < 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(tablet_id), K(schema_version));
  } else {
    tablet_id_ = tablet_id;
    schema_version_ = schema_version;
  }
  return ret;
}

OB_SERIALIZE_MEMBER(ObTabletSchemaVersionChangeLog, tablet_id_, schema_version_);

OB_SERIALIZE_MEMBER(ObDDLBarrierLog, ls_id_, hidden_tablet_ids_);

}
}