/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #ifndef OB_PX_RPC_PROCESSOR_H #define OB_PX_RPC_PROCESSOR_H #include "rpc/obrpc/ob_rpc_processor.h" #include "share/interrupt/ob_global_interrupt_call.h" #include "sql/engine/px/ob_px_rpc_proxy.h" #include "sql/engine/ob_des_exec_context.h" #include "sql/engine/ob_physical_plan.h" namespace oceanbase { namespace sql { class ObPxSqcHandler; class ObInitSqcP : public obrpc::ObRpcProcessor> { public: ObInitSqcP(const observer::ObGlobalContext &gctx) : exec_ctx_(CURRENT_CONTEXT->get_arena_allocator(), gctx.session_mgr_), phy_plan_(), unregister_interrupt_(false) {} virtual ~ObInitSqcP() = default; virtual int init() final; virtual void destroy() final; virtual int process() final; virtual int after_process(int error_code) final; private: int pre_setup_op_input(ObPxSqcHandler &sqc_handler); int startup_normal_sqc(ObPxSqcHandler &sqc_handler); private: sql::ObDesExecContext exec_ctx_; sql::ObPhysicalPlan phy_plan_; bool unregister_interrupt_; }; class ObInitTaskP : public obrpc::ObRpcProcessor > { public: ObInitTaskP(const observer::ObGlobalContext &gctx) : exec_ctx_(CURRENT_CONTEXT->get_arena_allocator(), gctx.session_mgr_), phy_plan_() {} virtual ~ObInitTaskP() = default; virtual int init() final; virtual int process() final; virtual int after_process(int error_code) final; private: sql::ObDesExecContext exec_ctx_; sql::ObPhysicalPlan phy_plan_; //observer::ObVirtualTableIteratorFactory vt_iter_factory_; //share::schema::ObSchemaGetterGuard schema_guard_; }; class ObInitFastSqcP : public obrpc::ObRpcProcessor > { public: ObInitFastSqcP(const observer::ObGlobalContext &gctx) : exec_ctx_(CURRENT_CONTEXT->get_arena_allocator(), gctx.session_mgr_), phy_plan_() {} virtual ~ObInitFastSqcP() = default; virtual int init() final; virtual void destroy() final; virtual int process() final; private: int startup_normal_sqc(ObPxSqcHandler &sqc_handler); private: sql::ObDesExecContext exec_ctx_; sql::ObPhysicalPlan phy_plan_; }; class ObFastInitSqcReportQCMessageCall { public: ObFastInitSqcReportQCMessageCall(ObPxSqcMeta *sqc, int err, int64_t timeout_ts, bool sqc_alive) : sqc_(sqc), err_(err), need_interrupt_(false), timeout_ts_(timeout_ts), sqc_alive_(sqc_alive) { need_interrupt_ = true; } ~ObFastInitSqcReportQCMessageCall() = default; void operator() (hash::HashMapPair &entry); int mock_sqc_finish_msg(); public: ObPxSqcMeta *sqc_; int err_; bool need_interrupt_; int64_t timeout_ts_; bool sqc_alive_; }; class ObDealWithRpcTimeoutCall { public: ObDealWithRpcTimeoutCall(common::ObAddr addr, ObQueryRetryInfo *retry_info, int64_t timeout_ts, common::ObCurTraceId::TraceId &trace_id, bool retry) : addr_(addr), retry_info_(retry_info), timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT), can_retry_(retry) {} ~ObDealWithRpcTimeoutCall() = default; void operator() (hash::HashMapPair &entry); void deal_with_rpc_timeout_err(); public: common::ObAddr addr_; ObQueryRetryInfo *retry_info_; int64_t timeout_ts_; common::ObCurTraceId::TraceId trace_id_; int ret_; bool can_retry_; }; class ObFastInitSqcCB : public obrpc::ObPxRpcProxy::AsyncCB { public: ObFastInitSqcCB(const common::ObAddr &server, const common::ObCurTraceId::TraceId &trace_id, ObQueryRetryInfo *retry_info, int64_t timeout_ts, ObInterruptibleTaskID tid, ObPxSqcMeta *sqc) : addr_(server), retry_info_(retry_info), timeout_ts_(timeout_ts), interrupt_id_(tid), sqc_(sqc) { trace_id_.set(trace_id); } virtual ~ObFastInitSqcCB() {} public: virtual int process(); virtual void on_invalid() {} virtual void on_timeout(); rpc::frame::ObReqTransport::AsyncCB *clone( const rpc::frame::SPAlloc &alloc) const { void *buf = alloc(sizeof(*this)); rpc::frame::ObReqTransport::AsyncCB *newcb = NULL; if (NULL != buf) { newcb = new (buf) ObFastInitSqcCB(addr_, trace_id_, retry_info_, timeout_ts_, interrupt_id_, sqc_); } return newcb; } virtual void set_args(const Request &arg) { UNUSED(arg); } int deal_with_rpc_timeout_err_safely(); void interrupt_qc(int err, bool is_timeout); private: common::ObAddr addr_; ObQueryRetryInfo *retry_info_; int64_t timeout_ts_; ObInterruptibleTaskID interrupt_id_; ObPxSqcMeta *sqc_; common::ObCurTraceId::TraceId trace_id_; DISALLOW_COPY_AND_ASSIGN(ObFastInitSqcCB); }; class ObPxTenantTargetMonitorP : public obrpc::ObRpcProcessor > { public: ObPxTenantTargetMonitorP(const observer::ObGlobalContext &global_ctx) : global_ctx_(global_ctx) { (void)global_ctx_; } virtual ~ObPxTenantTargetMonitorP() = default; virtual int init() final; virtual void destroy() final; virtual int process() final; private: const observer::ObGlobalContext &global_ctx_; }; } // sql } // oceanbase #endif /* OB_PX_RPC_PROCESSOR_H */