ob_htable_filter_operator.h 9.6 KB
Newer Older
X
xj0 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
 
#ifndef _OB_HTABLE_FILTER_OPERATOR_H
#define _OB_HTABLE_FILTER_OPERATOR_H 1
#include "lib/string/ob_string.h"
#include "lib/container/ob_array.h"
#include "share/table/ob_table.h"
#include "share/table/ob_table_rpc_struct.h"
#include "common/row/ob_row_iterator.h"
#include "ob_htable_utils.h"
#include "ob_htable_filter_parser.h"
#include "ob_htable_filters.h"
#include <utility>

namespace oceanbase
{
namespace table
{
class ObHColumnDescriptor final
{
public:
  ObHColumnDescriptor()
      :time_to_live_(0)
  {}
  int from_string(const common::ObString &str);

  void set_time_to_live(int32_t v) { time_to_live_ = v; }
  int32_t get_time_to_live() const { return time_to_live_; }
private:
  int32_t time_to_live_; // Time-to-live of cell contents, in seconds.
};

enum class ObHTableMatchCode
{
  INCLUDE = 0,
  INCLUDE_AND_SEEK_NEXT_COL = 1,
  INCLUDE_AND_SEEK_NEXT_ROW = 2,
  SKIP = 3,
  SEEK_NEXT_COL = 4,
  SEEK_NEXT_ROW = 5,
  SEEK_NEXT_USING_HINT = 6,
  DONE = 7,
  DONE_SCAN = 8,
  DONE_REVERSE_SCAN = 9,
};

// Interface ObHTableColumnTracker
class ObHTableColumnTracker
{
public:
  // types and constants
  typedef std::pair<common::ObString, int32_t> ColumnCount;
  class ColumnCountComparator;
  class ColumnCountReverseComparator;
public:
  ObHTableColumnTracker()
      :max_versions_(1),
       min_versions_(0),
       oldest_stamp_(0)
  {}
  virtual ~ObHTableColumnTracker() {}
  virtual int init(const table::ObHTableFilter &htable_filter, common::ObQueryFlag::ScanOrder &scan_order) = 0;
  virtual int check_column(const ObHTableCell &cell, ObHTableMatchCode &match_code) = 0;
  virtual int check_versions(const ObHTableCell &cell, ObHTableMatchCode &match_code) = 0;
  virtual const ColumnCount *get_curr_column() const = 0;
  virtual void reset() = 0;
  virtual bool done() const = 0;
  virtual int get_next_column_or_row(const ObHTableCell &cell, ObHTableMatchCode &match_code) = 0;
  virtual common::ObQueryFlag::ScanOrder get_scan_order() { return tracker_scan_order_; }
  virtual void set_scan_order(common::ObQueryFlag::ScanOrder tracker_scan_order) { tracker_scan_order_ = tracker_scan_order; }
  // Give the tracker a chance to declare it's done based on only the timestamp.
  bool is_done(int64_t timestamp) const;
  void set_ttl(int32_t ttl_value);
protected:
  int32_t max_versions_;  // default: 1
  int32_t min_versions_;  // default: 0
  int64_t oldest_stamp_;  // default: 0
  common::ObQueryFlag::ScanOrder tracker_scan_order_;
protected:
  bool is_expired(int64_t timestamp) const { return (-timestamp) < oldest_stamp_; }
private:
  // disallow copy
  DISALLOW_COPY_AND_ASSIGN(ObHTableColumnTracker);
};

class ObHTableExplicitColumnTracker: public ObHTableColumnTracker
{
public:
  ObHTableExplicitColumnTracker();
  virtual ~ObHTableExplicitColumnTracker() {}
  virtual int init(const table::ObHTableFilter &htable_filter, common::ObQueryFlag::ScanOrder &scan_order) override;

  virtual int check_column(const ObHTableCell &cell, ObHTableMatchCode &match_code) override;
  virtual int check_versions(const ObHTableCell &cell, ObHTableMatchCode &match_code) override;
  virtual int get_next_column_or_row(const ObHTableCell &cell, ObHTableMatchCode &match_code) override;
  virtual const ColumnCount *get_curr_column() const override { return curr_column_; }
  virtual void reset() override;
  virtual bool done() const override;
private:
  // disallow copy
  DISALLOW_COPY_AND_ASSIGN(ObHTableExplicitColumnTracker);
  // function members

  void done_with_column(const ObHTableCell &cell);
  ObHTableMatchCode check_version(int64_t timestamp);
private:
  common::ObSEArray<ColumnCount, common::OB_DEFAULT_SE_ARRAY_COUNT> columns_;
  int64_t curr_column_idx_;
  ColumnCount *curr_column_;
  int32_t current_count_;
};

class ObHTableWildcardColumnTracker: public ObHTableColumnTracker
{
public:
  ObHTableWildcardColumnTracker();
  virtual ~ObHTableWildcardColumnTracker() {}
  virtual int init(const table::ObHTableFilter &htable_filter, common::ObQueryFlag::ScanOrder &scan_order) override;
  virtual int check_column(const ObHTableCell &cell, ObHTableMatchCode &match_code) override;
  virtual int check_versions(const ObHTableCell &cell, ObHTableMatchCode &match_code) override;
  virtual int get_next_column_or_row(const ObHTableCell &cell, ObHTableMatchCode &match_code) override;
  virtual const ColumnCount *get_curr_column() const override { return NULL; }
  virtual void reset() override;
  virtual bool done() const override { return false; }
private:
  // disallow copy
  DISALLOW_COPY_AND_ASSIGN(ObHTableWildcardColumnTracker);
  // function members
  int reset_cell(const ObHTableCell &cell);
  ObHTableMatchCode check_version(int64_t timestamp);
private:
  // states
  common::ObArenaAllocator allocator_;
  ObString current_qualifier_;
  int32_t current_count_;
};

class ObHTableScanMatcher
{
public:
  explicit ObHTableScanMatcher(const table::ObHTableFilter &htable_filter, ObHTableColumnTracker *column_tracker = nullptr);
  virtual ~ObHTableScanMatcher() {}
  void init(ObHTableColumnTracker *tracker, table::hfilter::Filter *hfilter) { column_tracker_ = tracker; hfilter_ = hfilter; }
  void set_hfilter(table::hfilter::Filter *hfilter) { hfilter_ = hfilter; }

  int match(const ObHTableCell &cell, ObHTableMatchCode &match_code);
  int create_key_for_next_col(common::ObArenaAllocator &allocator, const ObHTableCell &cell, ObHTableCell *&next_cell);

  const ObHTableCell* get_curr_row() const;
  void clear_curr_row() { curr_row_.set_ob_row(NULL); }
  int set_to_new_row(const ObHTableCell &curr_row);
  bool is_curr_row_empty() const { return NULL == curr_row_.get_ob_row(); }
  ObHTableMatchCode merge_filter_return_code(const ObHTableCell &cell,
                                             const ObHTableMatchCode match_code,
                                             hfilter::Filter::ReturnCode filter_rc);
private:
  DISALLOW_COPY_AND_ASSIGN(ObHTableScanMatcher);
  int pre_check(const ObHTableCell &cell, ObHTableMatchCode &match_code, bool &need_match_column);
  int match_column(const ObHTableCell &cell, ObHTableMatchCode &match_code);
private:
  ObNegativeTimeRange time_range_;
  ObHTableColumnTracker *column_tracker_;
  table::hfilter::Filter *hfilter_;
  common::ObArenaAllocator allocator_;
  ObHTableCellEntity curr_row_;  // the first cell of current row
  common::ObNewRow curr_ob_row_;
};

class ObHTableRowIterator: public ObTableQueryResultIterator
{
public:
  ObHTableRowIterator(const ObTableQuery &query);
  virtual ~ObHTableRowIterator();
  /// Fetch next row
  virtual int get_next_result(ObTableQueryResult *&one_row) override;

  int seek(const ObHTableCell &key);
  void set_scan_result(common::ObNewRowIterator *scan_result) { child_op_ = scan_result; }
  bool has_more_result() const { return has_more_cells_; }
  void set_hfilter(table::hfilter::Filter *hfilter);
  void set_ttl(int32_t ttl_value);
  int add_same_kq_to_res(ObIArray<common::ObNewRow> &same_kq_cells, ObTableQueryResult *&out_result);
  ObIArray<common::ObNewRow> &get_same_kq_cells() { return same_kq_cells_; }
private:
  int next_cell();
  int reverse_next_cell(ObIArray<common::ObNewRow> &same_kq_cells, ObTableQueryResult *&out_result);
  int seek_or_skip_to_next_row(const ObHTableCell &cell);
  int seek_or_skip_to_next_col(const ObHTableCell &cell);
  bool reach_batch_limit() const;
  bool reach_size_limit() const;
private:
  common::ObNewRowIterator *child_op_;
  const table::ObHTableFilter &htable_filter_;
  table::hfilter::Filter *hfilter_;
  int32_t limit_per_row_per_cf_;
  int32_t offset_per_row_per_cf_;
  int64_t max_result_size_;
  int32_t batch_size_;
  int32_t time_to_live_; // Time-to-live of cell contents, in seconds.

  table::ObTableQueryResult one_hbase_row_;
  ObHTableCellEntity curr_cell_;
  common::ObArenaAllocator allocator_;  // used for deep copy of curr_cell_
  ObHTableColumnTracker *column_tracker_;
  ObHTableScanMatcher *matcher_;
  ObHTableWildcardColumnTracker column_tracker_wildcard_;
  ObHTableExplicitColumnTracker column_tracker_explicit_;
  ObHTableScanMatcher matcher_impl_;
  common::ObQueryFlag::ScanOrder scan_order_;
  ObSEArray<common::ObNewRow, 16> same_kq_cells_;
  int32_t cell_count_;
  int32_t count_per_row_;
  bool has_more_cells_;
};

// entry class
class ObHTableFilterOperator: public ObTableQueryResultIterator
{
public:
  ObHTableFilterOperator(const ObTableQuery &query, table::ObTableQueryResult &one_result);
  virtual ~ObHTableFilterOperator() {}
  /// Fetch next batch result
  virtual int get_next_result(ObTableQueryResult *&one_result) override;
  virtual bool has_more_result() const override { return row_iterator_.has_more_result(); }
  void set_scan_result(common::ObNewRowIterator *scan_result) { row_iterator_.set_scan_result(scan_result); }
  void set_ttl(int32_t ttl_value) { row_iterator_.set_ttl(ttl_value); }
  // parse the filter string
  int parse_filter_string(common::ObArenaAllocator* allocator);
private:
  const ObTableQuery &query_;
  ObHTableRowIterator row_iterator_;
  table::ObTableQueryResult &one_result_;
  table::ObHTableFilterParser filter_parser_;
  table::hfilter::Filter *hfilter_;
  int32_t batch_size_;
  int64_t max_result_size_;
  bool is_first_result_;
};

} // end namespace table
} // end namespace oceanbase

#endif /* _OB_HTABLE_FILTER_OPERATOR_H */