ob_px_sqc_async_proxy.cpp 9.8 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SQL_ENG

#include "sql/engine/px/ob_px_sqc_async_proxy.h"
16
#include "sql/engine/px/ob_px_util.h"
O
oceanbase-admin 已提交
17 18 19 20 21

namespace oceanbase {
using namespace common;
namespace sql {
/* ObSqcAsyncCB */
W
wangzelin.wzl 已提交
22
int ObSqcAsyncCB::process() {
O
oceanbase-admin 已提交
23 24 25 26 27 28 29
  ObThreadCondGuard guard(cond_);
  int ret = OB_SUCCESS;
  is_processed_ = true;
  ret = cond_.broadcast();
  return ret;
}

W
wangzelin.wzl 已提交
30
void ObSqcAsyncCB::on_invalid() {
O
oceanbase-admin 已提交
31 32 33 34
  ObThreadCondGuard guard(cond_);
  int ret = OB_SUCCESS;
  is_invalid_ = true;
  ret = cond_.broadcast();
W
wangzelin.wzl 已提交
35 36
  LOG_WARN("ObSqcAsyncCB invalid, check object serialization impl or oom",
           K(trace_id_), K(ret));
O
oceanbase-admin 已提交
37 38
}

W
wangzelin.wzl 已提交
39
void ObSqcAsyncCB::on_timeout() {
O
oceanbase-admin 已提交
40 41 42 43 44 45
  ObThreadCondGuard guard(cond_);
  int ret = OB_SUCCESS;
  is_timeout_ = true;
  ret = cond_.broadcast();
  LOG_WARN("ObSqcAsyncCB timeout, check timeout value, peer cpu load, network "
           "packet drop rate",
W
wangzelin.wzl 已提交
46
           K(trace_id_), K(ret));
O
oceanbase-admin 已提交
47 48
}

W
wangzelin.wzl 已提交
49 50
// Clone hook of the AsyncCB interface. No copy is made: `alloc` is ignored and
// this very object is returned with const removed, so the RPC layer works on
// the same callback instance (presumably the one registered in
// ObPxSqcAsyncProxy::callbacks_).
rpc::frame::ObReqTransport::AsyncCB *
ObSqcAsyncCB::clone(const rpc::frame::SPAlloc &alloc) const {
  UNUSED(alloc);
  const rpc::frame::ObReqTransport::AsyncCB *self = this;
  return const_cast<rpc::frame::ObReqTransport::AsyncCB *>(self);
}

/* ObPxSqcAsyncProxy */
W
wangzelin.wzl 已提交
57
int ObPxSqcAsyncProxy::launch_all_rpc_request() {
O
oceanbase-admin 已提交
58 59 60 61 62 63
  int ret = OB_SUCCESS;
  // prepare allocate the results_ array
  if (OB_FAIL(results_.prepare_allocate(sqcs_.count()))) {
    LOG_WARN("fail to prepare allocate result array");
  }

W
wangzelin.wzl 已提交
64 65 66 67 68 69 70
  if (OB_SUCC(ret)) {
    int64_t cluster_id = GCONF.cluster_id;
    SMART_VAR(ObPxRpcInitSqcArgs, args) {
      if (sqcs_.count() > 1) {
        args.enable_serialize_cache();
      }
      ARRAY_FOREACH_X(sqcs_, idx, count, OB_SUCC(ret)) {
71 72
        if (OB_UNLIKELY(ObPxCheckAlive::is_in_blacklist(sqcs_.at(idx)->get_exec_addr(),
                        session_->get_process_query_time()))) {
W
wangzelin.wzl 已提交
73 74 75 76 77 78 79 80
          ret = OB_RPC_CONNECT_ERROR;
          LOG_WARN("peer no in communication, maybe crashed", K(ret),
                  KPC(sqcs_.at(idx)), K(cluster_id), K(session_->get_process_query_time()));
        } else {
          ret = launch_one_rpc_request(args, idx, NULL);
        }
      }
    }
O
oceanbase-admin 已提交
81 82 83 84 85 86 87 88
  }
  if (OB_FAIL(ret)) {
    LOG_WARN("fail to launch all sqc rpc request", K(ret));
    fail_process();
  }
  return ret;
}

W
wangzelin.wzl 已提交
89
// Sends the asynchronous init-SQC RPC for sqcs_.at(idx).
//
// @param args  RPC argument object shared across SQCs; it is pointed at this
//              SQC (set_serialize_param + sqc_.assign) before sending.
// @param idx   Index of the target SQC in sqcs_.
// @param cb    Callback to use. When NULL, a new ObSqcAsyncCB is allocated from
//              allocator_ and appended to callbacks_.
// @return OB_SUCCESS once the request has been handed to the RPC proxy;
//         OB_TIMEOUT if the query deadline has already passed; otherwise the
//         error of the failing step.
//
// If this call fails while a callback is available, no request is in flight
// for it and it will never fire. It is therefore removed from callbacks_ and
// destroyed. If it cannot be removed, it is marked invalid and left for
// fail_process() to account for.
int ObPxSqcAsyncProxy::launch_one_rpc_request(ObPxRpcInitSqcArgs &args, int64_t idx, ObSqcAsyncCB *cb) {
  int ret = OB_SUCCESS;
  ObCurTraceId::TraceId *trace_id = NULL;
  ObPxSqcMeta &sqc = *sqcs_.at(idx);
  const ObAddr &addr = sqc.get_exec_addr();
  // Whatever remains of the query deadline becomes the RPC timeout.
  int64_t timeout_us =
      phy_plan_ctx_->get_timeout_timestamp() - ObTimeUtility::current_time();
  args.set_serialize_param(exec_ctx_, const_cast<ObOpSpec &>(*dfo_.get_root_op_spec()), *phy_plan_);
  if (timeout_us < 0) {
    ret = OB_TIMEOUT;
    LOG_WARN("query timeout before sending sqc rpc", K(ret), K(timeout_us), K(idx));
  } else if (OB_FAIL(args.sqc_.assign(sqc))) {
    LOG_WARN("fail assign sqc", K(ret));
  } else if (OB_ISNULL(trace_id = ObCurTraceId::get_trace_id())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("fail to get trace id", K(ret));
  } else if (NULL == cb) {
    // No callback supplied: allocate one and register it in callbacks_.
    void *mem = NULL;
    if (NULL == (mem = allocator_.alloc(sizeof(ObSqcAsyncCB)))) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("alloc memory failed", "size", sizeof(ObSqcAsyncCB), K(ret));
    } else {
      cb = new (mem) ObSqcAsyncCB(cond_, *trace_id);
      if (OB_FAIL(callbacks_.push_back(cb))) {
        // It was never registered, so free it right away.
        LOG_WARN("callback obarray push back failed.", K(ret));
        cb->~ObSqcAsyncCB();
        allocator_.free(cb);
        cb = NULL;
      }
    }
  }

  if (OB_SUCC(ret) && NULL != cb) {
    if (OB_FAIL(proxy_.to(addr)
                    .by(THIS_WORKER.get_rpc_tenant()?: session_->get_effective_tenant_id())
                    .timeout(timeout_us)
                    .async_init_sqc(args, cb))) {
      LOG_WARN("fail to call asynchronous sqc rpc", K(sqc), K(timeout_us),
               K(ret));
    } else {
      LOG_DEBUG("send the sqc request successfully.", K(idx), K(sqc),
                K(args), K(cb));
    }
  }

  // Whether ret is OB_TIMEOUT, a preparation failure or a send failure, no
  // request is in flight for cb, so the callback has to be reclaimed here.
  // If it cannot be removed from callbacks_ it must not be destroyed.
  if (OB_FAIL(ret) && NULL != cb) {
    // A separate code is used so the original error in ret is preserved.
    int temp_ret = OB_ERR_UNEXPECTED;
    int64_t pos = -1;
    // Locate cb by identity instead of assuming it sits at position idx.
    for (int64_t i = 0; pos < 0 && i < callbacks_.count(); ++i) {
      if (callbacks_.at(i) == cb) {
        pos = i;
      }
    }
    if (pos >= 0) {
      temp_ret = callbacks_.remove(pos);
    }
    if (OB_SUCCESS != temp_ret) {
      // Leave it in place but marked invalid, presumably so that
      // fail_process() counts it as finished (it checks is_invalid()).
      cb->set_invalid(true);
      LOG_WARN("callback obarray remove element failed", K(ret), K(temp_ret), K(pos));
    } else {
      cb->~ObSqcAsyncCB();
      allocator_.free(cb);
      cb = NULL;
    }
  }
  return ret;
}

W
wangzelin.wzl 已提交
157
int ObPxSqcAsyncProxy::wait_all() {
O
oceanbase-admin 已提交
158 159
  int ret = OB_SUCCESS;
  LOG_TRACE("wail all async sqc rpc to end", K(dfo_));
W
wangzelin.wzl 已提交
160 161 162 163
  // 退出while的条件:3个条件任意满足即退出while循环
  // 1. 在有效时间内获得足够多并且正确的callback结果
  // 2. 超时,ret = OB_TIMEOUT
  // 3. retry一个rpc失败
O
oceanbase-admin 已提交
164 165 166 167 168 169
  while (return_cb_count_ < sqcs_.count() && OB_SUCC(ret)) {

    ObThreadCondGuard guard(cond_);
    // wait for timeout or until notified.
    cond_.wait_us(500);

W
wangzelin.wzl 已提交
170 171 172
    if ((phy_plan_ctx_->get_timeout_timestamp() -
         ObTimeUtility::current_time()) < 0) {
      // 超过查询计划的timeout,满足退出条件2
O
oceanbase-admin 已提交
173 174 175
      ret = OB_TIMEOUT;
    }

W
wangzelin.wzl 已提交
176 177
    ARRAY_FOREACH_X(callbacks_, idx, count, OB_SUCC(ret)) {
      ObSqcAsyncCB &callback = *callbacks_.at(idx);
O
oceanbase-admin 已提交
178
      if (!callback.is_visited() && callback.is_timeout()) {
W
wangzelin.wzl 已提交
179 180 181
        // callback超时,不需要重试
        // 可能只是RPC超时, 但不是QUERY超时, 实现上需要区分
        // 这种情况需要标记为RPC CONNECT ERROR进行重试
O
oceanbase-admin 已提交
182
        return_cb_count_++;
W
wangzelin.wzl 已提交
183 184
        if (phy_plan_ctx_->get_timeout_timestamp() -
          ObTimeUtility::current_time() > 0) {
O
oceanbase-admin 已提交
185
          error_index_ = idx;
O
obdev 已提交
186
          ret = OB_RPC_CONNECT_ERROR;
O
oceanbase-admin 已提交
187 188 189 190 191
        } else {
          ret = OB_TIMEOUT;
        }
        callback.set_visited(true);
      } else if (!callback.is_visited() && callback.is_invalid()) {
W
wangzelin.wzl 已提交
192
        // rpc解析pack失败,callback调用on_invalid方法,不需要重试
O
oceanbase-admin 已提交
193 194 195 196 197 198 199
        return_cb_count_++;
        ret = OB_RPC_PACKET_INVALID;
        callback.set_visited(true);
      } else if (!callback.is_visited() && callback.is_processed()) {
        return_cb_count_++;
        callback.set_visited(true);
        if (OB_SUCC(callback.get_ret_code().rcode_)) {
W
wangzelin.wzl 已提交
200
          const ObPxRpcInitSqcResponse &cb_result = callback.get_result();
O
oceanbase-admin 已提交
201
          if (cb_result.rc_ == OB_ERR_INSUFFICIENT_PX_WORKER) {
W
wangzelin.wzl 已提交
202 203
            // 没有获得足够的px worker,不需要再做内部SQC的重试,防止死锁
            // SQC如果没有获得足够的worker,外层直接进行query级别的重试
204
            //
W
wangzelin.wzl 已提交
205 206
            LOG_INFO("can't get enough worker resource, and not retry",
                K(cb_result.rc_), K(*sqcs_.at(idx)));
O
oceanbase-admin 已提交
207 208
          }
          if (OB_FAIL(cb_result.rc_)) {
W
wangzelin.wzl 已提交
209
            // 错误可能包含 is_data_not_readable_err或者其他类型的错误
O
oceanbase-admin 已提交
210 211 212 213
            if (is_data_not_readable_err(ret)) {
              error_index_ = idx;
            }
          } else {
W
wangzelin.wzl 已提交
214
            // 获得正确的返回结果
O
oceanbase-admin 已提交
215 216 217
            results_.at(idx) = &cb_result;
          }
        } else {
W
wangzelin.wzl 已提交
218
          // RPC框架错误,直接返回对应的错误码,当前SQC不需要再进行重试
O
oceanbase-admin 已提交
219 220 221 222 223 224 225
          ret = callback.get_ret_code().rcode_;
          LOG_WARN("call rpc failed", K(ret), K(callback.get_ret_code()));
        }
      }
    }
  }

W
wangzelin.wzl 已提交
226 227 228
  // wait_all的结果:
  // 1. sqc对应的所有callback都返回正确的结果,return_cb_count_=sqcs_.count(),直接返回OB_SUCCESS;
  // 2. 由于超时或者重试sqc rpc失败,这种情况下需要等待所有callback响应结束后,才能返回ret。
O
oceanbase-admin 已提交
229
  if (return_cb_count_ < callbacks_.count()) {
W
wangzelin.wzl 已提交
230
    // 还有未处理完的callback,需要等待所有的callback响应结束才能够退出`wait_all`方法
O
oceanbase-admin 已提交
231 232 233 234 235
    fail_process();
  }
  return ret;
}

W
wangzelin.wzl 已提交
236
void ObPxSqcAsyncProxy::destroy() {
O
oceanbase-admin 已提交
237 238
  int ret = OB_SUCCESS;
  LOG_DEBUG("async sqc proxy deconstruct, the callbacklist is ", K(callbacks_));
W
wangzelin.wzl 已提交
239 240
  ARRAY_FOREACH(callbacks_, idx) {
    ObSqcAsyncCB *callback = callbacks_.at(idx);
O
oceanbase-admin 已提交
241 242 243 244 245 246 247 248
    LOG_DEBUG("async sqc proxy deconstruct, the callback status is ", K(idx), K(*callback));
    callback->~ObSqcAsyncCB();
  }
  allocator_.reuse();
  callbacks_.reuse();
  results_.reuse();
}

W
wangzelin.wzl 已提交
249
void ObPxSqcAsyncProxy::fail_process() {
250
  LOG_WARN_RET(OB_SUCCESS,
W
wangzelin.wzl 已提交
251 252
      "async sqc fails, process the callbacks that have not yet got results",
      K(return_cb_count_), K(callbacks_.count()));
O
oceanbase-admin 已提交
253 254
  while (return_cb_count_ < callbacks_.count()) {
    ObThreadCondGuard guard(cond_);
W
wangzelin.wzl 已提交
255 256
    ARRAY_FOREACH_X(callbacks_, idx, count, true) {
      ObSqcAsyncCB &callback = *callbacks_.at(idx);
O
oceanbase-admin 已提交
257 258 259
      if (!callback.is_visited()) {
        if (callback.is_processed() || callback.is_timeout() || callback.is_invalid()) {
          return_cb_count_++;
W
wangzelin.wzl 已提交
260 261
          LOG_DEBUG("async sql fails, wait all callbacks", K(return_cb_count_),
              K(callbacks_.count()));
O
oceanbase-admin 已提交
262 263 264 265 266 267
          callback.set_visited(true);
        }
      }
    }
    cond_.wait_us(500);
  }
268
  LOG_WARN_RET(OB_SUCCESS, "async sqc fails, all callbacks have been processed");
O
oceanbase-admin 已提交
269 270
}

W
wangzelin.wzl 已提交
271 272
} // namespace sql
} // namespace oceanbase