提交 19b1ebd0 编写于 作者: O obdev 提交者: wangzelin.wzl

fix: htable support query async

上级 eb18e0ba
......@@ -1067,14 +1067,15 @@ void ObHTableRowIterator::set_max_version(int32_t max_version)
////////////////////////////////////////////////////////////////
ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
table::ObTableQueryResult &one_result)
:query_(query),
:query_(&query),
row_iterator_(query),
one_result_(one_result),
one_result_(&one_result),
hfilter_(NULL),
batch_size_(query.get_batch()),
max_result_size_(std::min(query.get_max_result_size(),
static_cast<int64_t>(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))),
is_first_result_(true)
is_first_result_(true),
is_query_sync_(false)
{
}
......@@ -1082,26 +1083,28 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
{
int ret = OB_SUCCESS;
if (is_first_result_) {
is_first_result_ = false;
if (0 != one_result_.get_property_count()) {
if (is_first_result_ || is_query_sync_) {
if (0 != one_result_->get_property_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("property should be empty", K(ret));
}
const ObIArray<ObString> &select_columns = query_.get_select_columns();
const ObIArray<ObString> &select_columns = query_->get_select_columns();
const int64_t N = select_columns.count();
for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i)
{
if (OB_FAIL(one_result_.add_property_name(select_columns.at(i)))) {
if (OB_FAIL(one_result_->add_property_name(select_columns.at(i)))) {
LOG_WARN("failed to copy name", K(ret));
}
} // end for
if (is_first_result_) {
is_first_result_ = false;
}
} else {
one_result_.reset_except_property();
one_result_->reset_except_property();
}
if (OB_SUCC(ret)) {
bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row());
next_result = &one_result_;
next_result = one_result_;
ObTableQueryResult *htable_row = nullptr;
// ObNewRow first_entity;
// ObObj first_entity_cells[4];
......@@ -1156,14 +1159,15 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
}
/* @todo check batch limit and size limit */
// We have got one hbase row, store it to this batch
if (OB_FAIL(one_result_.add_all_row(*htable_row))) {
if (OB_FAIL(one_result_->add_all_row(*htable_row))) {
LOG_WARN("failed to add cells to row", K(ret));
}
if (NULL != hfilter_) {
hfilter_->reset();
}
if (OB_SUCC(ret)) {
if (one_result_.reach_batch_size_or_result_size(batch_size_, max_result_size_)) {
if (one_result_->reach_batch_size_or_result_size(batch_size_, max_result_size_)) {
LOG_DEBUG("htable reach_batch_size_or_result_size", K(batch_size_), K(max_result_size_));
break;
}
}
......@@ -1173,17 +1177,17 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
}
}
if (OB_ITER_END == ret
&& one_result_.get_row_count() > 0) {
&& one_result_->get_row_count() > 0) {
ret = OB_SUCCESS;
}
LOG_DEBUG("[yzfdebug] get_next_result", K(ret), "row_count", one_result_.get_row_count());
LOG_DEBUG("htable get_next_result", K(ret), "row_count", one_result_->get_row_count());
return ret;
}
int ObHTableFilterOperator::parse_filter_string(common::ObArenaAllocator* allocator)
{
int ret = OB_SUCCESS;
const ObString &hfilter_string = query_.get_htable_filter().get_filter();
const ObString &hfilter_string = query_->get_htable_filter().get_filter();
if (hfilter_string.empty()) {
hfilter_ = NULL;
} else if (NULL == allocator) {
......
......@@ -247,15 +247,26 @@ public:
void set_max_version(int32_t max_version_value) { row_iterator_.set_max_version(max_version_value); }
// parse the filter string
int parse_filter_string(common::ObArenaAllocator* allocator);
public:
// query async
virtual void set_one_result(ObTableQueryResult *result) {one_result_ = result;}
void set_query(const ObTableQuery *query) {query_ = query;}
void set_query_sync() { is_query_sync_ = true ; }
private:
const ObTableQuery &query_;
const ObTableQuery *query_;
ObHTableRowIterator row_iterator_;
table::ObTableQueryResult &one_result_;
table::ObTableQueryResult *one_result_;
table::ObHTableFilterParser filter_parser_;
table::hfilter::Filter *hfilter_;
int32_t batch_size_;
int64_t max_result_size_;
private:
// query async
bool is_first_result_;
bool is_query_sync_;
};
} // end namespace table
......
......@@ -107,6 +107,20 @@ int ObTableQueryP::get_partition_ids(uint64_t table_id, ObIArray<int64_t> &part_
return ret;
}
void ObTableQueryP::set_htable_compressor()
{
int ret = OB_SUCCESS;
// hbase model, compress the result packet
ObCompressorType compressor_type = INVALID_COMPRESSOR;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type(
GCONF.tableapi_transport_compress_func, compressor_type))) {
compressor_type = INVALID_COMPRESSOR;
} else if (NONE_COMPRESSOR == compressor_type) {
compressor_type = INVALID_COMPRESSOR;
}
this->set_result_compress_type(compressor_type);
}
int ObTableQueryP::try_process()
{
int ret = OB_SUCCESS;
......@@ -143,16 +157,7 @@ int ObTableQueryP::try_process()
} else {
if (arg_.query_.get_htable_filter().is_valid()) {
// hbase model, compress the result packet
ObCompressorType compressor_type = INVALID_COMPRESSOR;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type(
GCONF.tableapi_transport_compress_func, compressor_type))) {
compressor_type = INVALID_COMPRESSOR;
} else if (NONE_COMPRESSOR == compressor_type) {
compressor_type = INVALID_COMPRESSOR;
}
this->set_result_compress_type(compressor_type);
ret = OB_SUCCESS; // reset ret
LOG_DEBUG("[yzfdebug] use compressor", K(compressor_type));
set_htable_compressor();
}
// one_result references to result_
ObTableQueryResult *one_result = nullptr;
......
......@@ -39,7 +39,8 @@ protected:
virtual uint64_t get_request_checksum() override;
private:
int get_partition_ids(uint64_t table_id, common::ObIArray<int64_t> &part_ids);
int get_partition_ids(uint64_t table_id, common::ObIArray<int64_t> &part_ids);
void set_htable_compressor();
DISALLOW_COPY_AND_ASSIGN(ObTableQueryP);
private:
common::ObArenaAllocator allocator_;
......
......@@ -22,6 +22,7 @@
#include "observer/ob_server.h"
#include "lib/string/ob_strings.h"
#include "lib/rc/ob_rc.h"
#include "observer/table/ob_htable_filter_operator.h"
using namespace oceanbase::observer;
using namespace oceanbase::common;
......@@ -58,6 +59,15 @@ void ObTableQuerySyncSession::set_result_iterator(ObNormalTableQueryResultIterat
}
}
void ObTableQuerySyncSession::set_htable_result_iterator(table::ObHTableFilterOperator *query_result)
{
htable_result_iterator_ = query_result;
if (OB_NOT_NULL(htable_result_iterator_)) {
htable_result_iterator_->set_query(&query_);
htable_result_iterator_->set_query_sync();
}
}
int ObTableQuerySyncSession::init()
{
int ret = OB_SUCCESS;
......@@ -365,6 +375,21 @@ int ObTableQuerySyncP::get_session_id(uint64_t &real_sessid, uint64_t arg_sessid
return ret;
}
void ObTableQuerySyncP::set_htable_compressor()
{
int ret = OB_SUCCESS;
// hbase model, compress the result packet
ObCompressorType compressor_type = INVALID_COMPRESSOR;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type(
GCONF.tableapi_transport_compress_func, compressor_type))) {
compressor_type = INVALID_COMPRESSOR;
} else if (NONE_COMPRESSOR == compressor_type) {
compressor_type = INVALID_COMPRESSOR;
}
this->set_result_compress_type(compressor_type);
}
int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session)
{
int ret = OB_SUCCESS;
......@@ -402,7 +427,13 @@ int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSessio
int ObTableQuerySyncP::query_scan_with_old_context(const int64_t timeout)
{
int ret = OB_SUCCESS;
ObTableQueryResultIterator *result_iterator = query_session_->get_result_iterator();
ObTableQueryResultIterator *result_iterator;
if (arg_.query_.get_htable_filter().is_valid()) {
result_iterator = query_session_->get_htable_result_iterator();
set_htable_compressor();
} else {
result_iterator = query_session_->get_result_iterator();
}
if (OB_ISNULL(result_iterator)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("query result iterator null", K(ret));
......@@ -430,6 +461,9 @@ int ObTableQuerySyncP::query_scan_with_new_context(
ObTableQuerySyncSession *query_session, table::ObTableQueryResultIterator *result_iterator, const int64_t timeout)
{
int ret = OB_SUCCESS;
if (arg_.query_.get_htable_filter().is_valid()) {
set_htable_compressor();
}
ObTableQueryResult *query_result = nullptr;
if (ObTimeUtility::current_time() > timeout) {
ret = OB_TRANS_TIMEOUT;
......@@ -445,7 +479,12 @@ int ObTableQuerySyncP::query_scan_with_new_context(
} else if (result_iterator->has_more_result()) {
result_.is_end_ = false;
query_session->deep_copy_select_columns(arg_.query_);
query_session->set_result_iterator(dynamic_cast<ObNormalTableQueryResultIterator *>(result_iterator));
if (arg_.query_.get_htable_filter().is_valid()) {
query_session->set_htable_result_iterator(dynamic_cast<table::ObHTableFilterOperator *>(result_iterator));
} else {
query_session->set_result_iterator(dynamic_cast<ObNormalTableQueryResultIterator *>(result_iterator));
}
} else {
result_.is_end_ = true;
}
......@@ -497,7 +536,7 @@ int ObTableQuerySyncP::query_scan_with_init()
int ObTableQuerySyncP::query_scan_without_init()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(query_session_->get_result_iterator()) || OB_ISNULL(query_session_->get_table_service_ctx())) {
if (OB_ISNULL(query_session_->get_table_service_ctx())) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("unexpected null result iterator or table service context", K(ret));
} else if (OB_FAIL(query_scan_with_old_context(timeout_ts_))) {
......
......@@ -34,6 +34,7 @@ public:
timestamp_(0),
query_(),
result_iterator_(nullptr),
htable_result_iterator_(nullptr),
allocator_(ObModIds::TABLE_PROC),
table_service_ctx_(allocator_),
iterator_mementity_(nullptr)
......@@ -42,6 +43,7 @@ public:
void set_timestamp(int64_t timestamp) { timestamp_ = timestamp; }
void set_result_iterator(ObNormalTableQueryResultIterator* iter);
void set_htable_result_iterator(table::ObHTableFilterOperator *iter);
int deep_copy_select_columns(const ObTableQuery &query);
void set_in_use(bool in_use) {in_use_ = in_use;}
bool is_in_use() {return in_use_;}
......@@ -50,6 +52,7 @@ public:
int64_t get_timestamp() { return timestamp_; }
ObTableServiceQueryCtx *get_table_service_ctx() {return &table_service_ctx_;}
ObNormalTableQueryResultIterator *get_result_iterator() { return result_iterator_; }
table::ObHTableFilterOperator *get_htable_result_iterator() { return htable_result_iterator_; }
ObArenaAllocator *get_allocator() {return &allocator_;}
public:
......@@ -63,6 +66,7 @@ private:
uint64_t timestamp_;
ObTableQuery query_; // only select_columns is correct
ObNormalTableQueryResultIterator *result_iterator_;
table::ObHTableFilterOperator *htable_result_iterator_;
ObArenaAllocator allocator_;
ObTableServiceQueryCtx table_service_ctx_;
lib::MemoryContext iterator_mementity_;
......@@ -181,6 +185,7 @@ private:
int query_scan_with_old_context(const int64_t timeout);
int query_scan_with_new_context(ObTableQuerySyncSession * session_ctx, table::ObTableQueryResultIterator *result_iterator,
const int64_t timeout);
void set_htable_compressor();
private:
void set_trans_from_session(ObTableQuerySyncSession *query_session);
......
......@@ -21,6 +21,7 @@
#include "lib/thread_local/ob_tsi_factory.h"
#include "ob_htable_filter_operator.h"
#include "sql/engine/expr/ob_expr_add.h"
using namespace oceanbase::observer;
using namespace oceanbase::common;
using namespace oceanbase::table;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册