diff --git a/src/observer/table/ob_htable_filter_operator.cpp b/src/observer/table/ob_htable_filter_operator.cpp index 716b66a202fa1f10380cbb8c070c95ca90ef6700..981d1b18363fbd269c03eb2d2f774c35f5799cac 100644 --- a/src/observer/table/ob_htable_filter_operator.cpp +++ b/src/observer/table/ob_htable_filter_operator.cpp @@ -1067,14 +1067,15 @@ void ObHTableRowIterator::set_max_version(int32_t max_version) //////////////////////////////////////////////////////////////// ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query, table::ObTableQueryResult &one_result) - :query_(query), + :query_(&query), row_iterator_(query), - one_result_(one_result), + one_result_(&one_result), hfilter_(NULL), batch_size_(query.get_batch()), max_result_size_(std::min(query.get_max_result_size(), static_cast(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))), - is_first_result_(true) + is_first_result_(true), + is_query_sync_(false) { } @@ -1082,26 +1083,28 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query, int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result) { int ret = OB_SUCCESS; - if (is_first_result_) { - is_first_result_ = false; - if (0 != one_result_.get_property_count()) { + if (is_first_result_ || is_query_sync_) { + if (0 != one_result_->get_property_count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("property should be empty", K(ret)); } - const ObIArray &select_columns = query_.get_select_columns(); + const ObIArray &select_columns = query_->get_select_columns(); const int64_t N = select_columns.count(); for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { - if (OB_FAIL(one_result_.add_property_name(select_columns.at(i)))) { + if (OB_FAIL(one_result_->add_property_name(select_columns.at(i)))) { LOG_WARN("failed to copy name", K(ret)); } } // end for + if (is_first_result_) { + is_first_result_ = false; + } } else { - one_result_.reset_except_property(); + one_result_->reset_except_property(); } if (OB_SUCC(ret)) { bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row()); - next_result = &one_result_; + next_result = one_result_; ObTableQueryResult *htable_row = nullptr; // ObNewRow first_entity; // ObObj first_entity_cells[4]; @@ -1156,14 +1159,15 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result) } /* @todo check batch limit and size limit */ // We have got one hbase row, store it to this batch - if (OB_FAIL(one_result_.add_all_row(*htable_row))) { + if (OB_FAIL(one_result_->add_all_row(*htable_row))) { LOG_WARN("failed to add cells to row", K(ret)); } if (NULL != hfilter_) { hfilter_->reset(); } if (OB_SUCC(ret)) { - if (one_result_.reach_batch_size_or_result_size(batch_size_, max_result_size_)) { + if (one_result_->reach_batch_size_or_result_size(batch_size_, max_result_size_)) { + LOG_DEBUG("htable reach_batch_size_or_result_size", K(batch_size_), K(max_result_size_)); break; } } @@ -1173,17 +1177,17 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result) } } if (OB_ITER_END == ret - && one_result_.get_row_count() > 0) { + && one_result_->get_row_count() > 0) { ret = OB_SUCCESS; } - LOG_DEBUG("[yzfdebug] get_next_result", K(ret), "row_count", one_result_.get_row_count()); + LOG_DEBUG("htable get_next_result", K(ret), "row_count", one_result_->get_row_count()); return ret; } int ObHTableFilterOperator::parse_filter_string(common::ObArenaAllocator* allocator) { int ret = OB_SUCCESS; - const ObString &hfilter_string = query_.get_htable_filter().get_filter(); + const ObString &hfilter_string = query_->get_htable_filter().get_filter(); if (hfilter_string.empty()) { hfilter_ = NULL; } else if (NULL == allocator) { diff --git a/src/observer/table/ob_htable_filter_operator.h b/src/observer/table/ob_htable_filter_operator.h index 874d9a36e17cdeb3ed8ca07ce0ab2ef29b763d8d..fb1e3ba2833f1306f07770c8e016a70ebef02586 100644 --- a/src/observer/table/ob_htable_filter_operator.h +++ b/src/observer/table/ob_htable_filter_operator.h @@ -247,15 +247,26 @@ public: void set_max_version(int32_t max_version_value) { row_iterator_.set_max_version(max_version_value); } // parse the filter string int parse_filter_string(common::ObArenaAllocator* allocator); + +public: + // query async + virtual void set_one_result(ObTableQueryResult *result) {one_result_ = result;} + void set_query(const ObTableQuery *query) {query_ = query;} + void set_query_sync() { is_query_sync_ = true ; } + private: - const ObTableQuery &query_; + const ObTableQuery *query_; ObHTableRowIterator row_iterator_; - table::ObTableQueryResult &one_result_; + table::ObTableQueryResult *one_result_; table::ObHTableFilterParser filter_parser_; table::hfilter::Filter *hfilter_; int32_t batch_size_; int64_t max_result_size_; + +private: + // query async bool is_first_result_; + bool is_query_sync_; }; } // end namespace table diff --git a/src/observer/table/ob_table_query_processor.cpp b/src/observer/table/ob_table_query_processor.cpp index 2f86e41fa6bda411afad32f0a0078132f130ce57..647b6a49c956d9826514e283997bf7a1a7d4866a 100644 --- a/src/observer/table/ob_table_query_processor.cpp +++ b/src/observer/table/ob_table_query_processor.cpp @@ -107,6 +107,20 @@ int ObTableQueryP::get_partition_ids(uint64_t table_id, ObIArray &part_ return ret; } +void ObTableQueryP::set_htable_compressor() +{ + int ret = OB_SUCCESS; + // hbase model, compress the result packet + ObCompressorType compressor_type = INVALID_COMPRESSOR; + if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type( + GCONF.tableapi_transport_compress_func, compressor_type))) { + compressor_type = INVALID_COMPRESSOR; + } else if (NONE_COMPRESSOR == compressor_type) { + compressor_type = INVALID_COMPRESSOR; + } + this->set_result_compress_type(compressor_type); +} + int ObTableQueryP::try_process() { int ret = OB_SUCCESS; @@ -143,16 +157,7 @@ int ObTableQueryP::try_process() } else { if (arg_.query_.get_htable_filter().is_valid()) { // hbase model, compress the result packet - ObCompressorType compressor_type = INVALID_COMPRESSOR; - if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type( - GCONF.tableapi_transport_compress_func, compressor_type))) { - compressor_type = INVALID_COMPRESSOR; - } else if (NONE_COMPRESSOR == compressor_type) { - compressor_type = INVALID_COMPRESSOR; - } - this->set_result_compress_type(compressor_type); - ret = OB_SUCCESS; // reset ret - LOG_DEBUG("[yzfdebug] use compressor", K(compressor_type)); + set_htable_compressor(); } // one_result references to result_ ObTableQueryResult *one_result = nullptr; diff --git a/src/observer/table/ob_table_query_processor.h b/src/observer/table/ob_table_query_processor.h index bca6f21abc290fa39396faeb130f2c8045c36ea9..51247d7f04770656065d31ae4dc917eaaf24a2b6 100644 --- a/src/observer/table/ob_table_query_processor.h +++ b/src/observer/table/ob_table_query_processor.h @@ -39,7 +39,8 @@ protected: virtual uint64_t get_request_checksum() override; private: - int get_partition_ids(uint64_t table_id, common::ObIArray &part_ids); + int get_partition_ids(uint64_t table_id, common::ObIArray &part_ids); + void set_htable_compressor(); DISALLOW_COPY_AND_ASSIGN(ObTableQueryP); private: common::ObArenaAllocator allocator_; diff --git a/src/observer/table/ob_table_query_sync_processor.cpp b/src/observer/table/ob_table_query_sync_processor.cpp index c98047e53c9be65556c614043ec7bdbc9b54931d..abeea504b041cf0ee7809dff1c60301468e5da1f 100644 --- a/src/observer/table/ob_table_query_sync_processor.cpp +++ b/src/observer/table/ob_table_query_sync_processor.cpp @@ -22,6 +22,7 @@ #include "observer/ob_server.h" #include "lib/string/ob_strings.h" #include "lib/rc/ob_rc.h" +#include "observer/table/ob_htable_filter_operator.h" using namespace oceanbase::observer; using namespace oceanbase::common; @@ -58,6 +59,15 @@ void ObTableQuerySyncSession::set_result_iterator(ObNormalTableQueryResultIterat } } +void ObTableQuerySyncSession::set_htable_result_iterator(table::ObHTableFilterOperator *query_result) +{ + htable_result_iterator_ = query_result; + if (OB_NOT_NULL(htable_result_iterator_)) { + htable_result_iterator_->set_query(&query_); + htable_result_iterator_->set_query_sync(); + } +} + int ObTableQuerySyncSession::init() { int ret = OB_SUCCESS; @@ -365,6 +375,21 @@ int ObTableQuerySyncP::get_session_id(uint64_t &real_sessid, uint64_t arg_sessid return ret; } +void ObTableQuerySyncP::set_htable_compressor() +{ + int ret = OB_SUCCESS; + // hbase model, compress the result packet + ObCompressorType compressor_type = INVALID_COMPRESSOR; + if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type( + GCONF.tableapi_transport_compress_func, compressor_type))) { + compressor_type = INVALID_COMPRESSOR; + } else if (NONE_COMPRESSOR == compressor_type) { + compressor_type = INVALID_COMPRESSOR; + } + this->set_result_compress_type(compressor_type); +} + + int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session) { int ret = OB_SUCCESS; @@ -402,7 +427,13 @@ int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSessio int ObTableQuerySyncP::query_scan_with_old_context(const int64_t timeout) { int ret = OB_SUCCESS; - ObTableQueryResultIterator *result_iterator = query_session_->get_result_iterator(); + ObTableQueryResultIterator *result_iterator; + if (arg_.query_.get_htable_filter().is_valid()) { + result_iterator = query_session_->get_htable_result_iterator(); + set_htable_compressor(); + } else { + result_iterator = query_session_->get_result_iterator(); + } if (OB_ISNULL(result_iterator)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("query result iterator null", K(ret)); @@ -430,6 +461,9 @@ int ObTableQuerySyncP::query_scan_with_new_context( ObTableQuerySyncSession *query_session, table::ObTableQueryResultIterator *result_iterator, const int64_t timeout) { int ret = OB_SUCCESS; + if (arg_.query_.get_htable_filter().is_valid()) { + set_htable_compressor(); + } ObTableQueryResult *query_result = nullptr; if (ObTimeUtility::current_time() > timeout) { ret = OB_TRANS_TIMEOUT; @@ -445,7 +479,12 @@ int ObTableQuerySyncP::query_scan_with_new_context( } else if (result_iterator->has_more_result()) { result_.is_end_ = false; query_session->deep_copy_select_columns(arg_.query_); - query_session->set_result_iterator(dynamic_cast(result_iterator)); + if (arg_.query_.get_htable_filter().is_valid()) { + query_session->set_htable_result_iterator(dynamic_cast(result_iterator)); + } else { + query_session->set_result_iterator(dynamic_cast(result_iterator)); + } + } else { result_.is_end_ = true; } @@ -497,7 +536,7 @@ int ObTableQuerySyncP::query_scan_with_init() int ObTableQuerySyncP::query_scan_without_init() { int ret = OB_SUCCESS; - if (OB_ISNULL(query_session_->get_result_iterator()) || OB_ISNULL(query_session_->get_table_service_ctx())) { + if (OB_ISNULL(query_session_->get_table_service_ctx())) { ret = OB_ERR_NULL_VALUE; LOG_WARN("unexpected null result iterator or table service context", K(ret)); } else if (OB_FAIL(query_scan_with_old_context(timeout_ts_))) { diff --git a/src/observer/table/ob_table_query_sync_processor.h b/src/observer/table/ob_table_query_sync_processor.h index aa4471cb9fbe933f8c05e76c1a82aea8b69ed114..2537047001c21d9a480d21f895f79ee73f1ac672 100644 --- a/src/observer/table/ob_table_query_sync_processor.h +++ b/src/observer/table/ob_table_query_sync_processor.h @@ -34,6 +34,7 @@ public: timestamp_(0), query_(), result_iterator_(nullptr), + htable_result_iterator_(nullptr), allocator_(ObModIds::TABLE_PROC), table_service_ctx_(allocator_), iterator_mementity_(nullptr) @@ -42,6 +43,7 @@ public: void set_timestamp(int64_t timestamp) { timestamp_ = timestamp; } void set_result_iterator(ObNormalTableQueryResultIterator* iter); + void set_htable_result_iterator(table::ObHTableFilterOperator *iter); int deep_copy_select_columns(const ObTableQuery &query); void set_in_use(bool in_use) {in_use_ = in_use;} bool is_in_use() {return in_use_;} @@ -50,6 +52,7 @@ public: int64_t get_timestamp() { return timestamp_; } ObTableServiceQueryCtx *get_table_service_ctx() {return &table_service_ctx_;} ObNormalTableQueryResultIterator *get_result_iterator() { return result_iterator_; } + table::ObHTableFilterOperator *get_htable_result_iterator() { return htable_result_iterator_; } ObArenaAllocator *get_allocator() {return &allocator_;} public: @@ -63,6 +66,7 @@ private: uint64_t timestamp_; ObTableQuery query_; // only select_columns is correct ObNormalTableQueryResultIterator *result_iterator_; + table::ObHTableFilterOperator *htable_result_iterator_; ObArenaAllocator allocator_; ObTableServiceQueryCtx table_service_ctx_; lib::MemoryContext iterator_mementity_; @@ -181,6 +185,7 @@ private: int query_scan_with_old_context(const int64_t timeout); int query_scan_with_new_context(ObTableQuerySyncSession * session_ctx, table::ObTableQueryResultIterator *result_iterator, const int64_t timeout); + void set_htable_compressor(); private: void set_trans_from_session(ObTableQuerySyncSession *query_session); diff --git a/src/observer/table/ob_table_service.cpp b/src/observer/table/ob_table_service.cpp index 6c9e725ee5557266e1b7d7147d759e7f95fbf952..b7d2395184dcc6ddf27be4e60feea5322e7cb2ac 100644 --- a/src/observer/table/ob_table_service.cpp +++ b/src/observer/table/ob_table_service.cpp @@ -21,6 +21,7 @@ #include "lib/thread_local/ob_tsi_factory.h" #include "ob_htable_filter_operator.h" #include "sql/engine/expr/ob_expr_add.h" + using namespace oceanbase::observer; using namespace oceanbase::common; using namespace oceanbase::table;