ob_async_rpc_proxy.h 18.8 KB
Newer Older
W
wangzelin.wzl 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_RPC_OB_ASYNC_RPC_PROXY_H_
#define OCEANBASE_RPC_OB_ASYNC_RPC_PROXY_H_

#include "lib/lock/ob_thread_cond.h"
#include "lib/list/ob_dlink_node.h"
#include "lib/list/ob_dlist.h"
#include "share/ob_rpc_struct.h"
#include "share/ob_srv_rpc_proxy.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "rpc/obrpc/ob_rpc_result_code.h"
#include "rpc/obrpc/ob_rpc_proxy.h"

namespace oceanbase
{
namespace obrpc
{

30 31 32
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
class ObAsyncCB : public RpcProxy::template AsyncCB<PC>,
    public common::ObDLinkBase<ObAsyncCB<PC, AsyncRpcProxy, RpcProxy> >
W
wangzelin.wzl 已提交
33
{
34
  using AsyncCB = typename RpcProxy::template AsyncCB<PC>;
W
wangzelin.wzl 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
public:
  ObAsyncCB(AsyncRpcProxy &proxy) : proxy_(proxy) {}
  virtual ~ObAsyncCB() {}

  void set_args(const typename AsyncCB::Request &args) { UNUSED(args); }
  rpc::frame::ObReqTransport::AsyncCB *clone(const rpc::frame::SPAlloc &alloc) const;

  int process();
  void on_timeout();
  void on_invalid();

  int get_ret_code() const { return AsyncCB::rcode_.rcode_; }
  const common::ObAddr &get_dst() const { return AsyncCB::dst_; }
  int64_t get_timeout() const { return AsyncCB::timeout_; }
  const typename AsyncCB::Response &get_result() const
  {
    return AsyncCB::result_;
  }

  TO_STRING_KV("dst", get_dst(), "ret_code", get_ret_code(),
      "result", get_result());
private:
  AsyncRpcProxy &proxy_;
};

60 61
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
rpc::frame::ObReqTransport::AsyncCB *ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::clone(
W
wangzelin.wzl 已提交
62 63 64 65 66 67 68
    const rpc::frame::SPAlloc &alloc) const
{
  UNUSED(alloc);
  return const_cast<rpc::frame::ObReqTransport::AsyncCB *>(
      static_cast<const rpc::frame::ObReqTransport::AsyncCB * const>(this));
}

69 70
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
int ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::process()
W
wangzelin.wzl 已提交
71 72 73 74 75 76 77 78
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(proxy_.receive_response())) {
    RPC_LOG(WARN, "proxy_ receive_response failed", K(ret));
  }
  return ret;
}

79 80
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
void ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::on_timeout()
W
wangzelin.wzl 已提交
81 82
{
  int ret = common::OB_SUCCESS;
83
  RPC_LOG(WARN, "some error in rcode and enter on_timeout", K(AsyncCB::rcode_.rcode_));
W
wangzelin.wzl 已提交
84 85 86 87 88 89
  AsyncCB::rcode_.rcode_ = common::OB_TIMEOUT;
  if (OB_FAIL(proxy_.receive_response())) {
    RPC_LOG(WARN, "proxy_ receive_response failed", K(ret));
  }
}

90 91
template<ObRpcPacketCode PC, typename AsyncRpcProxy, typename RpcProxy>
void ObAsyncCB<PC, AsyncRpcProxy, RpcProxy>::on_invalid()
W
wangzelin.wzl 已提交
92 93 94 95
{
  int tmp_ret = common::OB_SUCCESS;
  AsyncCB::rcode_.rcode_ = common::OB_RPC_PACKET_INVALID;
  if (common::OB_SUCCESS != (tmp_ret = proxy_.receive_response())) {
96
    RPC_LOG_RET(WARN, tmp_ret, "proxy_ receive_response failed", K(tmp_ret));
W
wangzelin.wzl 已提交
97 98 99
  }
}

100
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
W
wangzelin.wzl 已提交
101 102 103 104 105 106 107 108
class ObAsyncRpcProxy
{
public:
  struct EmptyType {
  public:
    bool is_valid() const { return true; }
    TO_STRING_EMPTY();
  };
109
  ObAsyncRpcProxy(RpcProxy &rpc_proxy, const Func &func);
W
wangzelin.wzl 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
  virtual ~ObAsyncRpcProxy();

  void reuse();

  int call(const common::ObAddr &server,
           const int64_t timeout);
  int call(const common::ObAddr &server,
           const int64_t timeout,
           const RpcArg &arg);
  int call(const common::ObAddr &server,
           const int64_t timeout,
           const uint64_t tenant_id,
           const RpcArg &arg);
  int call(const common::ObAddr &server,
           const int64_t timeout,
           const int64_t cluster_id,
           const uint64_t tenant_id,
           const RpcArg &arg);
O
obdev 已提交
128 129 130 131 132 133
  int call(const common::ObAddr &server,
           const int64_t timeout,
           const int64_t cluster_id,
           const uint64_t tenant_id,
           const uint64_t group_id,
           const RpcArg &arg);
W
wangzelin.wzl 已提交
134 135 136 137 138 139 140 141 142

  // wait all asynchronous rpc finish, return fail if any rpc fail.
  int wait();
  // wait all asynchronous rpc finish and store it return code to %return_code_array
  int wait_all(common::ObIArray<int> &return_code_array);
  const common::ObIArray<RpcArg> &get_args() const { return args_; }
  const common::ObIArray<common::ObAddr> &get_dests() const { return dests_; }
  const common::ObIArray<const RpcResult *> &get_results() const { return results_; }
  int receive_response();
143 144

  int check_return_cnt(const int64_t return_cnt) const;
W
wangzelin.wzl 已提交
145 146
private:
  int call_rpc(const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id,
147
               const uint64_t tenant_id, const RpcArg &arg, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb);
O
obdev 已提交
148 149
  int call_rpc(const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id,
               const uint64_t tenant_id, const uint64_t group_id, const RpcArg &arg,
150
               ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb);
W
wangzelin.wzl 已提交
151
  int call_rpc(const common::ObAddr &server, const int64_t timeout, const uint64_t tenant_id,
152
               const EmptyType &empty_obj, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb);
W
wangzelin.wzl 已提交
153
  int wait(common::ObIArray<int> *return_code_array, const bool return_rpc_error);
154
  RpcProxy &rpc_proxy_;
W
wangzelin.wzl 已提交
155 156 157 158 159
  common::ObArray<RpcArg> args_;
  common::ObArray<common::ObAddr> dests_;
  common::ObArray<const RpcResult *> results_;
  Func func_;
  common::ObArenaAllocator allocator_;
160
  common::ObDList<ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> > cb_list_;
W
wangzelin.wzl 已提交
161 162 163 164 165 166
  int64_t response_count_;
  common::ObThreadCond cond_;
private:
  DISALLOW_COPY_AND_ASSIGN(ObAsyncRpcProxy);
};

167 168 169
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::ObAsyncRpcProxy(
    RpcProxy &rpc_proxy, const Func &func)
W
wangzelin.wzl 已提交
170 171 172 173 174 175 176 177 178 179
  : rpc_proxy_(rpc_proxy), args_(), results_(),
    func_(func), allocator_(common::ObModIds::OB_ASYNC_RPC_PROXY),
    cb_list_(), response_count_(0), cond_()
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(cond_.init(common::ObWaitEventIds::ASYNC_RPC_PROXY_COND_WAIT))) {
    RPC_LOG(ERROR, "cond init failed", K(ret));
  }
}

180 181
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::~ObAsyncRpcProxy()
W
wangzelin.wzl 已提交
182 183 184 185
{
  reuse();
}

186 187
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
void ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::reuse()
W
wangzelin.wzl 已提交
188 189
{
  args_.reuse();
190
  dests_.reuse();
W
wangzelin.wzl 已提交
191 192
  results_.reuse();
  response_count_ = 0;
193 194
  ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb = cb_list_.get_first();
  ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *next = NULL;
W
wangzelin.wzl 已提交
195 196 197 198 199 200 201 202 203
  while (cb != cb_list_.get_header()) {
    next = cb->get_next();
    cb->~ObAsyncCB();
    cb = next;
  }
  cb_list_.clear();
  allocator_.reuse();
}

204 205
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
W
wangzelin.wzl 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
    const common::ObAddr &server,
    const int64_t timeout)
{
  int ret = common::OB_SUCCESS;
  if (!server.is_valid() || timeout <= 0) {
    ret = common::OB_INVALID_ARGUMENT;
    RPC_LOG(WARN, "invalid argument", K(server), K(timeout), K(ret));
  } else if (OB_FAIL(call(server, timeout, EmptyType()))) {
    RPC_LOG(WARN, "call failed", K(server), K(timeout), K(ret));
  }

  // do_call failed, outer code won't wait, we should wait rpc responses have sent
  if (OB_FAIL(ret)) {
    common::ObThreadCondGuard guard(cond_);
    while (response_count_ < cb_list_.get_size()) {
      cond_.wait();
    }
  }
  return ret;
}

227 228
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
W
wangzelin.wzl 已提交
229 230 231 232
    const common::ObAddr &server,
    const int64_t timeout,
    const RpcArg &arg)
{
O
obdev 已提交
233 234 235
  return call(server, timeout, common::OB_INVALID_CLUSTER_ID, OB_SYS_TENANT_ID, 0, arg);
}

236 237
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
O
obdev 已提交
238 239 240 241 242 243
    const common::ObAddr &server,
    const int64_t timeout,
    const uint64_t tenant_id,
    const RpcArg &arg)
{
  return call(server, timeout, common::OB_INVALID_CLUSTER_ID, tenant_id, 0, arg);
W
wangzelin.wzl 已提交
244 245
}

246 247
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
W
wangzelin.wzl 已提交
248 249
    const common::ObAddr &server,
    const int64_t timeout,
O
obdev 已提交
250
    const int64_t cluster_id,
W
wangzelin.wzl 已提交
251 252 253
    const uint64_t tenant_id,
    const RpcArg &arg)
{
O
obdev 已提交
254
  return call(server, timeout, cluster_id, tenant_id, 0, arg);
W
wangzelin.wzl 已提交
255 256
}

257 258
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call(
W
wangzelin.wzl 已提交
259 260 261 262
    const common::ObAddr &server,
    const int64_t timeout,
    const int64_t cluster_id,
    const uint64_t tenant_id,
O
obdev 已提交
263
    const uint64_t group_id,
W
wangzelin.wzl 已提交
264 265 266 267 268 269 270
    const RpcArg &arg)
{
  int ret = common::OB_SUCCESS;
  void *mem = NULL;
  if (!server.is_valid() || timeout <= 0 || !arg.is_valid()) {
    ret = common::OB_INVALID_ARGUMENT;
    RPC_LOG(WARN, "invalid argument", K(server), K(timeout), K(arg), KR(ret));
271
  } else if (NULL == (mem = allocator_.alloc(sizeof(ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy>)))) {
W
wangzelin.wzl 已提交
272 273
    ret = common::OB_ALLOCATE_MEMORY_FAILED;
    RPC_LOG(ERROR, "alloc memory failed",
274
        "size", sizeof(ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy>), KR(ret));
W
wangzelin.wzl 已提交
275
  } else {
276
    ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb = new (mem) ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy>(*this);
W
wangzelin.wzl 已提交
277 278 279 280 281 282 283 284
    if (!cb_list_.add_last(cb)) {
      ret = common::OB_ERR_UNEXPECTED;
      RPC_LOG(WARN, "cb_list add_last failed", KR(ret));
    } else {
      if (OB_FAIL(args_.push_back(arg))) {
        RPC_LOG(WARN, "push_back failed", K(arg), KR(ret));
      } else if (OB_FAIL(dests_.push_back(server))) {
        RPC_LOG(WARN, "push_back failed", K(server), KR(ret));
O
obdev 已提交
285
      } else if (0 == group_id && OB_FAIL(call_rpc(server, timeout, cluster_id, tenant_id, arg, cb))) {
W
wangzelin.wzl 已提交
286
        RPC_LOG(WARN, "call rpc func failed", K(server), K(timeout),
O
obdev 已提交
287 288 289 290
               K(cluster_id), K(tenant_id), K(arg), K(group_id), KR(ret));
      } else if (0 != group_id && OB_FAIL(call_rpc(server, timeout, cluster_id, tenant_id, group_id, arg, cb))) {
        RPC_LOG(WARN, "call rpc func failed", K(server), K(timeout),
               K(cluster_id), K(tenant_id), K(arg), K(group_id), KR(ret));
W
wangzelin.wzl 已提交
291 292 293
      }
    }
    if (OB_FAIL(ret)) {
294 295
      // if send rpc failed, just call on_timeout to fill the result and add response count
      cb->on_timeout();
W
wangzelin.wzl 已提交
296 297 298 299 300 301 302 303 304 305 306 307 308 309
    }
  }

  // do_call failed, outer code won't wait, we should wait rpc responses have sent
  if (OB_FAIL(ret)) {
    common::ObThreadCondGuard guard(cond_);
    while (response_count_ < cb_list_.get_size()) {
      cond_.wait();
    }
  }

  return ret;
}

310 311
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call_rpc(
W
wangzelin.wzl 已提交
312 313
    const common::ObAddr &server, const int64_t timeout,
    const int64_t cluster_id, const uint64_t tenant_id,
314
    const RpcArg &arg, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb)
W
wangzelin.wzl 已提交
315 316 317 318 319 320
{
  int ret = common::OB_SUCCESS;
  if (!server.is_valid() || timeout <= 0 || !arg.is_valid() || NULL == cb) {
    ret = common::OB_INVALID_ARGUMENT;
    RPC_LOG(WARN, "invalid argument", K(server), K(timeout), K(arg), KP(cb), KR(ret));
  } else if (common::OB_INVALID_CLUSTER_ID == cluster_id) {
O
obdev 已提交
321 322 323 324
    if (OB_FAIL((rpc_proxy_.to(server).by(tenant_id).timeout(timeout).*func_)(
        arg, cb, ObRpcOpts()))) {
      RPC_LOG(WARN, "call rpc func failed", K(server), K(timeout), K(arg), K(tenant_id), KR(ret));
    }
W
wangzelin.wzl 已提交
325
  } else {
O
obdev 已提交
326 327 328
    if (OB_FAIL((rpc_proxy_.to(server).dst_cluster_id(cluster_id)
                .by(tenant_id).timeout(timeout).*func_)(arg, cb, ObRpcOpts()))) {
      RPC_LOG(WARN, "call rpc func failed", K(server), K(timeout), K(arg),
W
wangzelin.wzl 已提交
329
              K(cluster_id), K(tenant_id), KR(ret));
O
obdev 已提交
330 331 332 333 334
    }
  }
  return ret;
}

335 336
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call_rpc(
O
obdev 已提交
337 338 339
    const common::ObAddr &server, const int64_t timeout,
    const int64_t cluster_id, const uint64_t tenant_id,
    const uint64_t group_id, const RpcArg &arg,
340
    ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb)
O
obdev 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
{
  int ret = common::OB_SUCCESS;
  if (!server.is_valid() || timeout <= 0 || !arg.is_valid() || NULL == cb) {
    ret = common::OB_INVALID_ARGUMENT;
    RPC_LOG(WARN, "invalid argument", K(server), K(timeout), K(arg), KP(cb), KR(ret));
  } else if (common::OB_INVALID_CLUSTER_ID == cluster_id) {
    if (OB_FAIL((rpc_proxy_.to(server).by(tenant_id).timeout(timeout).group_id(group_id).*func_)(
        arg, cb, ObRpcOpts()))) {
      RPC_LOG(WARN, "call rpc func failed", K(server), K(timeout), K(arg),
              K(tenant_id), K(group_id), KR(ret));
    }
  } else {
    if (OB_FAIL((rpc_proxy_.to(server).dst_cluster_id(cluster_id)
                .by(tenant_id).timeout(timeout).group_id(group_id).*func_)(arg, cb, ObRpcOpts()))) {
      RPC_LOG(WARN, "call rpc func failed", K(server), K(timeout), K(arg),
              K(cluster_id), K(tenant_id), K(group_id), KR(ret));
    }
W
wangzelin.wzl 已提交
358 359 360 361
  }
  return ret;
}

362 363
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::call_rpc(
W
wangzelin.wzl 已提交
364
    const common::ObAddr &server, const int64_t timeout, const uint64_t tenant_id,
365
    const EmptyType &empty_obj, ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb)
W
wangzelin.wzl 已提交
366 367 368 369 370 371 372 373 374 375 376 377 378
{
  UNUSED(empty_obj);
  int ret = common::OB_SUCCESS;
  if (!server.is_valid() || timeout <= 0 || NULL == cb) {
    ret = common::OB_INVALID_ARGUMENT;
    RPC_LOG(WARN, "invalid argument", K(server), K(timeout), KP(cb), K(ret));
  } else if (OB_FAIL((rpc_proxy_.to(server).by(tenant_id).timeout(timeout).*func_)(
          cb, ObRpcOpts()))) {
    RPC_LOG(WARN, "call rpc func failed", K(server), K(timeout), K(tenant_id), K(ret));
  }
  return ret;
}

379 380
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::wait()
W
wangzelin.wzl 已提交
381 382 383 384 385 386
{
  common::ObIArray<int> *return_code_array = NULL;
  const bool return_rpc_error = true;
  return wait(return_code_array, return_rpc_error);
}

387 388
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::wait_all(common::ObIArray<int> &return_code_array)
W
wangzelin.wzl 已提交
389 390 391 392 393
{
  const bool return_rpc_error = false;
  return wait(&return_code_array, return_rpc_error);
}

394 395
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::wait(
W
wangzelin.wzl 已提交
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
    common::ObIArray<int> *return_code_array, const bool return_rpc_error)
{
  int ret = common::OB_SUCCESS;
  {
    common::ObThreadCondGuard guard(cond_);
    if (response_count_ < 0 || response_count_ > cb_list_.get_size()) {
      ret = common::OB_INNER_STAT_ERROR;
      RPC_LOG(WARN, "inner stat error", K_(response_count), "cb_count",
          cb_list_.get_size(), K(ret));
    } else {
      while (response_count_ < cb_list_.get_size()) {
        cond_.wait();
      }

      // set results
      int index = 0;
412
      ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb = cb_list_.get_first();
W
wangzelin.wzl 已提交
413 414 415 416 417 418 419
      while (common::OB_SUCCESS == ret && cb != cb_list_.get_header()) {
        if (NULL == cb) {
          ret = common::OB_ERR_UNEXPECTED;
          RPC_LOG(WARN, "cb is null", KP(cb), K(ret));
        } else {
          const int rc = cb->get_ret_code();
          if (common::OB_SUCCESS != rc) {
420
            if (index <= (args_.count() -1)) {
O
obdev 已提交
421
              // ignore ret
422
              RPC_LOG(WARN, "execute rpc failed", K(rc), "server", cb->get_dst(), "timeout", cb->get_timeout(),
O
obdev 已提交
423
                  "packet code", PC, "arg", args_.at(index), K(ret));
424
            } else {
O
obdev 已提交
425
              // ignore ret
426
              RPC_LOG(WARN, "execute rpc failed and args_ count is not correct", K(rc), "server", cb->get_dst(), "timeout", cb->get_timeout(),
O
obdev 已提交
427
                  "packet code", PC, K(args_.count()), K(index), K(ret));
428
            }
W
wangzelin.wzl 已提交
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454
          }
          if (NULL != return_code_array) {
            if (OB_FAIL(return_code_array->push_back(rc))) {
              RPC_LOG(WARN, "add return code failed", K(ret));
            }
          }
          if (OB_SUCC(ret)) {
            if (common::OB_SUCCESS != rc && return_rpc_error) {
              ret = rc;
            }
          }

          if (OB_FAIL(ret)) {
          } else if (OB_FAIL(results_.push_back(&cb->get_result()))) {
            RPC_LOG(WARN, "push_back failed", K(ret));
          } else {
            cb = cb->get_next();
            ++index;
          }
        }
      }
    }
  }
  return ret;
}

455 456
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::receive_response()
W
wangzelin.wzl 已提交
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
{
  int ret = common::OB_SUCCESS;
  common::ObThreadCondGuard guard(cond_);
  if (response_count_ < 0 || response_count_ >= cb_list_.get_size()) {
    ret = common::OB_INNER_STAT_ERROR;
    RPC_LOG(WARN, "inner stat error", K_(response_count), "cb_count", cb_list_.get_size(), K(ret));
  } else {
    ++response_count_;
    if (response_count_ == cb_list_.get_size()) {
      int tmp_ret = cond_.broadcast();
      if (common::OB_SUCCESS != tmp_ret) {
        RPC_LOG(WARN, "condition broadcast failed", K(tmp_ret));
      }
    }
  }
  return ret;
}

475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
template<ObRpcPacketCode PC, typename RpcArg, typename RpcResult, typename Func, typename RpcProxy>
int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::check_return_cnt(
    const int64_t return_cnt) const
{
  int ret = common::OB_SUCCESS;
  if (return_cnt != args_.count()
      || return_cnt != dests_.count()
      || return_cnt != results_.count()) {
    ret = common::OB_INVALID_ARGUMENT;
    RPC_LOG(WARN, "return cnt not match",
            KR(ret), K(return_cnt),
            "arg_cnt", args_.count(),
            "dest_cnt", dests_.count(),
            "result_cnt", results_.count());
  }
  return ret;
}

W
wangzelin.wzl 已提交
493 494
#define RPC_F(code, arg, result, name) \
  typedef obrpc::ObAsyncRpcProxy<code, arg, result, \
495
    int (obrpc::ObSrvRpcProxy::*)(const arg &, obrpc::ObSrvRpcProxy::AsyncCB<code> *, const obrpc::ObRpcOpts &), obrpc::ObSrvRpcProxy> name
W
wangzelin.wzl 已提交
496 497 498 499 500

}//end namespace obrpc
}//end namespace oceanbase

#endif //OCEANBASE_RPC_OB_ASYNC_RPC_PROXY_H_