提交 895f700d 编写于 作者: X xj0 提交者: wangzelin.wzl

add tableapi to opensource release

上级 7c05e325
......@@ -578,7 +578,6 @@ PCODE_DEF(OB_TABLE_API_LOGIN, 0x1101)
PCODE_DEF(OB_TABLE_API_EXECUTE, 0x1102)
PCODE_DEF(OB_TABLE_API_BATCH_EXECUTE, 0x1103)
PCODE_DEF(OB_TABLE_API_EXECUTE_QUERY, 0x1104)
PCODE_DEF(OB_TABLE_API_QUERY_AND_MUTATE, 0x1105)
// Event Job API
PCODE_DEF(OB_RUN_EVENT_JOB, 0x1201)
......
......@@ -240,6 +240,16 @@ ob_set_subtarget(ob_server vt
virtual_table/ob_all_virtual_backupset_history_mgr.cpp
)
ob_set_subtarget(ob_server table
table/ob_table_batch_execute_processor.cpp
table/ob_table_end_trans_cb.cpp
table/ob_table_execute_processor.cpp
table/ob_table_query_processor.cpp
table/ob_table_rpc_processor.cpp
table/ob_table_service.cpp
table/ob_table_api_row_iterator.cpp
)
ob_server_add_pchs(observer
ob_server_struct.h
ob_uniq_task_queue.h
......
......@@ -45,6 +45,7 @@
#include "sql/ob_sql_init.h"
#include "sql/ob_sql_task.h"
#include "observer/ob_server.h"
#include "observer/table/ob_table_rpc_processor.h"
#include "sql/ob_sql_init.h"
#include "sql/dtl/ob_dtl.h"
#include "sql/ob_sql_init.h"
......@@ -129,6 +130,7 @@ ObServer::ObServer()
vt_data_service_(root_service_, self_addr_, &config_),
cache_size_calculator_(),
weak_read_service_(),
table_service_(),
cgroup_ctrl_(),
start_time_(ObTimeUtility::current_time()),
zone_merged_version_(OB_MERGED_VERSION_INIT),
......@@ -198,8 +200,10 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg)
}
if (OB_SUCC(ret)) {
if (OB_FAIL(init_loaddata_global_stat())) {
LOG_WARN("fail to init global load data stat map", K(ret));
if (OB_FAIL(ObTableApiProcessorBase::init_session())) {
LOG_WARN("failed to init static session", K(ret));
} else if (OB_FAIL(init_loaddata_global_stat())) {
LOG_WARN("fail to init global load data stat map", K(ret));
}
}
}
......@@ -326,6 +330,8 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg)
LOG_WARN("fail to init long ops monitor instance", K(ret));
} else if (OB_FAIL(ObCompatModeGetter::instance().init(&sql_proxy_))) {
LOG_WARN("fail to init get compat mode server");
} else if (OB_FAIL(table_service_.init(gctx_))) {
LOG_WARN("failed to init table service", K(ret));
} else if (OB_FAIL(ObTimerMonitor::get_instance().init())) {
LOG_WARN("failed to init timer monitor", K(ret));
} else if (OB_FAIL(ObBGThreadMonitor::get_instance().init())) {
......@@ -1468,6 +1474,7 @@ int ObServer::init_global_context()
(void)gctx_.set_split_schema_version(OB_INVALID_VERSION);
(void)gctx_.set_split_schema_version_v2(OB_INVALID_VERSION);
gctx_.weak_read_service_ = &weak_read_service_;
gctx_.table_service_ = &table_service_;
gctx_.cgroup_ctrl_ = &cgroup_ctrl_;
gctx_.schema_status_proxy_ = &schema_status_proxy_;
(void)gctx_.set_upgrade_stage(obrpc::OB_UPGRADE_STAGE_INVALID);
......
......@@ -19,6 +19,7 @@
#include "share/stat/ob_user_tab_col_statistics.h"
#include "share/stat/ob_opt_stat_service.h"
#include "observer/table/ob_table_service.h"
#include "sql/ob_sql.h"
#include "sql/engine/cmd/ob_load_data_rpc.h"
#include "sql/ob_query_exec_ctx_mgr.h"
......@@ -388,6 +389,9 @@ private:
// Weakly Consistent Read Service
transaction::ObWeakReadService weak_read_service_;
// table service
ObTableService table_service_;
// Tenant isolation resource management
omt::ObCgroupCtrl cgroup_ctrl_;
......
......@@ -266,6 +266,7 @@ ObGlobalContext& ObGlobalContext::operator=(const ObGlobalContext& other)
sort_dir_ = other.sort_dir_;
diag_ = other.diag_;
scramble_rand_ = other.scramble_rand_;
table_service_ = other.table_service_;
cgroup_ctrl_ = other.cgroup_ctrl_;
inited_ = other.inited_;
split_schema_version_ = other.split_schema_version_;
......
......@@ -84,6 +84,7 @@ class ObCgroupCtrl;
namespace observer {
class ObService;
class ObVTIterCreator;
class ObTableService;
class ObServerOptions {
public:
......@@ -197,6 +198,7 @@ struct ObGlobalContext {
common::ObString* sort_dir_;
obmysql::ObDiag* diag_;
common::ObMysqlRandom* scramble_rand_;
ObTableService* table_service_;
omt::ObCgroupCtrl* cgroup_ctrl_;
bool inited_;
int64_t split_schema_version_;
......
......@@ -37,6 +37,11 @@
#include "observer/ob_rpc_processor_simple.h"
#include "observer/ob_srv_task.h"
#include "observer/table/ob_table_rpc_processor.h"
#include "observer/table/ob_table_execute_processor.h"
#include "observer/table/ob_table_batch_execute_processor.h"
#include "observer/table/ob_table_query_processor.h"
using namespace oceanbase;
using namespace oceanbase::observer;
using namespace oceanbase::lib;
......@@ -125,6 +130,12 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator* xlator)
// SQL Estimate
RPC_PROCESSOR(ObEstimatePartitionRowsP, gctx_);
// table api
RPC_PROCESSOR(ObTableLoginP, gctx_);
RPC_PROCESSOR(ObTableApiExecuteP, gctx_);
RPC_PROCESSOR(ObTableBatchExecuteP, gctx_);
RPC_PROCESSOR(ObTableQueryP, gctx_);
// HA GTS
RPC_PROCESSOR(ObHaGtsPingRequestP, gctx_);
RPC_PROCESSOR(ObHaGtsGetRequestP, gctx_);
......
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_RPC_ASYNC_RESPONSE_H
#define _OB_RPC_ASYNC_RESPONSE_H 1
#include "rpc/ob_request.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "rpc/frame/ob_req_processor.h"
#include "rpc/obmysql/ob_mysql_request_utils.h"
#include "rpc/obrpc/ob_rpc_result_code.h"
#include "lib/oblog/ob_warning_buffer.h"
#include "ob_table_rpc_processor_util.h"
namespace oceanbase
{
namespace obrpc
{
// this class is copied from ObRpcProcessor
template <class T>
class ObRpcAsyncResponse
{
public:
ObRpcAsyncResponse(rpc::ObRequest *req, T &result)
:req_(req),
result_(result),
using_buffer_(NULL)
{}
virtual ~ObRpcAsyncResponse() = default;
int response(const int retcode);
private:
int serialize();
int do_response(ObRpcPacket *response_pkt, bool bad_routing);
char *easy_alloc(int64_t size) const;
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObRpcAsyncResponse);
private:
rpc::ObRequest *req_;
T &result_;
common::ObDataBuffer *using_buffer_;
};
template <class T>
char *ObRpcAsyncResponse<T>::easy_alloc(int64_t size) const
{
void *buf = NULL;
if (OB_ISNULL(req_)) {
RPC_OBRPC_LOG(ERROR, "request is invalid", KP(req_));
} else if (OB_ISNULL(req_->get_request())
|| OB_ISNULL(req_->get_request()->ms)
|| OB_ISNULL(req_->get_request()->ms->pool)) {
RPC_OBRPC_LOG(ERROR, "request is invalid", K(req_));
} else {
buf = easy_pool_alloc(
req_->get_request()->ms->pool, static_cast<uint32_t>(size));
}
return static_cast<char*>(buf);
}
template <class T>
int ObRpcAsyncResponse<T>::serialize()
{
int ret = common::OB_SUCCESS;
if (OB_ISNULL(using_buffer_)) {
ret = common::OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(ERROR, "using_buffer_ should not be NULL", K(ret));
} else if (OB_FAIL(common::serialization::encode(
using_buffer_->get_data(), using_buffer_->get_capacity(),
using_buffer_->get_position(), result_))) {
RPC_OBRPC_LOG(WARN, "encode data error", K(ret));
} else {
//do nothing
}
return ret;
}
template <class T>
int ObRpcAsyncResponse<T>::do_response(ObRpcPacket *response_pkt, bool bad_routing)
{
int ret = common::OB_SUCCESS;
if (OB_ISNULL(req_)) {
ret = common::OB_ERR_NULL_VALUE;
RPC_OBRPC_LOG(WARN, "req is NULL", K(ret));
} else if (OB_ISNULL(req_->get_request())) {
ret = common::OB_ERR_NULL_VALUE;
RPC_OBRPC_LOG(WARN, "req is NULL", K(ret));
} else {
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(req_->get_packet());
// TODO: fufeng, make force_destroy_second as a configure item
// static const int64_t RESPONSE_RESERVED_US = 20 * 1000 * 1000;
// int64_t rts = static_cast<int64_t>(req_->get_request()->start_time) * 1000 * 1000;
// todo(fufeng): get 'force destroy second' from eio?
// if (rts > 0 && eio_->force_destroy_second > 0
// && ::oceanbase::common::ObTimeUtility::current_time() - rts + RESPONSE_RESERVED_US > eio_->force_destroy_second * 1000000) {
// _OB_LOG(ERROR, "pkt process too long time: pkt_receive_ts=%ld, pkt_code=%d", rts, pcode);
// }
//copy packet into req buffer
ObRpcPacketCode pcode = rpc_pkt->get_pcode();
if (OB_SUCC(ret)) {
ObRpcPacket *packet = response_pkt;
packet->set_pcode(pcode);
packet->set_chid(rpc_pkt->get_chid());
packet->set_session_id(0); // not stream
packet->set_trace_id(common::ObCurTraceId::get());
packet->set_resp();
packet->set_request_arrival_time(req_->get_request_arrival_time());
packet->set_arrival_push_diff(req_->get_arrival_push_diff());
packet->set_push_pop_diff(req_->get_push_pop_diff());
packet->set_pop_process_start_diff(req_->get_pop_process_start_diff());
packet->set_process_start_end_diff(req_->get_process_start_end_diff());
packet->set_process_end_response_diff(req_->get_process_end_response_diff());
if (bad_routing) {
packet->set_bad_routing();
}
packet->calc_checksum();
req_->get_request()->opacket = packet;
}
//just set request retcode, wakeup in ObSingleServer::handlePacketQueue()
req_->set_request_rtcode(EASY_OK);
obmysql::ObMySQLRequestUtils::wakeup_request(req_);
}
return ret;
}
template <class T>
int ObRpcAsyncResponse<T>::response(const int retcode)
{
int ret = common::OB_SUCCESS;
if (OB_ISNULL(req_)) {
ret = common::OB_INVALID_ARGUMENT;
RPC_OBRPC_LOG(WARN, "invalid req, maybe stream rpc timeout", K(ret), K(retcode),
KP_(req));
} else {
obrpc::ObRpcResultCode rcode;
rcode.rcode_ = retcode;
// add warning buffer into result code buffer if rpc fails.
common::ObWarningBuffer *wb = common::ob_get_tsi_warning_buffer();
if (wb) {
if (retcode != common::OB_SUCCESS) {
(void)snprintf(rcode.msg_, common::OB_MAX_ERROR_MSG_LEN, "%s", wb->get_err_msg());
}
//always add warning buffer
bool not_null = true;
for (uint32_t idx = 0; OB_SUCC(ret) && not_null && idx < wb->get_readable_warning_count(); idx++) {
const common::ObWarningBuffer::WarningItem *item = wb->get_warning_item(idx);
if (item != NULL) {
if (OB_FAIL(rcode.warnings_.push_back(*item))) {
RPC_OBRPC_LOG(WARN, "Failed to add warning", K(ret));
}
} else {
not_null = false;
}
}
}
int64_t content_size = common::serialization::encoded_length(result_) +
common::serialization::encoded_length(rcode);
char *buf = NULL;
if (OB_FAIL(ret)) {
//do nothing
} else if (content_size > common::OB_MAX_PACKET_LENGTH) {
ret = common::OB_RPC_PACKET_TOO_LONG;
RPC_OBRPC_LOG(WARN, "response content size bigger than OB_MAX_PACKET_LENGTH", K(ret));
} else {
//allocate memory from easy
//[ ObRpcPacket ... ObDatabuffer ... serilized content ...]
int64_t size = (content_size) + sizeof (common::ObDataBuffer) + sizeof(ObRpcPacket);
buf = static_cast<char*>(easy_alloc(size));
if (NULL == buf) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_OBRPC_LOG(WARN, "allocate rpc data buffer fail", K(ret), K(size));
} else {
using_buffer_ = new (buf + sizeof(ObRpcPacket)) common::ObDataBuffer();
if (!(using_buffer_->set_data(buf + sizeof(ObRpcPacket) + sizeof (*using_buffer_),
content_size))) {
ret = common::OB_INVALID_ARGUMENT;
RPC_OBRPC_LOG(WARN, "invalid parameters", K(ret));
}
}
}
if (OB_FAIL(ret)) {
//do nothing
} else if (OB_ISNULL(using_buffer_)) {
ret = common::OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(ERROR, "using_buffer_ is NULL", K(ret));
} else if (OB_FAIL(rcode.serialize(using_buffer_->get_data(),
using_buffer_->get_capacity(),
using_buffer_->get_position()))) {
RPC_OBRPC_LOG(WARN, "serialize result code fail", K(ret));
} else {
// also send result if process successfully.
if (common::OB_SUCCESS == retcode) {
if (OB_FAIL(serialize())) {
RPC_OBRPC_LOG(WARN, "serialize result fail", K(ret));
}
}
}
// routing check : whether client should refresh location cache and retry
// Now, following the same logic as in ../mysql/ob_query_retry_ctrl.cpp
bool bad_routing = false;
if (OB_SUCC(ret)) {
if (common::OB_SUCCESS != retcode && observer::is_bad_routing_err(retcode)) {
bad_routing = true;
RPC_OBRPC_LOG(WARN, "bad routing", K(retcode), K(bad_routing));
}
}
if (OB_SUCC(ret)) {
ObRpcPacket *pkt = new (buf) ObRpcPacket();
//Response rsp(sessid, is_stream_, is_last, pkt);
pkt->set_content(using_buffer_->get_data(), using_buffer_->get_position());
if (OB_FAIL(do_response(pkt, bad_routing))) {
RPC_OBRPC_LOG(WARN, "response data fail", K(ret));
}
}
using_buffer_ = NULL;
}
return ret;
}
} // end namespace obrpc
} // end namespace oceanbase
#endif /* _OB_RPC_ASYNC_RESPONSE_H */
此差异已折叠。
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_TABLE_API_ROW_ITERATOR_H_
#define OB_TABLE_API_ROW_ITERATOR_H_
#include "ob_table_service.h"
#include "common/row/ob_row_iterator.h"
namespace oceanbase {
namespace observer {
class ObTableApiRowIterator : public common::ObNewRowIterator {
public:
ObTableApiRowIterator();
virtual ~ObTableApiRowIterator();
int init(
storage::ObPartitionService &partition_service,
share::schema::ObMultiVersionSchemaService &schema_service,
ObTableServiceCtx &ctx);
virtual void reset();
OB_INLINE common::ObIArray<uint64_t> &get_column_ids() { return column_ids_; }
OB_INLINE common::ObIArray<common::ObString> &get_properties() { return properties_; }
OB_INLINE int64_t get_schema_version() { return schema_version_; }
OB_INLINE int64_t get_rowkey_column_cnt() { return rowkey_column_cnt_; }
protected:
int check_row(common::ObNewRow &row);
int entity_to_row(const table::ObITableEntity &entity, common::ObIArray<ObObj> &row);
int cons_all_columns(const table::ObITableEntity &entity, const bool ignore_missing_column = false);
int cons_missing_columns(const table::ObITableEntity &entity);
int fill_get_param(
ObTableServiceCtx &ctx,
const table::ObTableOperationType::Type op_type,
ObRowkey &rowkey,
storage::ObTableScanParam &scan_param,
share::schema::ObTableParam &table_param);
int fill_multi_get_param(
ObTableServiceCtx &ctx,
const ObTableBatchOperation &batch_operation,
storage::ObTableScanParam &scan_param,
share::schema::ObTableParam &table_param);
int fill_generate_columns(common::ObNewRow &row);
virtual bool is_read() const { return false; }
private:
int check_table_supported(const share::schema::ObTableSchema *table_schema);
int check_column_type(const sql::ObExprResType &column_type, common::ObObj &obj);
int fill_range(const ObRowkey &rowkey, ObIArray<common::ObNewRange> &ranges);
int fill_flag(ObTableServiceCtx &ctx, storage::ObTableScanParam &scan_param);
int add_column_type(const share::schema::ObColumnSchemaV2 &column_schema);
int cons_column_type(const share::schema::ObColumnSchemaV2 &column_schema, sql::ObExprResType &column_type);
protected:
static const int64_t COMMON_COLUMN_NUM = 16;
storage::ObPartitionService *part_service_;
share::schema::ObMultiVersionSchemaService *schema_service_;
ObTableServiceCtx *ctx_;
share::schema::ObSchemaGetterGuard schema_guard_;
const share::schema::ObTableSchema *table_schema_;
int64_t table_id_;
int64_t tenant_id_;
int64_t schema_version_;
int64_t rowkey_column_cnt_;
common::ObSEArray<common::ObString, COMMON_COLUMN_NUM> properties_;
common::ObSEArray<uint64_t, COMMON_COLUMN_NUM> column_ids_;
common::ObSEArray<sql::ObExprResType, COMMON_COLUMN_NUM> columns_type_;
common::ObSEArray<share::schema::ObColDesc, COMMON_COLUMN_NUM> column_descs_;
common::ObSEArray<common::ObObj, COMMON_COLUMN_NUM> row_objs_;
common::ObSEArray<common::ObObj, COMMON_COLUMN_NUM> missing_default_objs_;
common::ObSEArray<common::ObISqlExpression*, COMMON_COLUMN_NUM> generate_column_exprs_;
common::ObSEArray<int64_t, COMMON_COLUMN_NUM> generate_column_idxs_;
common::ObExprCtx expr_ctx_;
common::ObNewRow row_;
common::ObArenaAllocator stmt_allocator_;
common::ObArenaAllocator row_allocator_;
const table::ObITableEntity *entity_;
bool has_generate_column_;
bool is_inited_;
};
class ObTableApiInsertRowIterator : public ObTableApiRowIterator
{
public:
ObTableApiInsertRowIterator();
virtual ~ObTableApiInsertRowIterator();
int open(const ObTableOperation &table_operation);
virtual int get_next_row(common::ObNewRow *&row);
protected:
int cons_row(const table::ObITableEntity &entity, common::ObNewRow *&row);
virtual bool is_read() const override { return false; }
};
class ObTableApiMultiInsertRowIterator : public ObTableApiInsertRowIterator
{
public:
ObTableApiMultiInsertRowIterator();
virtual ~ObTableApiMultiInsertRowIterator();
virtual void reset();
int open(const ObTableBatchOperation &table_operation);
virtual int get_next_row(common::ObNewRow *&row);
OB_INLINE void continue_iter() { is_iter_pause_ = false; }
private:
const ObTableBatchOperation *batch_operation_;
int64_t row_idx_;
int64_t batch_cnt_;
bool is_iter_pause_;
};
class ObTableApiUpdateRowIterator : public ObTableApiRowIterator
{
public:
ObTableApiUpdateRowIterator();
virtual ~ObTableApiUpdateRowIterator();
virtual void reset();
int open(const ObTableOperation &table_operation,
const ObRowkey &rowkey, bool need_update_rowkey = false);
virtual int get_next_row(common::ObNewRow *&row);
OB_INLINE common::ObIArray<uint64_t> &get_update_column_ids() { return update_column_ids_; }
OB_INLINE common::ObNewRow *get_cur_new_row() { return new_row_; }
protected:
int cons_update_columns(bool need_update_rowkey);
int cons_new_row(const ObTableOperation &table_operation, common::ObNewRow *&row);
virtual bool is_read() const override { return false; }
private:
int obj_increment(
const common::ObObj &delta,
const common::ObObj &src,
const sql::ObExprResType target_type,
common::ObObj &target);
int obj_append(
const common::ObObj &delta,
const common::ObObj &src,
const sql::ObExprResType target_type,
common::ObObj &target);
int int_add_int_with_check(
int64_t old_int,
int64_t delta_int,
common::ObObjType result_type,
common::ObObj &result);
int uint_add_int_with_check(
uint64_t old_uint,
int64_t delta_int,
common::ObObjType result_type,
common::ObObj &result);
protected:
storage::ObTableScanParam scan_param_;
share::schema::ObTableParam table_param_;
common::ObSEArray<uint64_t, COMMON_COLUMN_NUM> update_column_ids_;
common::ObNewRowIterator *scan_iter_;
common::ObNewRow *old_row_;
common::ObNewRow *new_row_;
int64_t row_idx_;
bool need_update_rowkey_;
private:
const ObTableOperation *table_operation_;
};
class ObTableApiMultiUpdateRowIterator : public ObTableApiUpdateRowIterator
{
public:
ObTableApiMultiUpdateRowIterator();
virtual ~ObTableApiMultiUpdateRowIterator();
virtual void reset();
int open(const ObTableBatchOperation &batch_operation);
virtual int get_next_row(common::ObNewRow *&row);
OB_INLINE void continue_iter() { is_iter_pause_ = false; }
OB_INLINE int64_t get_cur_update_idx() { return cur_update_idx_; }
OB_INLINE bool has_finished() { return batch_idx_ >= batch_cnt_; }
private:
const ObTableBatchOperation *batch_operation_;
int64_t batch_cnt_;
int64_t batch_idx_;
int64_t cur_update_idx_;
bool is_iter_pause_;
};
class ObTableApiDeleteRowIterator : public ObTableApiRowIterator
{
public:
ObTableApiDeleteRowIterator();
virtual ~ObTableApiDeleteRowIterator();
virtual void reset();
int open(const ObTableOperation &table_operation);
virtual int get_next_row(common::ObNewRow *&row);
OB_INLINE common::ObIArray<uint64_t> &get_delete_column_ids() { return column_ids_; }
protected:
virtual bool is_read() const override { return false; }
protected:
storage::ObTableScanParam scan_param_;
share::schema::ObTableParam table_param_;
common::ObNewRowIterator *scan_iter_;
};
class ObTableApiMultiDeleteRowIterator : public ObTableApiDeleteRowIterator
{
public:
ObTableApiMultiDeleteRowIterator();
virtual ~ObTableApiMultiDeleteRowIterator();
virtual void reset();
int open(const ObTableBatchOperation &table_operation);
virtual int get_next_row(common::ObNewRow *&row);
OB_INLINE void continue_iter() { is_iter_pause_ = false; }
OB_INLINE int64_t get_cur_delete_idx() { return cur_delete_idx_; }
OB_INLINE bool has_finished() { return batch_idx_ >= batch_cnt_; }
private:
const ObTableBatchOperation *batch_operation_;
int64_t batch_cnt_;
int64_t batch_idx_;
int64_t cur_delete_idx_;
bool is_iter_pause_;
};
class ObTableApiGetRowIterator : public ObTableApiRowIterator
{
public:
ObTableApiGetRowIterator();
virtual ~ObTableApiGetRowIterator();
virtual void reset();
int open(const ObTableOperation &table_operation);
virtual int get_next_row(common::ObNewRow *&row);
protected:
virtual bool is_read() const override { return true; }
protected:
storage::ObTableScanParam scan_param_;
share::schema::ObTableParam table_param_;
common::ObNewRowIterator *scan_iter_;
};
class ObTableApiMultiGetRowIterator : public ObTableApiGetRowIterator
{
public:
ObTableApiMultiGetRowIterator();
virtual ~ObTableApiMultiGetRowIterator();
int open(const ObTableBatchOperation &table_operation);
};
}
}
#endif /* OB_TABLE_API_ROW_ITERATOR_H_ */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "ob_table_batch_execute_processor.h"
#include "ob_table_rpc_processor_util.h"
#include "observer/ob_service.h"
#include "storage/ob_partition_service.h"
#include "ob_table_end_trans_cb.h"
#include "sql/optimizer/ob_table_location.h" // ObTableLocation
#include "lib/stat/ob_diagnose_info.h"
#include "lib/stat/ob_session_stat.h"
using namespace oceanbase::observer;
using namespace oceanbase::common;
using namespace oceanbase::table;
using namespace oceanbase::share;
using namespace oceanbase::sql;
ObTableBatchExecuteP::ObTableBatchExecuteP(const ObGlobalContext &gctx)
:ObTableRpcProcessor(gctx),
allocator_(ObModIds::TABLE_PROC),
table_service_ctx_(allocator_),
need_rollback_trans_(false)
{
}
int ObTableBatchExecuteP::deserialize()
{
// we should set entity factory before deserialize
arg_.batch_operation_.set_entity_factory(&default_entity_factory_);
result_.set_entity_factory(&default_entity_factory_);
int ret = ParentType::deserialize();
return ret;
}
int ObTableBatchExecuteP::check_arg()
{
int ret = OB_SUCCESS;
if (arg_.consistency_level_ != ObTableConsistencyLevel::STRONG) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"consistency_level", arg_.consistency_level_);
}
return ret;
}
int ObTableBatchExecuteP::check_arg2() const
{
int ret = OB_SUCCESS;
if (arg_.returning_rowkey_
|| arg_.returning_affected_entity_) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"returning_rowkey", arg_.returning_rowkey_,
"returning_affected_entity", arg_.returning_affected_entity_);
}
return ret;
}
OB_INLINE bool is_errno_need_retry(int ret)
{
return OB_TRY_LOCK_ROW_CONFLICT == ret
|| OB_TRANSACTION_SET_VIOLATION == ret
|| OB_SCHEMA_ERROR == ret;
}
void ObTableBatchExecuteP::audit_on_finish()
{
audit_record_.consistency_level_ = ObTableConsistencyLevel::STRONG == arg_.consistency_level_ ?
ObConsistencyLevel::STRONG : ObConsistencyLevel::WEAK;
audit_record_.return_rows_ = arg_.returning_affected_rows_ ? result_.count() : 0;
audit_record_.table_scan_ = false;
audit_record_.affected_rows_ = result_.count();
audit_record_.try_cnt_ = retry_count_ + 1;
}
uint64_t ObTableBatchExecuteP::get_request_checksum()
{
uint64_t checksum = 0;
checksum = ob_crc64(checksum, arg_.table_name_.ptr(), arg_.table_name_.length());
const uint64_t op_checksum = arg_.batch_operation_.get_checksum();
checksum = ob_crc64(checksum, &op_checksum, sizeof(op_checksum));
checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_));
checksum = ob_crc64(checksum, &arg_.returning_rowkey_, sizeof(arg_.returning_rowkey_));
checksum = ob_crc64(checksum, &arg_.returning_affected_entity_, sizeof(arg_.returning_affected_entity_));
checksum = ob_crc64(checksum, &arg_.returning_affected_rows_, sizeof(arg_.returning_affected_rows_));
checksum = ob_crc64(checksum, &arg_.binlog_row_image_type_, sizeof(arg_.binlog_row_image_type_));
return checksum;
}
int ObTableBatchExecuteP::response(const int retcode)
{
int ret = OB_SUCCESS;
if (!need_retry_in_queue_ && !did_async_end_trans()) {
ret = ObRpcProcessor::response(retcode);
}
return ret;
}
void ObTableBatchExecuteP::reset_ctx()
{
table_service_ctx_.reset_dml();
need_retry_in_queue_ = false;
need_rollback_trans_ = false;
result_.reset();
ObTableApiProcessorBase::reset_ctx();
}
int ObTableBatchExecuteP::try_process()
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
if (batch_operation.count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("no operation in the batch", K(ret));
} else {
if (batch_operation.is_readonly()) {
if (batch_operation.is_same_properties_names()) {
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_GET;
ret = multi_get();
} else {
stat_event_type_ = ObTableProccessType::TABLE_API_BATCH_RETRIVE;
ret = batch_execute(true);
}
} else if (batch_operation.is_same_type()) {
switch(batch_operation.at(0).type()) {
case ObTableOperationType::INSERT:
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_INSERT;
ret = multi_insert();
break;
case ObTableOperationType::DEL:
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_DELETE;
ret = multi_delete();
break;
case ObTableOperationType::UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_UPDATE;
ret = multi_update();
break;
case ObTableOperationType::INSERT_OR_UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_INSERT_OR_UPDATE;
ret = multi_insert_or_update();
break;
case ObTableOperationType::REPLACE:
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_REPLACE;
ret = multi_replace();
break;
case ObTableOperationType::APPEND:
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_APPEND;
ret = batch_execute(false);
break;
case ObTableOperationType::INCREMENT:
stat_event_type_ = ObTableProccessType::TABLE_API_MULTI_INCREMENT;
ret = batch_execute(false);
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected operation type", "type", batch_operation.at(0).type(), K(stat_event_type_));
break;
}
} else {
// complex batch hybrid operation
stat_event_type_ = ObTableProccessType::TABLE_API_BATCH_HYBRID;
ret = batch_execute(false);
}
}
// record events
audit_row_count_ = arg_.batch_operation_.count();
#ifndef NDEBUG
// debug mode
LOG_INFO("[TABLE] execute batch operation", K(ret), K_(arg), K_(result), "timeout", rpc_pkt_->get_timeout(), K_(retry_count));
#else
// release mode
LOG_TRACE("[TABLE] execute batch operation", K(ret), K_(arg), K_(result), "timeout", rpc_pkt_->get_timeout(), K_(retry_count),
"receive_ts", get_receive_timestamp());
#endif
return ret;
}
ObTableAPITransCb *ObTableBatchExecuteP::new_callback(rpc::ObRequest *req)
{
ObTableBatchExecuteEndTransCb *cb = OB_NEW(ObTableBatchExecuteEndTransCb, ObModIds::TABLE_PROC, req, arg_.batch_operation_.at(0).type());
if (NULL != cb) {
// @todo optimize to avoid this copy
int ret = OB_SUCCESS;
if (OB_FAIL(cb->assign_batch_execute_result(result_))) {
LOG_WARN("failed to assign result", K(ret));
cb->~ObTableBatchExecuteEndTransCb();
cb = NULL;
} else {
LOG_DEBUG("[yzfdebug] copy result", K_(result));
}
}
return cb;
}
int ObTableBatchExecuteP::get_rowkeys(ObIArray<ObRowkey> &rowkeys)
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
const int64_t N = batch_operation.count();
for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i)
{
const ObTableOperation &table_op = batch_operation.at(i);
ObRowkey rowkey = const_cast<ObITableEntity&>(table_op.entity()).get_rowkey();
if (OB_FAIL(rowkeys.push_back(rowkey))) {
LOG_WARN("failed to push back", K(ret));
}
} // end for
return ret;
}
int ObTableBatchExecuteP::get_partition_ids(uint64_t table_id, ObIArray<int64_t> &part_ids)
{
int ret = OB_SUCCESS;
uint64_t partition_id = arg_.partition_id_;
if (OB_INVALID_ID == partition_id) {
ObSEArray<sql::RowkeyArray, 3> rowkeys_per_part;
ObSEArray<ObRowkey, 3> rowkeys;
if (OB_FAIL(get_rowkeys(rowkeys))) {
LOG_WARN("failed to get rowkeys", K(ret));
} else if (OB_FAIL(get_partition_by_rowkey(table_id, rowkeys, part_ids, rowkeys_per_part))) {
LOG_WARN("failed to get partition", K(ret), K(rowkeys));
}
} else {
if (OB_FAIL(part_ids.push_back(partition_id))) {
LOG_WARN("failed to push back", K(ret));
}
}
return ret;
}
int ObTableBatchExecuteP::multi_insert_or_update()
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
const bool is_readonly = false;
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_INSERT, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->multi_insert_or_update(table_service_ctx_, batch_operation, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to insert_or_update", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans");
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableBatchExecuteP::multi_get()
{
int ret = OB_SUCCESS;
need_rollback_trans_ = false;
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
ObSEArray<int64_t, 1> part_ids;
const bool is_readonly = true;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_SELECT, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start readonly transaction", K(ret));
} else if (OB_FAIL(table_service_->multi_get(table_service_ctx_, arg_.batch_operation_, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to execute get", K(ret), K(table_id));
}
} else {}
need_rollback_trans_ = (OB_SUCCESS != ret);
int tmp_ret = ret;
if (OB_FAIL(end_trans(need_rollback_trans_, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans", K(ret), "rollback", need_rollback_trans_);
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableBatchExecuteP::multi_delete()
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
const bool is_readonly = false;
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_DELETE, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->multi_delete(table_service_ctx_, batch_operation, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to multi_delete", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans");
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableBatchExecuteP::multi_insert()
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
const bool is_readonly = false;
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_INSERT, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->multi_insert(table_service_ctx_, batch_operation, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to multi_insert", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans");
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableBatchExecuteP::multi_replace()
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
const bool is_readonly = false;
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_REPLACE, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->multi_replace(table_service_ctx_, batch_operation, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to multi_replace", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans");
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableBatchExecuteP::multi_update()
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
const bool is_readonly = false;
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_/*important*/);
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_UPDATE, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->multi_update(table_service_ctx_, batch_operation, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to multi_update", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans");
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableBatchExecuteP::batch_execute(bool is_readonly)
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_,
arg_.returning_affected_entity_,
arg_.returning_rowkey_);
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, (is_readonly ? sql::stmt::T_SELECT : sql::stmt::T_UPDATE),
table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->batch_execute(table_service_ctx_, batch_operation, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to execute batch", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans");
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_TABLE_BATCH_EXECUTE_PROCESSOR_H
#define _OB_TABLE_BATCH_EXECUTE_PROCESSOR_H 1
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "rpc/obrpc/ob_rpc_processor.h"
#include "share/table/ob_table_rpc_proxy.h"
#include "ob_table_rpc_processor.h"
#include "ob_table_service.h"
namespace oceanbase
{
namespace observer
{
/// @see RPC_S(PR5 batch_execute, obrpc::OB_TABLE_API_BATCH_EXECUTE, (table::ObTableBatchOperationRequest), table::ObTableBatchOperationResult);
class ObTableBatchExecuteP: public ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_BATCH_EXECUTE> >
{
typedef ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_BATCH_EXECUTE> > ParentType;
public:
explicit ObTableBatchExecuteP(const ObGlobalContext &gctx);
virtual ~ObTableBatchExecuteP() = default;
virtual int deserialize() override;
virtual int response(const int retcode) override;
protected:
virtual int check_arg() override;
virtual int try_process() override;
virtual void reset_ctx() override;
table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) override;
virtual void audit_on_finish() override;
virtual uint64_t get_request_checksum() override;
private:
int check_arg2() const;
int get_rowkeys(common::ObIArray<common::ObRowkey> &rowkeys);
int get_partition_ids(uint64_t table_id, common::ObIArray<int64_t> &part_ids);
int multi_insert_or_update();
int multi_get();
int multi_delete();
int multi_insert();
int multi_replace();
int multi_update();
int batch_execute(bool is_readonly);
private:
static const int64_t COMMON_COLUMN_NUM = 16;
table::ObTableEntityFactory<table::ObTableEntity> default_entity_factory_;
table::ObTableEntity result_entity_;
common::ObArenaAllocator allocator_;
ObTableServiceGetCtx table_service_ctx_;
bool need_rollback_trans_;
};
} // end namespace observer
} // end namespace oceanbase
#endif /* _OB_TABLE_BATCH_EXECUTE_PROCESSOR_H */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "ob_table_end_trans_cb.h"
using namespace oceanbase::common;
using namespace oceanbase::table;
ObTableAPITransCb::ObTableAPITransCb()
:ref_count_(2)
{}
ObTableAPITransCb::~ObTableAPITransCb()
{
LOG_DEBUG("[yzfdebug] ObTableAPITransCb destruct", K_(ref_count));
}
void ObTableAPITransCb::destroy_cb_if_no_ref()
{
int32_t new_ref = ATOMIC_SAF(&ref_count_, 1);
if (0 >= new_ref) {
// @caution !!!
this->~ObTableAPITransCb();
ob_free(this);
}
}
////////////////////////////////////////////////////////////////
void ObTableExecuteEndTransCb::callback(int cb_param)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!has_set_need_rollback_)) {
LOG_ERROR("is_need_rollback_ has not been set",
K(has_set_need_rollback_),
K(is_need_rollback_));
} else if (OB_UNLIKELY(ObExclusiveEndTransCallback::END_TRANS_TYPE_INVALID == end_trans_type_)) {
LOG_ERROR("end trans type is invalid", K(cb_param), K(end_trans_type_));
} else if (!is_txs_end_trans_called()) {
//has NOT invoke the end trans interface
LOG_WARN("fail before trans service end trans, disconnct", K(cb_param));
if (OB_UNLIKELY(OB_SUCCESS == cb_param)) {
LOG_ERROR("callback before trans service end trans, but ret is OB_SUCCESS, it is BUG!!!",
K(cb_param), K_(end_trans_type));
}
} else {
//has invoke the end trans interface
}
this->handin();
CHECK_BALANCE("[table async callback]");
if (cb_param != OB_SUCCESS) {
// commit failed
result_.set_errno(cb_param);
result_.set_affected_rows(0);
result_entity_.reset();
}
if (OB_FAIL(response_sender_.response(cb_param))) {
LOG_WARN("failed to send response", K(ret), K(cb_param));
} else {
LOG_DEBUG("yzfdebug async send execute response", K(cb_param));
}
this->destroy_cb_if_no_ref();
}
void ObTableExecuteEndTransCb::callback(int cb_param, const transaction::ObTransID &trans_id)
{
UNUSED(trans_id);
this->callback(cb_param);
}
// when the operation is append/increment and returning_affected_entity is true, we will return the
// new values after append/increment to the client, so we need to deep copy the entity_result here.
int ObTableExecuteEndTransCb::assign_execute_result(ObTableOperationResult &result)
{
int ret = OB_SUCCESS;
const ObITableEntity *src_entity = NULL;
if (OB_FAIL(result.get_entity(src_entity))) {
LOG_WARN("failed to get entity", K(ret));
} else if (OB_FAIL(result_entity_.deep_copy(allocator_, *src_entity))) {
LOG_WARN("failed to copy entity", K(ret));
} else {
result_ = result;
result_.set_entity(result_entity_);
}
return ret;
}
////////////////////////////////////////////////////////////////
void ObTableBatchExecuteEndTransCb::callback(int cb_param)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!has_set_need_rollback_)) {
LOG_ERROR("is_need_rollback_ has not been set",
K(has_set_need_rollback_),
K(is_need_rollback_));
} else if (OB_UNLIKELY(ObExclusiveEndTransCallback::END_TRANS_TYPE_INVALID == end_trans_type_)) {
LOG_ERROR("end trans type is invalid", K(cb_param), K(end_trans_type_));
} else if (!is_txs_end_trans_called()) {
//has NOT invoked the end trans interface
LOG_WARN("fail before trans service end trans, disconnct", K(cb_param));
if (OB_UNLIKELY(OB_SUCCESS == cb_param)) {
LOG_ERROR("callback before trans service end trans, but ret is OB_SUCCESS, it is BUG!!!",
K(cb_param), K_(end_trans_type));
}
} else {
//has invoked the end trans interface
}
this->handin();
CHECK_BALANCE("[table batch async callback]");
if (cb_param != OB_SUCCESS) {
result_.reset();
}
if (0 >= result_.count()) {
// same result for all
ObTableOperationResult single_op_result;
single_op_result.set_entity(result_entity_);
single_op_result.set_errno(cb_param);
single_op_result.set_type(table_operation_type_);
if (OB_FAIL(result_.push_back(single_op_result))) {
LOG_WARN("failed to add result", K(ret)); // @todo reset the connection
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(response_sender_.response(cb_param))) {
LOG_WARN("failed to send response", K(ret), K(cb_param));
} else {
LOG_DEBUG("yzfdebug async send batch_execute response", K(cb_param));
}
}
this->destroy_cb_if_no_ref();
}
void ObTableBatchExecuteEndTransCb::callback(int cb_param, const transaction::ObTransID &trans_id)
{
UNUSED(trans_id);
this->callback(cb_param);
}
int ObTableBatchExecuteEndTransCb::assign_batch_execute_result(ObTableBatchOperationResult &result)
{
int ret = OB_SUCCESS;
result_.reset();
ObTableOperationResult dest_result;
int64_t N = result.count();
for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i)
{
const ObTableOperationResult &src_result = result.at(i);
if (OB_FAIL(dest_result.deep_copy(allocator_, entity_factory_, src_result))) {
LOG_WARN("failed to deep copy result", K(ret));
} else if (OB_FAIL(result_.push_back(dest_result))) {
LOG_WARN("failed to push back", K(ret));
}
} // end for
return ret;
}
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_TABLE_END_TRANS_CB_H
#define _OB_TABLE_END_TRANS_CB_H 1
#include "ob_rpc_async_response.h"
#include "sql/ob_end_trans_callback.h"
#include "share/table/ob_table.h"
namespace oceanbase
{
namespace table
{
class ObTableAPITransCb: public sql::ObExclusiveEndTransCallback
{
public:
ObTableAPITransCb();
virtual ~ObTableAPITransCb();
void destroy_cb_if_no_ref();
private:
int32_t ref_count_;
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObTableAPITransCb);
};
class ObTableExecuteEndTransCb: public ObTableAPITransCb
{
public:
ObTableExecuteEndTransCb(rpc::ObRequest *req, ObTableOperationType::Type table_operation_type)
:response_sender_(req, result_)
{
result_.set_type(table_operation_type);
}
virtual ~ObTableExecuteEndTransCb() = default;
virtual void callback(int cb_param) override;
virtual void callback(int cb_param, const transaction::ObTransID &trans_id) override;
virtual const char *get_type() const override { return "ObTableEndTransCallback"; }
virtual sql::ObEndTransCallbackType get_callback_type() const override { return sql::ASYNC_CALLBACK_TYPE; }
int assign_execute_result(ObTableOperationResult &result);
private:
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObTableExecuteEndTransCb);
private:
ObTableEntity result_entity_;
common::ObArenaAllocator allocator_;
ObTableOperationResult result_;
obrpc::ObRpcAsyncResponse<ObTableOperationResult> response_sender_;
};
class ObTableBatchExecuteEndTransCb: public ObTableAPITransCb
{
public:
ObTableBatchExecuteEndTransCb(rpc::ObRequest *req, ObTableOperationType::Type table_operation_type)
:response_sender_(req, result_),
table_operation_type_(table_operation_type)
{
}
virtual ~ObTableBatchExecuteEndTransCb() = default;
virtual void callback(int cb_param) override;
virtual void callback(int cb_param, const transaction::ObTransID &trans_id) override;
virtual const char *get_type() const override { return "ObTableBatchEndTransCallback"; }
virtual sql::ObEndTransCallbackType get_callback_type() const override { return sql::ASYNC_CALLBACK_TYPE; }
int assign_batch_execute_result(ObTableBatchOperationResult &result);
private:
// disallow copy
DISALLOW_COPY_AND_ASSIGN(ObTableBatchExecuteEndTransCb);
private:
ObTableEntity result_entity_;
common::ObArenaAllocator allocator_;
table::ObTableEntityFactory<table::ObTableEntity> entity_factory_;
ObTableBatchOperationResult result_;
obrpc::ObRpcAsyncResponse<ObTableBatchOperationResult> response_sender_;
ObTableOperationType::Type table_operation_type_;
};
} // end namespace table
} // end namespace oceanbase
#endif /* _OB_TABLE_END_TRANS_CB_H */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "ob_table_execute_processor.h"
#include "ob_table_rpc_processor_util.h"
#include "observer/ob_service.h"
#include "storage/ob_partition_service.h"
#include "ob_table_end_trans_cb.h"
#include "sql/optimizer/ob_table_location.h" // ObTableLocation
#include "lib/stat/ob_session_stat.h"
using namespace oceanbase::observer;
using namespace oceanbase::common;
using namespace oceanbase::table;
using namespace oceanbase::share;
using namespace oceanbase::sql;
int ObTableRpcProcessorUtil::negate_htable_timestamp(table::ObITableEntity &entity)
{
int ret = OB_SUCCESS;
// negative the value of T
ObObj T_val;
int64_t val = 0;
if (3 == entity.get_rowkey_size()) {
if (OB_FAIL(entity.get_rowkey_value(2, T_val))) {
LOG_WARN("failed to get T from entity", K(ret), K(entity));
} else if (OB_FAIL(T_val.get_int(val))) {
LOG_WARN("invalid obj type for T", K(ret), K(T_val));
} else {
T_val.set_int(-val);
if (OB_FAIL(entity.set_rowkey_value(2, T_val))) {
LOG_WARN("failed to negate T value", K(ret));
} else {
LOG_DEBUG("[yzfdebug] nenative T value", K(ret), K(T_val));
}
}
}
return ret;
}
////////////////////////////////////////////////////////////////
ObTableApiExecuteP::ObTableApiExecuteP(const ObGlobalContext &gctx)
:ObTableRpcProcessor(gctx),
allocator_(ObModIds::TABLE_PROC),
get_ctx_(allocator_),
need_rollback_trans_(false),
query_timeout_ts_(0)
{
}
int ObTableApiExecuteP::deserialize()
{
// we should set entity before deserialize
arg_.table_operation_.set_entity(request_entity_);
result_.set_entity(result_entity_);
int ret = ParentType::deserialize();
return ret;
}
int ObTableApiExecuteP::check_arg()
{
int ret = OB_SUCCESS;
if (arg_.consistency_level_ != ObTableConsistencyLevel::STRONG) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"consistency_level", arg_.consistency_level_,
"operation_type", arg_.table_operation_.type());
}
return ret;
}
int ObTableApiExecuteP::check_arg2() const
{
int ret = OB_SUCCESS;
if (arg_.returning_rowkey_
|| arg_.returning_affected_entity_) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"returning_rowkey", arg_.returning_rowkey_,
"returning_affected_entity", arg_.returning_affected_entity_,
"operation_type", arg_.table_operation_.type());
}
return ret;
}
int ObTableApiExecuteP::process()
{
int ret = OB_SUCCESS;
ret = ParentType::process();
int tmp_ret = revert_get_ctx();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("fail to revert get ctx", K(tmp_ret));
}
return ret;
}
int ObTableApiExecuteP::try_process()
{
int ret = OB_SUCCESS;
const ObTableOperation &table_operation = arg_.table_operation_;
switch (table_operation.type()) {
case ObTableOperationType::INSERT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT;
ret = process_insert();
break;
case ObTableOperationType::GET:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_GET;
ret = process_get();
break;
case ObTableOperationType::DEL:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_DELETE;
ret = process_del();
break;
case ObTableOperationType::UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_UPDATE;
ret = process_update();
break;
case ObTableOperationType::INSERT_OR_UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT_OR_UPDATE;
ret = process_insert_or_update();
break;
case ObTableOperationType::REPLACE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_REPLACE;
ret = process_replace();
break;
case ObTableOperationType::INCREMENT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INCREMENT;
ret = process_increment();
break;
case ObTableOperationType::APPEND:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_APPEND;
// for both increment and append
ret = process_increment();
break;
default:
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table operation type", K(ret), K(table_operation));
break;
}
audit_row_count_ = 1;
#ifndef NDEBUG
// debug mode
LOG_INFO("[TABLE] execute operation", K(ret), K_(arg), K_(result), "timeout", rpc_pkt_->get_timeout(), K_(retry_count));
#else
// release mode
LOG_TRACE("[TABLE] execute operation", K(ret), K_(arg), K_(result),
"timeout", rpc_pkt_->get_timeout(), "receive_ts", get_receive_timestamp(), K_(retry_count));
#endif
return ret;
}
int ObTableApiExecuteP::revert_get_ctx()
{
int ret = OB_SUCCESS;
if (ObTableOperationType::GET == arg_.table_operation_.type()) {
if (NULL != get_ctx_.scan_result_) {
part_service_->revert_scan_iter(get_ctx_.scan_result_);
get_ctx_.scan_result_ = NULL;
}
if (query_timeout_ts_ <= 0) {
// for robust purpose
query_timeout_ts_ = ObTimeUtility::current_time() + 1000000;
}
if (OB_FAIL(end_trans(need_rollback_trans_, req_, query_timeout_ts_))) {
LOG_WARN("failed to end trans", K(ret));
}
}
return ret;
}
void ObTableApiExecuteP::audit_on_finish()
{
audit_record_.consistency_level_ = ObTableConsistencyLevel::STRONG == arg_.consistency_level_ ?
ObConsistencyLevel::STRONG : ObConsistencyLevel::WEAK;
audit_record_.return_rows_ = arg_.returning_affected_rows_ ? 1 : 0;
audit_record_.table_scan_ = false;
audit_record_.affected_rows_ = result_.get_affected_rows();
audit_record_.try_cnt_ = retry_count_ + 1;
}
uint64_t ObTableApiExecuteP::get_request_checksum()
{
uint64_t checksum = 0;
checksum = ob_crc64(checksum, arg_.table_name_.ptr(), arg_.table_name_.length());
checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_));
checksum = ob_crc64(checksum, &arg_.returning_rowkey_, sizeof(arg_.returning_rowkey_));
checksum = ob_crc64(checksum, &arg_.returning_affected_entity_, sizeof(arg_.returning_affected_entity_));
checksum = ob_crc64(checksum, &arg_.returning_affected_rows_, sizeof(arg_.returning_affected_rows_));
checksum = ob_crc64(checksum, &arg_.binlog_row_image_type_, sizeof(arg_.binlog_row_image_type_));
const uint64_t op_checksum = arg_.table_operation_.get_checksum();
checksum = ob_crc64(checksum, &op_checksum, sizeof(op_checksum));
return checksum;
}
int ObTableApiExecuteP::response(const int retcode)
{
int ret = OB_SUCCESS;
if (!need_retry_in_queue_ && !did_async_end_trans()) {
ret = ObRpcProcessor::response(retcode);
}
return ret;
}
void ObTableApiExecuteP::reset_ctx()
{
(void)revert_get_ctx();
get_ctx_.reset_dml();
ObTableApiProcessorBase::reset_ctx();
need_rollback_trans_ = false;
need_retry_in_queue_ = false;
}
int ObTableApiExecuteP::get_partition_id(uint64_t table_id, const ObRowkey &rowkey, uint64_t &partition_id)
{
int ret = OB_SUCCESS;
partition_id = arg_.partition_id_;
if (OB_INVALID_ID == partition_id) {
ObSEArray<ObRowkey, 1> rowkeys;
ObSEArray<int64_t, 1> part_ids;
ObSEArray<sql::RowkeyArray, 1> rowkeys_per_part;
if (OB_FAIL(rowkeys.push_back(rowkey))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(get_partition_by_rowkey(table_id, rowkeys, part_ids, rowkeys_per_part))) {
LOG_WARN("failed to get partition", K(ret), K(table_id), K(rowkeys));
} else if (1 != part_ids.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("should have one partition", K(ret));
} else {
partition_id = part_ids.at(0);
}
}
return ret;
}
////////////////////////////////////////////////////////////////
// get
int ObTableApiExecuteP::process_get()
{
int ret = OB_SUCCESS;
need_rollback_trans_ = false;
uint64_t &table_id = get_ctx_.param_table_id();
get_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
const bool is_readonly = true;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret), K(table_id));
} else if (OB_FAIL(get_partition_id(table_id, rowkey, get_ctx_.param_partition_id()))) {
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_SELECT, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start readonly transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_get(get_ctx_, arg_.table_operation_, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to execute get", K(ret), K(table_id));
}
} else {}
// end trans in after_process()
need_rollback_trans_ = (OB_SUCCESS != ret);
return ret;
}
////////////////////////////////////////////////////////////////
// insert_or_update
ObTableAPITransCb *ObTableApiExecuteP::new_callback(rpc::ObRequest *req)
{
ObTableExecuteEndTransCb *cb = OB_NEW(ObTableExecuteEndTransCb, ObModIds::TABLE_PROC, req, arg_.table_operation_.type());
if (NULL != cb) {
// @todo optimize to avoid this copy
int ret = OB_SUCCESS;
if (OB_FAIL(cb->assign_execute_result(result_))) {
LOG_WARN("failed to assign result", K(ret));
cb->~ObTableExecuteEndTransCb();
cb = NULL;
} else {
LOG_DEBUG("yzfdebug copy result", K_(result));
}
}
return cb;
}
int ObTableApiExecuteP::process_insert_or_update()
{
int ret = OB_SUCCESS;
uint64_t &table_id = get_ctx_.param_table_id();
get_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
const bool is_readonly = false;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret), K(table_id));
} else if (OB_FAIL(get_partition_id(table_id, rowkey, get_ctx_.param_partition_id()))) {
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_INSERT, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_insert_or_update(get_ctx_, arg_.table_operation_, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to insert_or_update", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans", K(ret));
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableApiExecuteP::process_del()
{
int ret = OB_SUCCESS;
uint64_t &table_id = get_ctx_.param_table_id();
get_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
const bool is_readonly = false;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret), K(table_id));
} else if (OB_FAIL(get_partition_id(table_id, rowkey, get_ctx_.param_partition_id()))) {
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_DELETE, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_delete(get_ctx_, arg_.table_operation_, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to delete", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans", K(ret));
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableApiExecuteP::process_replace()
{
int ret = OB_SUCCESS;
uint64_t &table_id = get_ctx_.param_table_id();
get_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
const bool is_readonly = false;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret), K(table_id));
} else if (OB_FAIL(get_partition_id(table_id, rowkey, get_ctx_.param_partition_id()))) {
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_REPLACE, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_replace(get_ctx_, arg_.table_operation_, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to replace", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans", K(ret));
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableApiExecuteP::process_insert()
{
int ret = OB_SUCCESS;
ObNewRowIterator *duplicate_row_iter = nullptr;
uint64_t &table_id = get_ctx_.param_table_id();
get_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
const bool is_readonly = false;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret), K(table_id));
} else if (OB_FAIL(get_partition_id(table_id, rowkey, get_ctx_.param_partition_id()))) {
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_INSERT, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_insert(get_ctx_,
arg_.table_operation_, result_, duplicate_row_iter))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to insert", K(ret), K(table_id));
}
}
int tmp_ret = ret;
const bool did_rollback = (OB_SUCCESS != ret || OB_SUCCESS != result_.get_errno());
if (OB_FAIL(end_trans(did_rollback, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans", K(ret));
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableApiExecuteP::process_update()
{
int ret = OB_SUCCESS;
uint64_t &table_id = get_ctx_.param_table_id();
get_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_);
const bool is_readonly = false;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret), K(table_id));
} else if (OB_FAIL(get_partition_id(table_id, rowkey, get_ctx_.param_partition_id()))) {
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_UPDATE, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_update(get_ctx_, arg_.table_operation_, nullptr, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to update", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans", K(ret));
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
int ObTableApiExecuteP::process_increment()
{
int ret = OB_SUCCESS;
uint64_t &table_id = get_ctx_.param_table_id();
get_ctx_.init_param(get_timeout_ts(), this, &allocator_,
arg_.returning_affected_rows_,
arg_.entity_type_,
arg_.binlog_row_image_type_,
arg_.returning_affected_entity_,
arg_.returning_rowkey_);
const bool is_readonly = false;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret), K(table_id));
} else if (OB_FAIL(get_partition_id(table_id, rowkey, get_ctx_.param_partition_id()))) {
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_UPDATE, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_increment(get_ctx_, arg_.table_operation_, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to update", K(ret), K(table_id));
}
}
int tmp_ret = ret;
if (OB_FAIL(end_trans(OB_SUCCESS != ret, req_, get_timeout_ts()))) {
LOG_WARN("failed to end trans", K(ret));
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
return ret;
}
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_TABLE_EXECUTE_PROCESSOR_H
#define _OB_TABLE_EXECUTE_PROCESSOR_H 1
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "rpc/obrpc/ob_rpc_processor.h"
#include "share/table/ob_table_rpc_proxy.h"
#include "ob_table_rpc_processor.h"
#include "ob_table_service.h"
namespace oceanbase
{
namespace observer
{
/// @see RPC_S(PR5 execute, obrpc::OB_TABLE_API_EXECUTE, (table::ObTableOperationRequest), table::ObTableOperationResult);
class ObTableApiExecuteP: public ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_EXECUTE> >
{
typedef ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_EXECUTE> > ParentType;
public:
explicit ObTableApiExecuteP(const ObGlobalContext &gctx);
virtual ~ObTableApiExecuteP() = default;
virtual int deserialize() override;
virtual int process() override;
virtual int response(const int retcode) override;
protected:
virtual int check_arg() override;
virtual int try_process() override;
virtual void reset_ctx() override;
table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) override;
virtual void audit_on_finish() override;
virtual uint64_t get_request_checksum() override;
private:
int check_arg2() const;
int revert_get_ctx();
int get_partition_id(uint64_t table_id, const ObRowkey &rowkey, uint64_t &partition_id);
int process_get();
int process_insert();
int process_del();
int process_update();
int process_insert_or_update();
int process_replace();
int process_increment();
private:
table::ObTableEntity request_entity_;
table::ObTableEntity result_entity_;
common::ObArenaAllocator allocator_;
table::ObTableEntityFactory<table::ObTableEntity> default_entity_factory_;
// the life of scan_ctx_ should be longer than process()
ObTableServiceGetCtx get_ctx_;
bool need_rollback_trans_;
int64_t query_timeout_ts_;
};
} // end namespace observer
} // end namespace oceanbase
#endif /* _OB_TABLE_EXECUTE_PROCESSOR_H */
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "ob_table_query_processor.h"
#include "ob_table_rpc_processor_util.h"
#include "observer/ob_service.h"
#include "storage/ob_partition_service.h"
#include "ob_table_end_trans_cb.h"
#include "sql/optimizer/ob_table_location.h" // ObTableLocation
#include "lib/stat/ob_diagnose_info.h"
#include "lib/stat/ob_session_stat.h"
using namespace oceanbase::observer;
using namespace oceanbase::common;
using namespace oceanbase::table;
using namespace oceanbase::share;
using namespace oceanbase::sql;
ObTableQueryP::ObTableQueryP(const ObGlobalContext &gctx)
:ObTableRpcProcessor(gctx),
allocator_(ObModIds::TABLE_PROC),
table_service_ctx_(allocator_),
result_row_count_(0)
{
// the streaming interface may return multi packet. The memory may be freed after the first packet has been sended.
// the deserialization of arg_ is shallow copy, so we need deep copy data to processor
set_preserve_recv_data();
}
int ObTableQueryP::deserialize()
{
arg_.query_.set_deserialize_allocator(&allocator_);
return ParentType::deserialize();
}
int ObTableQueryP::check_arg()
{
int ret = OB_SUCCESS;
if (!arg_.query_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table query request", K(ret), "query", arg_.query_);
} else if (arg_.consistency_level_ != ObTableConsistencyLevel::STRONG) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"consistency_level", arg_.consistency_level_);
}
return ret;
}
void ObTableQueryP::audit_on_finish()
{
audit_record_.consistency_level_ = ObTableConsistencyLevel::STRONG == arg_.consistency_level_ ?
ObConsistencyLevel::STRONG : ObConsistencyLevel::WEAK;
audit_record_.return_rows_ = result_.get_row_count();
audit_record_.table_scan_ = true; // todo: exact judgement
audit_record_.affected_rows_ = result_.get_row_count();
audit_record_.try_cnt_ = retry_count_ + 1;
}
uint64_t ObTableQueryP::get_request_checksum()
{
uint64_t checksum = 0;
checksum = ob_crc64(checksum, arg_.table_name_.ptr(), arg_.table_name_.length());
checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_));
const uint64_t op_checksum = arg_.query_.get_checksum();
checksum = ob_crc64(checksum, &op_checksum, sizeof(op_checksum));
return checksum;
}
void ObTableQueryP::reset_ctx()
{
table_service_ctx_.reset_query_ctx(part_service_);
need_retry_in_queue_ = false;
result_row_count_ = 0;
ObTableApiProcessorBase::reset_ctx();
}
ObTableAPITransCb *ObTableQueryP::new_callback(rpc::ObRequest *req)
{
UNUSED(req);
return nullptr;
}
int ObTableQueryP::get_partition_ids(uint64_t table_id, ObIArray<int64_t> &part_ids)
{
int ret = OB_SUCCESS;
uint64_t partition_id = arg_.partition_id_;
if (OB_INVALID_ID == partition_id) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("partitioned table not supported", K(ret), K(table_id));
} else {
if (OB_FAIL(part_ids.push_back(partition_id))) {
LOG_WARN("failed to push back", K(ret));
}
}
return ret;
}
int ObTableQueryP::try_process()
{
int ret = OB_SUCCESS;
int64_t rpc_timeout = 0;
if (NULL != rpc_pkt_) {
rpc_timeout = rpc_pkt_->get_timeout();
}
const int64_t timeout_ts = get_timeout_ts();
uint64_t &table_id = table_service_ctx_.param_table_id();
table_service_ctx_.init_param(timeout_ts, this, &allocator_,
false/*ignored*/,
arg_.entity_type_,
table::ObBinlogRowImageType::MINIMAL/*ignored*/);
ObSEArray<int64_t, 1> part_ids;
const bool is_readonly = true;
ObTableQueryResultIterator *result_iterator = nullptr;
int32_t result_count = 0;
if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("should have one partition", K(ret), K(part_ids));
} else if (FALSE_IT(table_service_ctx_.param_partition_id() = part_ids.at(0))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_SELECT, table_id, part_ids, timeout_ts))) {
LOG_WARN("failed to start readonly transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_query(table_service_ctx_, arg_.query_,
result_, result_iterator))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to execute query", K(ret), K(table_id));
}
} else {
// one_result references to result_
ObTableQueryResult *one_result = nullptr;
while (OB_SUCC(ret)) {
++result_count;
// the last result_ does not need flush, it will be send automatically
if (ObTimeUtility::current_time() > timeout_ts) {
ret = OB_TRANS_TIMEOUT;
LOG_WARN("exceed operatiton timeout", K(ret));
} else if (OB_FAIL(result_iterator->get_next_result(one_result))) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next result", K(ret));
}
} else if (result_iterator->has_more_result()) {
if (OB_FAIL(this->flush())) {
if (OB_ITER_END != ret) {
LOG_WARN("failed to flush result packet", K(ret));
} else {
LOG_TRACE("user abort the stream rpc", K(ret));
}
} else {
LOG_DEBUG("[yzfdebug] flush one result", K(ret), "row_count", result_.get_row_count());
result_row_count_ += result_.get_row_count();
result_.reset_except_property();
}
} else {
// no more result
result_row_count_ += result_.get_row_count();
break;
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
LOG_DEBUG("[yzfdebug] last result", K(ret), "row_count", result_.get_row_count());
NG_TRACE_EXT(tag1, OB_ID(return_rows), result_count, OB_ID(arg2), result_row_count_);
}
table_service_ctx_.destroy_result_iterator(part_service_);
bool need_rollback_trans = (OB_SUCCESS != ret);
int tmp_ret = ret;
if (OB_FAIL(end_trans(need_rollback_trans, req_, timeout_ts))) {
LOG_WARN("failed to end trans", K(ret), "rollback", need_rollback_trans);
}
ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret;
// record events
stat_event_type_ = ObTableProccessType::TABLE_API_TABLE_QUERY;// table query
audit_row_count_ = result_row_count_;
#ifndef NDEBUG
// debug mode
LOG_INFO("[TABLE] execute query", K(ret), K_(arg), K(rpc_timeout),
K_(retry_count), K(result_count), K_(result_row_count));
#else
// release mode
LOG_TRACE("[TABLE] execute query", K(ret), K_(arg), K(rpc_timeout), K_(retry_count),
"receive_ts", get_receive_timestamp(), K(result_count), K_(result_row_count));
#endif
return ret;
}
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_TABLE_QUERY_PROCESSOR_H
#define _OB_TABLE_QUERY_PROCESSOR_H 1
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "rpc/obrpc/ob_rpc_processor.h"
#include "share/table/ob_table_rpc_proxy.h"
#include "ob_table_rpc_processor.h"
#include "ob_table_service.h"
namespace oceanbase
{
namespace observer
{
class ObTableQueryP: public ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_EXECUTE_QUERY> >
{
typedef ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_EXECUTE_QUERY> > ParentType;
public:
explicit ObTableQueryP(const ObGlobalContext &gctx);
virtual ~ObTableQueryP() {}
virtual int deserialize() override;
protected:
virtual int check_arg() override;
virtual int try_process() override;
virtual void reset_ctx() override;
virtual table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) override;
virtual void audit_on_finish() override;
virtual uint64_t get_request_checksum() override;
private:
int get_partition_ids(uint64_t table_id, common::ObIArray<int64_t> &part_ids);
DISALLOW_COPY_AND_ASSIGN(ObTableQueryP);
private:
common::ObArenaAllocator allocator_;
ObTableServiceQueryCtx table_service_ctx_;
int64_t result_row_count_;
};
} // end namespace observer
} // end namespace oceanbase
#endif /* _OB_TABLE_QUERY_PROCESSOR_H */
此差异已折叠。
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_TABLE_RPC_PROCESSOR_H
#define _OB_TABLE_RPC_PROCESSOR_H 1
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "rpc/obrpc/ob_rpc_processor.h"
#include "share/table/ob_table_rpc_proxy.h"
#include "sql/ob_sql_trans_control.h"
#include "sql/optimizer/ob_table_location.h" // ObTableLocation
#include "ob_table_service.h"
#include "sql/monitor/ob_exec_stat.h"
namespace oceanbase
{
namespace table
{
class ObTableAPITransCb;
} // end namespace table
namespace observer
{
class ObGlobalContext;
class ObTableService;
struct ObTableApiCredential final
{
OB_UNIS_VERSION(1);
public:
ObTableApiCredential();
~ObTableApiCredential();
public:
int64_t cluster_id_;
uint64_t tenant_id_;
uint64_t user_id_;
uint64_t database_id_;
int64_t expire_ts_;
uint64_t hash_val_;
public:
uint64_t hash(uint64_t seed = 0) const;
TO_STRING_KV(K_(cluster_id),
K_(tenant_id),
K_(user_id),
K_(database_id),
K_(expire_ts),
K_(hash_val));
};
/// @see RPC_S(PR5 login, obrpc::OB_TABLE_API_LOGIN, (table::ObTableLoginRequest), table::ObTableLoginResult);
class ObTableLoginP: public obrpc::ObRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_LOGIN> >
{
typedef obrpc::ObRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_LOGIN> > ParentType;
public:
explicit ObTableLoginP(const ObGlobalContext &gctx)
:gctx_(gctx)
{}
virtual ~ObTableLoginP() = default;
virtual int process() override;
private:
int get_ids();
int verify_password(const ObString &tenant, const ObString &user, const ObString &pass_secret,
const ObString &pass_scramble, const ObString &database, uint64_t &user_token);
int generate_credential(uint64_t tenant_id, uint64_t user_id, uint64_t database,
int64_t ttl_us, uint64_t user_token, ObString &credential);
private:
static const int64_t CREDENTIAL_BUF_SIZE = 256;
private:
const ObGlobalContext &gctx_;
char credential_buf_[CREDENTIAL_BUF_SIZE];
};
class ObTableRetryPolicy
{
public:
ObTableRetryPolicy()
: allow_retry_(true),
allow_rpc_retry_(true),
local_retry_interval_us_(10),
max_local_retry_count_(5)
{}
virtual ~ObTableRetryPolicy() {}
bool allow_retry() const { return allow_retry_; }
// rpc retry will receate the processor,
// so there is no retry count limit for now.
bool allow_rpc_retry() const { return allow_retry_ && allow_rpc_retry_; }
public:
bool allow_retry_;
bool allow_rpc_retry_;
int64_t local_retry_interval_us_;
int64_t max_local_retry_count_;
};
/*
* Normally, the rpc process flow is:
* 1. deserialize
* 2. before_process
* 3. process
* 4. before_response
* 5. response
* 6. after_process
* 7. cleanup
*
* Attention:
* After response or async_commit_trans,
* all buffer related to the request (such as req_) may recycled by the network frame.
* DO NOT access these memory in after_process() and cleanup().
*/
/// Base class of all table api processor
class ObTableApiProcessorBase
{
public:
explicit ObTableApiProcessorBase(const ObGlobalContext &gctx);
virtual ~ObTableApiProcessorBase() = default;
public:
static int init_session();
int check_user_access(const ObString &credential_str);
//@{ transaction control
int start_trans(bool is_readonly, const sql::stmt::StmtType stmt_type, uint64_t table_id,
const common::ObIArray<int64_t> &part_ids, int64_t timeout_ts);
int end_trans(bool is_rollback, rpc::ObRequest *req, int64_t timeout_ts, bool use_sync = false);
inline bool did_async_end_trans() const { return did_async_end_trans_; }
inline transaction::ObTransDesc& get_trans_desc() { return trans_desc_; }
int get_partition_by_rowkey(uint64_t table_id, const ObIArray<common::ObRowkey> &rowkeys,
common::ObIArray<int64_t> &part_ids,
common::ObIArray<sql::RowkeyArray> &rowkeys_per_part);
int get_table_id(const ObString &table_name, const uint64_t arg_table_id, uint64_t &real_table_id) const;
protected:
virtual int check_arg() = 0;
virtual int try_process() = 0;
virtual table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) = 0;
virtual void set_req_has_wokenup() = 0;
virtual void reset_ctx();
int process_with_retry(const ObString &credential, const int64_t timeout_ts);
// audit
bool need_audit() const;
void start_audit(const rpc::ObRequest *req);
void end_audit();
virtual void audit_on_finish() {}
virtual void save_request_string() = 0;
virtual void generate_sql_id() = 0;
private:
int get_participants(uint64_t table_id, const common::ObIArray<int64_t> &part_ids,
common::ObPartitionLeaderArray &partition_leaders);
int get_participants_from_lc(uint64_t table_id, const common::ObIArray<int64_t> &part_ids,
common::ObPartitionLeaderArray &partition_leaders);
int get_participants_optimistic(uint64_t table_id, const common::ObIArray<int64_t> &part_ids,
common::ObPartitionLeaderArray &partition_leaders);
int async_commit_trans(rpc::ObRequest *req, int64_t timeout_ts);
int sync_end_trans(bool is_rollback, int64_t timeout_ts);
int generate_schema_info_arr(const uint64_t table_id,
const common::ObPartitionArray &participants,
transaction::ObPartitionSchemaInfoArray &schema_info_arr);
//@}
protected:
const ObGlobalContext &gctx_;
storage::ObPartitionService *part_service_;
ObTableService *table_service_;
ObTableApiCredential credential_;
int32_t stat_event_type_;
int64_t audit_row_count_;
bool need_audit_;
const char *request_string_;
int64_t request_string_len_;
sql::ObAuditRecordData audit_record_;
ObArenaAllocator audit_allocator_;
ObTableRetryPolicy retry_policy_;
bool need_retry_in_queue_;
int32_t retry_count_;
private:
// trans control
ObPartitionLeaderArray participants_;
sql::TransState trans_state_;
transaction::ObTransDesc trans_desc_;
//part_epoch_list_ record the epoch id of response_partitions_
//when start_participants executed in the leader replica
transaction::ObPartitionEpochArray part_epoch_list_;
bool did_async_end_trans_;
};
template<class T>
class ObTableRpcProcessor: public obrpc::ObRpcProcessor<T>, public ObTableApiProcessorBase
{
typedef obrpc::ObRpcProcessor<T> RpcProcessor;
public:
explicit ObTableRpcProcessor(const ObGlobalContext &gctx) : ObTableApiProcessorBase(gctx) {}
virtual ~ObTableRpcProcessor() = default;
virtual int deserialize() override;
virtual int before_process() override;
virtual int process() override;
virtual int before_response() override;
virtual int response(const int retcode) override;
virtual int after_process() override;
protected:
virtual void set_req_has_wokenup() override;
int64_t get_timeout_ts() const;
virtual void save_request_string() override;
virtual void generate_sql_id() override;
virtual uint64_t get_request_checksum() = 0;
};
} // end namespace observer
} // end namespace oceanbase
#endif /* _OB_TABLE_RPC_PROCESSOR_H */
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -34,6 +34,11 @@ ob_set_subtarget(ob_share config
config/ob_system_config_key.cpp
)
ob_set_subtarget(ob_share table
table/ob_table.cpp
table/ob_table_rpc_struct.cpp
)
file(GLOB SCHEMA_CPPS "inner_table/ob_inner_table_schema.*.cpp")
ob_set_subtarget(ob_share inner_table
inner_table/ob_inner_table_schema_misc.ipp
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -4,3 +4,4 @@ ob_unittest(test_omt_worker omt/test_worker.cpp)
ob_unittest(test_worker_pool omt/test_worker_pool.cpp)
ob_unittest(test_token_calcer omt/test_token_calcer.cpp)
ob_unittest(test_information_schema)
ob_unittest(test_tableapi tableapi/test_tableapi.cpp)
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册