/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX STORAGE

#include "ob_ddl_server_client.h"
#include "observer/ob_server_struct.h"
#include "share/ob_common_rpc_proxy.h"
#include "share/ob_ddl_common.h"
#include "storage/ddl/ob_ddl_heart_beat_task.h"
#include "lib/ob_define.h"
#include "lib/mysqlclient/ob_isql_client.h"
#include "sql/engine/cmd/ob_ddl_executor_util.h"
#include "rootserver/ddl_task/ob_table_redefinition_task.h"
#include "observer/omt/ob_multi_tenant.h"

namespace oceanbase
{
namespace storage
{

// Asks the root service to create a hidden table and waits for the resulting
// DDL task to get past its pre-pending states.
//
// Steps, each one only attempted when the previous one succeeded:
//   1. validate `arg` and the root service rpc proxy taken from GCTX;
//   2. send the create_hidden_table rpc, using GCONF._ob_ddl_timeout as timeout;
//   3. register the returned task id in the DDL heart beat task container;
//   4. call wait_task_reach_pending(), which also fills `snapshot_version`;
//   5. if step 4 failed (or the ERRSIM hook injected a failure), abort the
//      redefinition task through abort_redef_table().
//
// @param arg               request forwarded to the root service
// @param res               response filled by the rpc (task id, tenant id)
// @param snapshot_version  output written by wait_task_reach_pending()
// @param session           session passed on to the wait / abort helpers
// @return OB_SUCCESS, or the error code of the first failing step. A failure
//         of the abort in step 5 is only logged and never replaces `ret`.
int ObDDLServerClient::create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, int64_t &snapshot_version, sql::ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  obrpc::ObCommonRpcProxy *rs_proxy = GCTX.rs_rpc_proxy_;
  if (OB_UNLIKELY(!arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(ret), K(arg));
  } else if (OB_ISNULL(rs_proxy)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("common rpc proxy is null", K(ret));
  } else if (OB_FAIL(rs_proxy->timeout(GCONF._ob_ddl_timeout).create_hidden_table(arg, res))) {
    LOG_WARN("failed to create hidden table", KR(ret), K(arg));
  } else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.set_register_task_id(res.task_id_, res.tenant_id_))) {
    LOG_WARN("failed to set register task id", K(ret), K(res));
  } else {
    // The task exists and is registered for heart beat: wait for it.
    if (OB_FAIL(wait_task_reach_pending(arg.tenant_id_, res.task_id_, snapshot_version, *GCTX.sql_proxy_, session))) {
      LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res));
    }
#ifdef ERRSIM
    // Error injection point: only consulted when the wait itself succeeded.
    if (OB_SUCC(ret)) {
      ret = OB_E(common::EventTable::EN_DDL_DIRECT_LOAD_WAIT_TABLE_LOCK_FAIL) OB_SUCCESS;
      LOG_INFO("wait table lock failed errsim", K(ret));
    }
#endif
    if (OB_FAIL(ret)) {
      // Roll back the task that was just created. abort_redef_table() ends by
      // clearing the heart beat registration, so heart_beat_clear() is not
      // called again here.
      int tmp_ret = OB_SUCCESS;
      obrpc::ObAbortRedefTableArg abort_redef_table_arg;
      abort_redef_table_arg.tenant_id_ = arg.tenant_id_;
      abort_redef_table_arg.task_id_ = res.task_id_;
      if (OB_TMP_FAIL(abort_redef_table(abort_redef_table_arg, session))) {
        LOG_WARN("failed to abort redef table", K(tmp_ret), K(abort_redef_table_arg));
      }
    }
  }
  return ret;
}

// Starts a table redefinition task on the root service and waits for it to get
// past its pre-pending states.
//
// After argument / proxy validation the start_redef_table rpc is sent, the
// returned task id is registered in the DDL heart beat task container, and
// wait_task_reach_pending() is called. The snapshot version reported by the
// wait is not used by this function. When the wait fails, the task is aborted
// through abort_redef_table(); a failure of that abort is only logged.
//
// @param arg      request forwarded to the root service
// @param res      response filled by the rpc (task id, tenant id)
// @param session  session passed on to the wait / abort helpers
// @return OB_SUCCESS, or the error code of the first failing step.
int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res, sql::ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  int64_t ignored_snapshot_version = OB_INVALID_VERSION;
  obrpc::ObCommonRpcProxy *rs_proxy = GCTX.rs_rpc_proxy_;
  if (OB_UNLIKELY(!arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(ret), K(arg));
  } else if (OB_ISNULL(rs_proxy)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("common rpc proxy is null", K(ret));
  } else if (OB_FAIL(rs_proxy->start_redef_table(arg, res))) {
    LOG_WARN("failed to start redef table", KR(ret), K(arg));
  } else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.set_register_task_id(res.task_id_, res.tenant_id_))) {
    LOG_WARN("failed to set register task id", K(ret), K(res));
  } else {
    if (OB_FAIL(wait_task_reach_pending(arg.orig_tenant_id_, res.task_id_, ignored_snapshot_version, *GCTX.sql_proxy_, session))) {
      LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res));
      // Roll back the task that was just started. abort_redef_table() ends by
      // clearing the heart beat registration, so heart_beat_clear() is not
      // called again here.
      int tmp_ret = OB_SUCCESS;
      obrpc::ObAbortRedefTableArg abort_redef_table_arg;
      abort_redef_table_arg.tenant_id_ = arg.orig_tenant_id_;
      abort_redef_table_arg.task_id_ = res.task_id_;
      if (OB_TMP_FAIL(abort_redef_table(abort_redef_table_arg, session))) {
        LOG_WARN("failed to abort redef table", K(tmp_ret), K(abort_redef_table_arg));
      }
    }
  }
  return ret;
}
// Sends the copy_table_dependents rpc to the current root service leader,
// retrying until it succeeds or a terminal condition is hit.
//
// Each loop iteration:
//   - looks up the root service leader address; on failure the error is only
//     logged and the lookup is retried after `retry_interval`;
//   - checks that the tenant can still be fetched from GCTX.omt_; a failure
//     here ends the loop and is returned;
//   - sends the rpc. OB_ENTRY_NOT_EXIST ends the loop and is returned; any
//     other rpc error is logged and retried after `retry_interval`.
//
// NOTE(review): no overall deadline is applied by this function itself; the
// loop only ends through one of the conditions above.
//
// @param arg  request forwarded to the root service
// @return OB_SUCCESS on success, OB_INVALID_ARGUMENT / OB_ERR_UNEXPECTED for
//         bad input or a missing proxy, otherwise the terminal error.
int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg)
{
  int ret = OB_SUCCESS;
  // Pause between attempts, handed to ob_usleep() (presumably microseconds,
  // i.e. 100 ms -- confirm against ob_usleep).
  const int64_t retry_interval = 100 * 1000L;
  ObAddr rs_leader_addr;
  obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
  if (OB_UNLIKELY(!arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(ret), K(arg));
  } else if (OB_ISNULL(common_rpc_proxy)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("common rpc proxy is null", K(ret));
  } else {
    while (OB_SUCC(ret)) {
      int tmp_ret = OB_SUCCESS;
      omt::ObTenant *tenant = nullptr;
      if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
        LOG_WARN("fail to get rootservice address", K(tmp_ret));
        // Back off before retrying; without this pause the loop would spin
        // and flood the log for as long as the leader cannot be resolved.
        ob_usleep(retry_interval);
      } else if (OB_FAIL(GCTX.omt_->get_tenant(arg.tenant_id_, tenant))) {
        LOG_WARN("fail to get tenant, maybe tenant deleted", K_(arg.tenant_id));
      } else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).copy_table_dependents(arg))) {
        LOG_WARN("copy table dependents failed", K(ret), K(arg));
        if (OB_ENTRY_NOT_EXIST == ret) {
          LOG_WARN("ddl task not exist", K(ret), K(arg));
          break;
        } else {
          LOG_INFO("ddl task exist, try again", K(arg));
          // Clear the error so that the while condition allows another round.
          ret = OB_SUCCESS;
          ob_usleep(retry_interval);
        }
      } else {
        LOG_INFO("copy table dependents success", K(arg));
        break;
      }
    }
  }
  return ret;
}

// Aborts a table redefinition task on the root service, waits for the task to
// finish, and clears its heart beat registration.
//
// Phase 1 (retry loop): resolve the root service leader, check that the tenant
// can still be fetched from GCTX.omt_, then send the abort_redef_table rpc.
//   - leader lookup failure: logged, retried after `retry_interval`;
//   - tenant lookup failure: ends the loop with that error;
//   - rpc returns OB_ENTRY_NOT_EXIST: ends the loop and is then mapped to
//     OB_SUCCESS below;
//   - any other rpc error: logged, retried after `retry_interval`.
// Phase 2: if phase 1 ended successfully, wait_ddl_finish() is called;
// OB_CANCELED from it is treated as a successful abort.
// Phase 3: heart_beat_clear() is always called once the arguments and proxy
// were valid; its failure is only logged.
//
// @param arg      identifies the tenant and task to abort
// @param session  session passed on to wait_ddl_finish()
// @return OB_SUCCESS, or the error of the first failing phase.
int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, sql::ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  // Pause between attempts, handed to ob_usleep() (presumably microseconds,
  // i.e. 100 ms -- confirm against ob_usleep).
  const int64_t retry_interval = 100 * 1000L;
  ObAddr rs_leader_addr;
  obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
  if (OB_UNLIKELY(!arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(ret), K(arg));
  } else if (OB_ISNULL(common_rpc_proxy)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("common rpc proxy is null", K(ret));
  } else {
    while (OB_SUCC(ret)) {
      int tmp_ret = OB_SUCCESS;
      omt::ObTenant *tenant = nullptr;
      if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
        LOG_WARN("fail to get rootservice address", K(tmp_ret));
        // Back off before retrying; without this pause the loop would spin
        // and flood the log for as long as the leader cannot be resolved.
        ob_usleep(retry_interval);
      } else if (OB_FAIL(GCTX.omt_->get_tenant(arg.tenant_id_, tenant))) {
        LOG_WARN("fail to get tenant, maybe tenant deleted", K_(arg.tenant_id));
      } else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).abort_redef_table(arg))) {
        LOG_WARN("abort redef table failed", K(ret), K(arg));
        if (OB_ENTRY_NOT_EXIST == ret) {
          break;
        } else {
          LOG_INFO("ddl task exist, try again", K(arg));
          // Clear the error so that the while condition allows another round.
          ret = OB_SUCCESS;
          ob_usleep(retry_interval);
        }
      } else {
        LOG_INFO("abort task success");
        break;
      }
    }
    // OB_ENTRY_NOT_EXIST is not reported to the caller as an error.
    if (OB_ENTRY_NOT_EXIST == ret) {
      ret = OB_SUCCESS;
    }
    if (OB_SUCC(ret)) {
      if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(arg.tenant_id_, arg.task_id_, session, common_rpc_proxy))) {
        if (OB_CANCELED == ret) {
          // The task ending as canceled is the expected outcome of an abort.
          ret = OB_SUCCESS;
          LOG_INFO("ddl abort success", K_(arg.task_id));
        } else {
          LOG_WARN("wait ddl finish failed", K(ret), K(arg.tenant_id_), K(arg.task_id_));
        }
      }
    }
    // Best effort: a failure to clear the registration never replaces `ret`.
    int tmp_ret = OB_SUCCESS;
    if (OB_TMP_FAIL(heart_beat_clear(arg.task_id_))) {
      LOG_WARN("heart beat clear failed", K(tmp_ret), K(arg.task_id_));
    }
  }
  return ret;
}

// Finishes a table redefinition task: notifies the root service, reports the
// single replica build result, waits for the DDL task to finish, and clears
// the heart beat registration.
//
// Phase 1 (retry loop): resolve the root service leader, check that the tenant
// can still be fetched from GCTX.omt_, then send the finish_redef_table rpc.
//   - leader lookup failure: logged, retried after `retry_interval`;
//   - tenant lookup failure: ends the loop with that error;
//   - rpc returns OB_ENTRY_NOT_EXIST: ends the loop and is returned;
//   - any other rpc error: logged, retried after `retry_interval`.
// Phase 2: on success, build_ddl_single_replica_response() and then
// wait_ddl_finish() are called.
// Phase 3: heart_beat_clear() is always called once the arguments and proxy
// were valid; its failure is only logged.
//
// @param finish_redef_arg  identifies the tenant and task to finish
// @param build_single_arg  argument for build_ddl_single_replica_response()
// @param session           session passed on to wait_ddl_finish()
// @return OB_SUCCESS, or the error of the first failing phase.
int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &finish_redef_arg,
                                          const obrpc::ObDDLBuildSingleReplicaResponseArg &build_single_arg,
                                          sql::ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  // Pause between attempts, handed to ob_usleep() (presumably microseconds,
  // i.e. 100 ms -- confirm against ob_usleep).
  const int64_t retry_interval = 100 * 1000L;
  ObAddr rs_leader_addr;
  obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
  if (OB_UNLIKELY(!finish_redef_arg.is_valid() || !build_single_arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(ret), K(finish_redef_arg), K(build_single_arg));
  } else if (OB_ISNULL(common_rpc_proxy)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("common rpc proxy is null", K(ret));
  } else {
    while (OB_SUCC(ret)) {
      int tmp_ret = OB_SUCCESS;
      omt::ObTenant *tenant = nullptr;
      if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
        LOG_WARN("fail to get rootservice address", K(tmp_ret));
        // Back off before retrying; without this pause the loop would spin
        // and flood the log for as long as the leader cannot be resolved.
        ob_usleep(retry_interval);
      } else if (OB_FAIL(GCTX.omt_->get_tenant(finish_redef_arg.tenant_id_, tenant))) {
        LOG_WARN("fail to get tenant, maybe tenant deleted", K_(finish_redef_arg.tenant_id));
      } else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).finish_redef_table(finish_redef_arg))) {
        LOG_WARN("finish redef table failed", K(ret), K(finish_redef_arg));
        if (OB_ENTRY_NOT_EXIST == ret) {
          break;
        } else {
          LOG_INFO("ddl task exist, try again", K(finish_redef_arg));
          // Clear the error so that the while condition allows another round.
          ret = OB_SUCCESS;
          ob_usleep(retry_interval);
        }
      } else {
        LOG_INFO("finish redef table success", K(finish_redef_arg));
        break;
      }
    }
    if (OB_FAIL(ret)) {
      // Phase 1 failed: skip straight to the heart beat cleanup.
    } else if (OB_FAIL(build_ddl_single_replica_response(build_single_arg))) {
      LOG_WARN("build ddl single replica response", K(ret), K(build_single_arg));
    } else if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(finish_redef_arg.tenant_id_, finish_redef_arg.task_id_, session, common_rpc_proxy))) {
      LOG_WARN("failed to wait ddl finish", K(ret), K(finish_redef_arg.tenant_id_), K(finish_redef_arg.task_id_));
    }
    // Best effort: a failure to clear the registration never replaces `ret`.
    // Declared here (not at function scope) so the loop-local tmp_ret above
    // does not shadow it.
    int tmp_ret = OB_SUCCESS;
    if (OB_TMP_FAIL(heart_beat_clear(finish_redef_arg.task_id_))) {
      LOG_WARN("heart beat clear failed", K(tmp_ret), K(finish_redef_arg.task_id_));
    }
  }
  return ret;
}

// Forwards a single replica build response to the root service.
//
// @param arg  response argument; must pass arg.is_valid()
// @return OB_INVALID_ARGUMENT when `arg` is invalid, OB_ERR_UNEXPECTED when
//         the root service rpc proxy in GCTX is null, otherwise the result of
//         the build_ddl_single_replica_response rpc.
int ObDDLServerClient::build_ddl_single_replica_response(const obrpc::ObDDLBuildSingleReplicaResponseArg &arg)
{
  int ret = OB_SUCCESS;
  obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
  if (OB_UNLIKELY(!arg.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arg", K(ret), K(arg));
  } else if (OB_ISNULL(common_rpc_proxy)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("common rpc proxy is null", K(ret));
  } else if (OB_FAIL(common_rpc_proxy->build_ddl_single_replica_response(arg))) {
    // Name the rpc that actually failed; the previous text referred to
    // finish_redef_table, which is a different call.
    LOG_WARN("failed to build ddl single replica response", K(ret), K(arg));
  }
  return ret;
}

int ObDDLServerClient::wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, int64_t &snapshot_version, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session)
O
obdev 已提交
259 260 261 262 263 264 265 266 267 268 269 270
{
  int ret = OB_SUCCESS;
  const int64_t retry_interval = 100 * 1000;
  ObSqlString sql_string;
  THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + OB_MAX_USER_SPECIFIED_TIMEOUT);
  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
    sqlclient::ObMySQLResult *result = NULL;
    if (OB_UNLIKELY(task_id <= 0 || OB_INVALID_ID == tenant_id)) {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("invalid argument", K(ret), K(task_id), K(tenant_id));
    } else {
      while (OB_SUCC(ret)) {
271
        if (OB_FAIL(sql_string.assign_fmt("SELECT status, snapshot_version FROM %s WHERE task_id = %lu", share::OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
O
obdev 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
          LOG_WARN("assign sql string failed", K(ret), K(task_id));
        } else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql_string.ptr()))) {
          LOG_WARN("fail to execute sql", K(ret), K(sql_string));
        } else if (OB_ISNULL(result = res.get_result())) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("error unexpected, query result must not be NULL", K(ret));
        } else if (OB_FAIL(result->next())) {
          if (OB_LIKELY(OB_ITER_END == ret)) {
            ret = OB_ENTRY_NOT_EXIST;
          } else {
            LOG_WARN("fail to get next row", K(ret));
          }
        } else {
          int task_status = 0;
          EXTRACT_INT_FIELD_MYSQL(*result, "status", task_status, int);
287
          EXTRACT_UINT_FIELD_MYSQL(*result, "snapshot_version", snapshot_version, uint64_t);
288 289
          share::ObDDLTaskStatus task_cur_status = static_cast<share::ObDDLTaskStatus>(task_status);
          if (rootserver::ObTableRedefinitionTask::check_task_status_before_pending(task_cur_status)) {
O
obdev 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
            LOG_INFO("task status not equal REPENDING, Please Keep Waiting", K(task_status));
            if (OB_FAIL(sql::ObDDLExecutorUtil::handle_session_exception(session))) {
              break;
            } else {
              ob_usleep(retry_interval);
            }
          } else {
            break;
          }
        }
      }
    }
  }
  return ret;
}

// Removes a task id from the DDL heart beat task container.
//
// @param task_id  task to unregister; must be > 0
// @return OB_INVALID_ARGUMENT for a non-positive id. OB_HASH_NOT_EXIST from
//         the container is reported as OB_SUCCESS; any other container error
//         is logged and returned unchanged.
int ObDDLServerClient::heart_beat_clear(const int64_t task_id)
{
  int ret = OB_SUCCESS;
  if (task_id > 0) {
    ret = OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(task_id);
    if (OB_HASH_NOT_EXIST == ret) {
      // Nothing was registered under this id: not an error for the caller.
      ret = OB_SUCCESS;
    } else if (OB_SUCCESS != ret) {
      LOG_ERROR("failed to remove register task id", K(ret), K(task_id));
    }
  } else {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(task_id));
  }
  return ret;
}


}  // end of namespace storage
}  // end of namespace oceanbase