/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef SRC_SQL_ENGINE_JOIN_OB_HASH_JOIN_OP_H_
#define SRC_SQL_ENGINE_JOIN_OB_HASH_JOIN_OP_H_

#include "sql/engine/join/ob_join_op.h"
#include "share/datum/ob_datum_funcs.h"
#include "sql/engine/join/ob_hash_join_basic.h"
#include "lib/container/ob_bit_set.h"
#include "sql/engine/ob_sql_mem_mgr_processor.h"
#include "lib/container/ob_2d_array.h"
#include "sql/engine/aggregate/ob_exec_hash_struct.h"

namespace oceanbase {
namespace sql {

class ObHashJoinSpec : public ObJoinSpec {
  OB_UNIS_VERSION_V(1);

G
gm 已提交
30
public:
O
oceanbase-admin 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43
  ObHashJoinSpec(common::ObIAllocator& alloc, const ObPhyOperatorType type);

  ExprFixedArray equal_join_conds_;
  ExprFixedArray all_join_keys_;
  common::ObHashFuncs all_hash_funcs_;
  bool has_join_bf_;
};

// Hash join has no expression-result overwrite problem:
//  LEFT: is a blocking input, so overwriting is not a concern.
//  RIGHT: is overwritten with blank_right_row() in the JS_FILL_LEFT state, when the right child has also been iterated to the end.

class ObHashJoinOp : public ObJoinOp {
public:
  // Constructs the operator from its execution context, spec and input.
  ObHashJoinOp(ObExecContext& exec_ctx, const ObOpSpec& spec, ObOpInput* input);
  // The destructor body is empty.
  ~ObHashJoinOp()
  {}

  // Callable taking two int64_t values and returning an int64_t; this is the
  // parameter type of HashJoinHistogram::reorder_histogram().
  using BucketFunc = std::function<int64_t(int64_t, int64_t)>;
  // Callable that hands back a stored right row through its reference
  // parameter and returns an int (presumably an error code -- confirm); this
  // is the parameter type of get_next_right_row_for_batch() and
  // read_hashrow_for_cache_aware().
  using NextFunc = std::function<int(const ObHashJoinStoredJoinRow*& right_read_row)>;

private:
  // Values held by hj_state_.
  enum HJState { INIT, NORMAL, NEXT_BATCH };
  // Values held by hj_processor_. The numeric values equal the
  // ENABLE_HJ_NEST_LOOP / ENABLE_HJ_RECURSIVE / ENABLE_HJ_IN_MEMORY bit
  // constants declared in this class.
  enum HJProcessor { NONE = 0, NEST_LOOP = 1, RECURSIVE = 2, IN_MEMORY = 4 };
  // Values held by state_. JS_STATE_COUNT is not a state: it is the number of
  // states and sizes the state_operation_func_ / state_function_func_ tables.
  enum ObJoinState {
    JS_JOIN_END,
    JS_READ_RIGHT,
    JS_READ_HASH_ROW,
    JS_LEFT_ANTI_SEMI,  // for anti_semi
    JS_FILL_LEFT,       // for left_outer,full_outer
    JS_STATE_COUNT
  };
  // Second index of state_function_func_; FT_TYPE_COUNT is the number of entries.
  enum ObFuncType { FT_ITER_GOING = 0, FT_ITER_END, FT_TYPE_COUNT };
  // Values held by nest_loop_state_.
  enum HJLoopState { LOOP_START, LOOP_GOING, LOOP_RECURSIVE, LOOP_END };

private:
  // One hash table entry: a pointer to a stored row plus a link to the next
  // cell (the bucket array of PartHashJoinTable holds HashTableCell pointers).
  struct HashTableCell {
    HashTableCell() = default;
    // do NOT init these members below in constructor, since we will set them
    // before using them. for performance.
    ObHashJoinStoredJoinRow* stored_row_;
    HashTableCell* next_tuple_;
    // next_tuple_ is printed as a raw address (cast to void*).
    TO_STRING_KV(K_(stored_row), K(static_cast<void*>(next_tuple_)));
  };
  struct PartHashJoinTable {
    PartHashJoinTable()
        : buckets_(nullptr),
          all_cells_(nullptr),
          collision_cnts_(nullptr),
          nbuckets_(0),
          row_count_(0),
          inited_(false),
          ht_alloc_(nullptr)
    {}
    void reset()
    {
      if (OB_NOT_NULL(buckets_)) {
        buckets_->reset();
      }
      if (OB_NOT_NULL(collision_cnts_)) {
        collision_cnts_->reset();
      }
      if (OB_NOT_NULL(all_cells_)) {
        all_cells_->reset();
      }
      nbuckets_ = 0;
    }
    int init(ObIAllocator& alloc);
    void free(ObIAllocator* alloc)
    {
      reset();
      if (OB_NOT_NULL(buckets_)) {
        buckets_->destroy();
        alloc->free(buckets_);
        buckets_ = nullptr;
      }
      if (OB_NOT_NULL(collision_cnts_)) {
        collision_cnts_->destroy();
        alloc->free(collision_cnts_);
        collision_cnts_ = nullptr;
      }
      if (OB_NOT_NULL(all_cells_)) {
        all_cells_->destroy();
        alloc->free(all_cells_);
        all_cells_ = nullptr;
      }
      if (OB_NOT_NULL(ht_alloc_)) {
        ht_alloc_->reset();
        ht_alloc_->~ModulePageAllocator();
        alloc->free(ht_alloc_);
        ht_alloc_ = nullptr;
      }
      inited_ = false;
    }
    void inc_collision(int64_t bucket_id)
    {
      if (255 > collision_cnts_->at(bucket_id)) {
        ++collision_cnts_->at(bucket_id);
      }
    }
    void get_collision_info(uint8_t& min_cnt, uint8_t& max_cnt, int64_t& total_cnt, int64_t& used_bucket_cnt)
    {
      min_cnt = 255;
      max_cnt = 0;
      total_cnt = 0;
      used_bucket_cnt = 0;
      if (collision_cnts_->count() != nbuckets_) {
        SQL_ENG_LOG(
            WARN, "unexpected: collision_cnts_->count() != nbuckets_", K(collision_cnts_->count()), K(nbuckets_));
      } else {
        for (int64_t i = 0; i < nbuckets_; ++i) {
          int64_t cnt = collision_cnts_->at(i);
          if (0 < cnt) {
            ++used_bucket_cnt;
            total_cnt += cnt;
            if (min_cnt > cnt) {
              min_cnt = cnt;
            }
            if (max_cnt < cnt) {
              max_cnt = cnt;
            }
          }
        }
      }
      if (0 == used_bucket_cnt) {
        min_cnt = 0;  // the initial value is 255.
      }
    }
    using BucketArray =
        common::ObSegmentArray<HashTableCell*, OB_MALLOC_MIDDLE_BLOCK_SIZE, common::ModulePageAllocator>;
    using AllCellArray = common::ObSegmentArray<HashTableCell, OB_MALLOC_BIG_BLOCK_SIZE, common::ModulePageAllocator>;
    using CollisionCntArray = common::ObSegmentArray<uint8_t, OB_MALLOC_BIG_BLOCK_SIZE, common::ModulePageAllocator>;

    BucketArray* buckets_;
    AllCellArray* all_cells_;
    CollisionCntArray* collision_cnts_;
    int64_t nbuckets_;
    int64_t row_count_;
    bool inited_;
    ModulePageAllocator* ht_alloc_;
  };

  // A hash value paired with a pointer to a stored row; the element type of
  // HashJoinHistogram's item arrays.
  struct HistItem {
    int64_t hash_value_;
    ObHashJoinStoredJoinRow* store_row_;
    // store_row_ is printed as a raw address (cast to void*).
    TO_STRING_KV(K_(hash_value), K(static_cast<void*>(store_row_)));
  };
  // A left item, a right item and a match flag.
  // NOTE(review): no use of this type is visible in this header -- confirm
  // how is_match_ is maintained in the .cpp.
  struct ResultItem {
    HistItem left_;
    HistItem right_;
    bool is_match_;
  };
  class HashJoinHistogram {
G
gm 已提交
184
  public:
O
oceanbase-admin 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
    HashJoinHistogram()
        : h1_(nullptr),
          h2_(nullptr),
          prefix_hist_count_(nullptr),
          prefix_hist_count2_(nullptr),
          hist_alloc_(nullptr),
          enable_bloom_filter_(false),
          bloom_filter_(nullptr),
          alloc_(nullptr),
          row_count_(0),
          bucket_cnt_(0)
    {}
    ~HashJoinHistogram()
    {
      reset();
    }
    void reset()
    {
      if (OB_NOT_NULL(alloc_)) {
        if (OB_NOT_NULL(h1_)) {
          h1_->reset();
          alloc_->free(h1_);
          h1_ = nullptr;
        }
        if (OB_NOT_NULL(h2_)) {
          h2_->reset();
          alloc_->free(h2_);
          h2_ = nullptr;
        }
        if (OB_NOT_NULL(prefix_hist_count_)) {
          prefix_hist_count_->reset();
          alloc_->free(prefix_hist_count_);
          prefix_hist_count_ = nullptr;
        }
        if (OB_NOT_NULL(prefix_hist_count2_)) {
          prefix_hist_count2_->reset();
          alloc_->free(prefix_hist_count2_);
          prefix_hist_count2_ = nullptr;
        }
        if (OB_NOT_NULL(bloom_filter_)) {
          bloom_filter_->~ObGbyBloomFilter();
          alloc_->free(bloom_filter_);
          bloom_filter_ = nullptr;
        }
        if (OB_NOT_NULL(hist_alloc_)) {
          hist_alloc_->reset();
          hist_alloc_->~ModulePageAllocator();
          alloc_->free(hist_alloc_);
          hist_alloc_ = nullptr;
        }
      }
      alloc_ = nullptr;
      row_count_ = 0;
      bucket_cnt_ = 0;
      enable_bloom_filter_ = false;
    }

    OB_INLINE int64_t get_bucket_idx(const uint64_t hash_value)
    {
      return hash_value & (bucket_cnt_ - 1);
    }
    int init(ObIAllocator* alloc, int64_t row_count, int64_t bucket_cnt, bool enable_bloom_filter);
    static int64_t calc_memory_size(int64_t row_count)
    {
      return next_pow2(row_count * RATIO_OF_BUCKETS) * (sizeof(HistItem) * 2 + sizeof(int64_t));
    }
    bool empty() const
    {
      return 0 == row_count_;
    }
    int reorder_histogram(BucketFunc bucket_func);
    int calc_prefix_histogram();
    void switch_histogram();
    void switch_prefix_hist_count();

G
gm 已提交
260
  public:
O
oceanbase-admin 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274
    using HistItemArray = common::ObSegmentArray<HistItem, OB_MALLOC_BIG_BLOCK_SIZE, common::ModulePageAllocator>;
    using HistPrefixArray = common::ObSegmentArray<int64_t, OB_MALLOC_BIG_BLOCK_SIZE, common::ModulePageAllocator>;
    HistItemArray* h1_;
    HistItemArray* h2_;
    HistPrefixArray* prefix_hist_count_;
    HistPrefixArray* prefix_hist_count2_;
    ModulePageAllocator* hist_alloc_;
    bool enable_bloom_filter_;
    ObGbyBloomFilter* bloom_filter_;
    ObIAllocator* alloc_;
    int64_t row_count_;
    int64_t bucket_cnt_;
  };
  class PartitionSplitter {
G
gm 已提交
275
  public:
O
oceanbase-admin 已提交
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
    PartitionSplitter()
        : alloc_(nullptr),
          part_count_(0),
          hj_parts_(nullptr),
          max_level_(0),
          part_shift_(0),
          level1_bit_(0),
          level2_bit_(0),
          level_one_part_count_(0),
          level_two_part_count_(0),
          part_histogram_(),
          total_row_count_(0)
    {}
    ~PartitionSplitter()
    {
      reset();
    }
    void reset()
    {
      part_count_ = 0;
      hj_parts_ = nullptr;
      max_level_ = 0;
      part_shift_ = 0;
      level1_bit_ = 0;
      level2_bit_ = 0;
      level_one_part_count_ = 0;
      level_two_part_count_ = 0;
      part_histogram_.reset();
      total_row_count_ = 0;
      alloc_ = nullptr;
    }

    void set_part_count(int64_t part_shift, int64_t level1_part_count, int64_t level2_part_count)
    {
      part_shift_ = part_shift;
      level_one_part_count_ = level1_part_count;
      level_two_part_count_ = level2_part_count;
313 314
      level1_bit_ = (0 == level1_part_count) ? 0 : __builtin_ctz(level1_part_count);
      level2_bit_ = (0 == level2_part_count) ? 0 : __builtin_ctz(level2_part_count);
O
oceanbase-admin 已提交
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
    }
    bool is_valid()
    {
      return nullptr != hj_parts_;
    }
    int init(ObIAllocator* alloc, int64_t part_count, ObHashJoinPartition* hj_parts, int64_t max_level,
        int64_t part_shift, int64_t level1_part_count, int64_t level2_part_count);
    int repartition_by_part_array(const int64_t part_level);
    int repartition_by_part_histogram(const int64_t part_level);
    int build_hash_table_by_part_hist(HashJoinHistogram* all_part_hists, bool enable_bloom_filter);
    int build_hash_table_by_part_array(HashJoinHistogram* all_part_hists, bool enable_bloom_filter);

    OB_INLINE int64_t get_part_idx(const uint64_t hash_value)
    {
      int64_t part_idx = 0;
      if (0 == level_two_part_count_) {
        part_idx = get_part_level_one_idx(hash_value);
      } else {
        int64_t level1_part_idx = get_part_level_one_idx(hash_value);
        part_idx = get_part_level_two_idx(hash_value);
        part_idx = part_idx + level1_part_idx * level_two_part_count_;
      }
      return part_idx;
    }
    OB_INLINE int64_t get_part_level_one_idx(const uint64_t hash_value)
    {
      return (hash_value >> part_shift_) & (level_one_part_count_ - 1);
    }
    OB_INLINE int64_t get_part_level_two_idx(const uint64_t hash_value)
    {
      return ((hash_value >> part_shift_) >> level1_bit_) & (level_two_part_count_ - 1);
    }
    OB_INLINE bool is_level_one(int64_t part_level)
    {
      return 1 == part_level;
    }
    OB_INLINE bool get_total_row_count()
    {
      return total_row_count_;
    }

G
gm 已提交
356
  public:
O
oceanbase-admin 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369
    ObIAllocator* alloc_;
    int64_t part_count_;
    ObHashJoinPartition* hj_parts_;
    int64_t max_level_;
    int64_t part_shift_;
    int64_t level1_bit_;
    int64_t level2_bit_;
    int64_t level_one_part_count_;
    int64_t level_two_part_count_;
    HashJoinHistogram part_histogram_;
    int64_t total_row_count_;
  };

public:
  // Operator interface overrides (each is marked `override`); the
  // definitions are not part of this section.
  virtual int inner_open() override;
  virtual int rescan() override;
  virtual int inner_get_next_row() override;
  virtual void destroy() override;
  virtual int inner_close() override;

private:
  // Private helpers; declarations only, the definitions are not part of this
  // section.
  void calc_cache_aware_partition_count();
  int recursive_postprocess();
  int insert_batch_row(const int64_t cur_partition_in_memory);
  int insert_all_right_row(const int64_t row_count);
  // Level-one partition index: drop the low part_shift_ bits of the hash,
  // then mask with (level1_part_count_ - 1).
  OB_INLINE int64_t get_part_level_one_idx(const uint64_t hash_value)
  {
    const uint64_t shifted_hash = hash_value >> part_shift_;
    const int64_t level1_mask = level1_part_count_ - 1;
    return shifted_hash & level1_mask;
  }
  // Level-two partition index: drop the low part_shift_ bits and the
  // level1_bit_ bits above them, then mask with (level2_part_count_ - 1).
  OB_INLINE int64_t get_part_level_two_idx(const uint64_t hash_value)
  {
    const uint64_t above_level1 = (hash_value >> part_shift_) >> level1_bit_;
    const int64_t level2_mask = level2_part_count_ - 1;
    return above_level1 & level2_mask;
  }
  // Flat partition index for the cache-aware path: the level-one index when
  // there is no second level, otherwise
  // level_one_idx * level2_part_count_ + level_two_idx.
  OB_INLINE int64_t get_cache_aware_part_idx(const uint64_t hash_value)
  {
    const int64_t level1_idx = get_part_level_one_idx(hash_value);
    if (0 == level2_part_count_) {
      return level1_idx;
    }
    const int64_t level2_idx = get_part_level_two_idx(hash_value);
    return level2_idx + level1_idx * level2_part_count_;
  }
  void init_system_parameters();
  // Masks the hash with (level1_part_count_ - 1); unlike
  // get_part_level_one_idx() no part_shift_ is applied.
  inline int64_t get_level_one_part(int64_t hash_val)
  {
    const int64_t level1_mask = level1_part_count_ - 1;
    return hash_val & level1_mask;
  }
  inline int init_mem_context(uint64_t tenant_id);
  void part_rescan();
  int part_rescan(bool reset_all);
  void reset();
  void reset_base();

  int inner_join_read_hashrow_func_going();
  int other_join_read_hashrow_func_going();

  int inner_join_read_hashrow_func_end();
  int other_join_read_hashrow_func_end();

  int set_hash_function(int8_t hash_join_hasher);

  int next();
  int join_end_operate();
  int join_end_func_end();
424
  int get_next_left_row() override;
O
oceanbase-admin 已提交
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
  int reuse_for_next_chunk();
  int load_next_chunk();
  int build_hash_table_for_nest_loop(int64_t& num_left_rows);
  int nest_loop_process(bool& need_not_read_right);
  int64_t calc_partition_count(int64_t input_size, int64_t part_size, int64_t max_part_count);
  int64_t calc_partition_count_by_cache_aware(int64_t row_count, int64_t max_part_count, int64_t global_mem_bound_size);
  int64_t calc_max_data_size(const int64_t extra_memory_size);
  int get_max_memory_size(int64_t input_size);
  int64_t calc_bucket_number(const int64_t row_count);
  int calc_basic_info();
  int get_processor_type();
  int build_hash_table_in_memory(int64_t& num_left_rows);
  int in_memory_process(bool& need_not_read_right);
  int init_join_partition();
  int force_dump(bool for_left);
  void update_remain_data_memory_size(int64_t row_count, int64_t total_mem_size, bool& need_dump);
  bool need_more_remain_data_memory_size(int64_t row_count, int64_t total_mem_size, double& data_ratio);
  int update_remain_data_memory_size_periodically(int64_t row_count, bool& need_dump);
  int dump_build_table(int64_t row_count);
  int split_partition(int64_t& num_left_rows);
  int prepare_hash_table();
  void trace_hash_table_collision(int64_t row_cnt);
  int build_hash_table_for_recursive();
  int split_partition_and_build_hash_table(int64_t& num_left_rows);
  int recursive_process(bool& need_not_read_right);
  int adaptive_process(bool& need_not_read_right);
451
  int get_next_right_row() override;
O
oceanbase-admin 已提交
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
  int read_right_operate();
  int calc_hash_value(const ObIArray<ObExpr*>& join_keys, const ObIArray<ObHashFunc>& hash_funcs, uint64_t& hash_value);
  int calc_right_hash_value();
  int finish_dump(bool for_left, bool need_dump, bool force = false);
  int read_right_func_end();
  int calc_equal_conds(bool& is_match);
  int read_hashrow();
  int dump_probe_table();
  int read_hashrow_func_going();
  int read_hashrow_func_end();
  int find_next_matched_tuple(HashTableCell*& tuple);
  int left_anti_semi_operate();
  int left_anti_semi_going();
  int left_anti_semi_end();
  int find_next_unmatched_tuple(HashTableCell*& tuple);
  int fill_left_operate();
  int convert_exprs(const ObHashJoinStoredJoinRow* store_row, const ObIArray<ObExpr*>& exprs, bool& has_fill);
  int fill_left_going();
  int fill_left_end();
  int join_rows_with_right_null();
  int join_rows_with_left_null();
  int only_join_left_row();
  int only_join_right_row();
  int dump_remain_partition();

  int save_last_right_row();
  int restore_last_right_row();
  int get_next_batch_right_rows();
  int get_match_row(bool& is_matched);
  int get_next_right_row_for_batch(NextFunc next_func);

private:
  // Partition index: drop the low part_shift_ bits of the hash, then mask
  // with (part_count_ - 1).
  OB_INLINE int64_t get_part_idx(const uint64_t hash_value)
  {
    const uint64_t shifted_hash = hash_value >> part_shift_;
    const int64_t part_mask = part_count_ - 1;
    return shifted_hash & part_mask;
  }
  // Bucket index in hash_table_: the hash masked with (nbuckets_ - 1).
  OB_INLINE int64_t get_bucket_idx(const uint64_t hash_value)
  {
    const int64_t bucket_mask = hash_table_.nbuckets_ - 1;
    return hash_value & bucket_mask;
  }
  // True while part_level_ is 0.
  OB_INLINE bool top_part_level()
  {
    const bool at_level_zero = (part_level_ == 0);
    return at_level_zero;
  }
  // Records the chosen processor in hj_processor_.
  void set_processor(HJProcessor p)
  {
    this->hj_processor_ = p;
  }
  // True for the four join types listed below: right outer, full outer,
  // right semi and right anti.
  OB_INLINE bool need_right_bitset() const
  {
    const auto join_type = MY_SPEC.join_type_;
    const bool is_outer_with_right = (RIGHT_OUTER_JOIN == join_type) || (FULL_OUTER_JOIN == join_type);
    const bool is_right_semi_or_anti = (RIGHT_SEMI_JOIN == join_type) || (RIGHT_ANTI_JOIN == join_type);
    return is_outer_with_right || is_right_semi_or_anti;
  }
  // True when cur_dumped_partition_ holds the sentinel value -1.
  OB_INLINE bool all_dumped()
  {
    const bool reached_sentinel = (cur_dumped_partition_ == -1);
    return reached_sentinel;
  }
  // Memory used by mem_context_, or 0 while no context exists.
  OB_INLINE int64_t get_mem_used()
  {
    if (nullptr == mem_context_) {
      return 0;
    }
    return mem_context_->used();
  }
  // Data size as reported by sql_mem_processor_.
  OB_INLINE int64_t get_data_mem_used()
  {
    const int64_t data_size = sql_mem_processor_.get_data_size();
    return data_size;
  }
  // True when the memory in use exceeds the bound reported by
  // sql_mem_processor_.
  OB_INLINE bool need_dump()
  {
    const int64_t mem_used = get_mem_used();
    return mem_used > sql_mem_processor_.get_mem_bound();
  }
  // True when `size` is strictly below remain_data_memory_size_.
  OB_INLINE bool all_in_memory(int64_t size) const
  {
    const bool fits = remain_data_memory_size_ > size;
    return fits;
  }
  // Resets batch_mgr_ and hands the current left/right batches back to it.
  // Does nothing when there is no batch manager.
  void clean_batch_mgr()
  {
    if (nullptr == batch_mgr_) {
      return;
    }
    batch_mgr_->reset();
    if (nullptr != left_batch_) {
      batch_mgr_->free(left_batch_);
      left_batch_ = nullptr;
    }
    if (nullptr != right_batch_) {
      batch_mgr_->free(right_batch_);
      right_batch_ = nullptr;
    }
  }
  // Rewinds the nest-loop cursors (nth_right_row_ goes back to -1, the other
  // two to 0) and clears the statistics counters.
  void reset_nest_loop()
  {
    reset_statistics();
    nth_right_row_ = -1;
    cur_nth_row_ = 0;
    nth_nest_loop_ = 0;
  }
  // Zeroes the four statistics counters.
  void reset_statistics()
  {
    hash_link_cnt_ = 0;
    hash_equal_cnt_ = 0;
    probe_cnt_ = 0;
    bitset_filter_cnt_ = 0;
  }
  int64_t get_extra_memory_size() const
  {
    int64_t row_count = profile_.get_row_count();
    int64_t bucket_cnt = profile_.get_bucket_size();
O
obdev 已提交
557
    const int64_t DEFAULT_EXTRA_SIZE = 2 * 1024 * 1024;
O
oceanbase-admin 已提交
558 559
    int64_t extra_memory_size = bucket_cnt * (sizeof(HashTableCell*) + sizeof(uint8_t));
    extra_memory_size += (row_count * sizeof(HashTableCell));
O
obdev 已提交
560
    return extra_memory_size < 0 ? DEFAULT_EXTRA_SIZE : extra_memory_size;
O
oceanbase-admin 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588
  }
  // Clears the per-chunk state: the hash table, the bloom filter (if any)
  // and the right bit set are reset, alloc_ is reused, the nest-loop
  // cursors/statistics are rewound and nest_loop_state_ returns to LOOP_START.
  // alloc_ is dereferenced unchecked here.
  void clean_nest_loop_chunk()
  {
    hash_table_.reset();
    if (nullptr != bloom_filter_) {
      bloom_filter_->reset();
    }
    right_bit_set_.reset();
    alloc_->reuse();
    reset_nest_loop();
    nest_loop_state_ = LOOP_START;
  }
  // Raises the need_return_ flag.
  OB_INLINE void mark_return()
  {
    this->need_return_ = true;
  }
  // Bloom filter helpers; declarations only.
  int init_bloom_filter(ObIAllocator& alloc, int64_t bucket_cnt);
  void free_bloom_filter();

  // Helpers whose names refer to the cache-aware path, histograms and
  // repartitioning; declarations only, the definitions are not part of this
  // section.
  bool can_use_cache_aware_opt();
  int read_hashrow_normal();
  int read_hashrow_for_cache_aware(NextFunc next_func);
  int init_histograms(HashJoinHistogram*& part_histograms, int64_t part_count);
  int partition_and_build_histograms();
  int repartition(PartitionSplitter& part_splitter, HashJoinHistogram*& part_histograms, ObHashJoinPartition* hj_parts,
      bool is_build_side);
  int get_next_probe_partition();

private:
  // Pointer-to-member types for the no-argument, int-returning member
  // functions stored in going_func_/end_func_ and in the state tables.
  typedef int (ObHashJoinOp::*ReadFunc)();
  typedef int (ObHashJoinOp::*state_function_func_type)();
  typedef int (ObHashJoinOp::*state_operation_func_type)();
  // multiplier applied to the row count in HashJoinHistogram::calc_memory_size()
  static const int64_t RATIO_OF_BUCKETS = 2;
  // min row count for estimated row count
  static const int64_t MIN_ROW_COUNT = 10000;
  // max row count for estimated row count
  static const int64_t MAX_ROW_COUNT = 1000000;
  // max memory size limit (100MB) --unused
  static const int64_t DEFAULT_MEM_LIMIT = 100 * 1024 * 1024;

  static const int64_t CACHE_AWARE_PART_CNT = 128;
  static const int64_t BATCH_RESULT_SIZE = 512;
  static const int64_t INIT_LTB_SIZE = 64;
  static const int64_t INIT_L2_CACHE_SIZE = 1 * 1024 * 1024;  // 1M
  static const int64_t MIN_PART_COUNT = 8;
  static const int64_t PAGE_SIZE = ObChunkDatumStore::BLOCK_SIZE;
  // one page per partition of the minimum partition count, plus one more page
  static const int64_t MIN_MEM_SIZE = (MIN_PART_COUNT + 1) * PAGE_SIZE;
  // use nested loop join if partition level exceeds MAX_PART_LEVEL.
  static const int64_t MAX_PART_LEVEL = 4;
  static const int64_t PART_SPLIT_LEVEL_ONE = 1;
  static const int64_t PART_SPLIT_LEVEL_TWO = 2;

  // Processor selection bits; the values equal the HJProcessor enumerators
  // NEST_LOOP / RECURSIVE / IN_MEMORY.
  static const int8_t ENABLE_HJ_NEST_LOOP = 0x01;
  static const int8_t ENABLE_HJ_RECURSIVE = 0x02;
  static const int8_t ENABLE_HJ_IN_MEMORY = 0x04;
  static const int8_t HJ_PROCESSOR_MASK = ENABLE_HJ_NEST_LOOP | ENABLE_HJ_RECURSIVE | ENABLE_HJ_IN_MEMORY;
  static int8_t HJ_PROCESSOR_ALGO;

  // Hash function selection bits (presumably the argument domain of
  // set_hash_function() -- confirm).
  static const int8_t DEFAULT_MURMUR_HASH = 0x01;
  static const int8_t ENABLE_WY_HASH = 0x02;
  static const int8_t ENABLE_XXHASH64 = 0x04;
  static const int8_t HASH_FUNCTION_MASK = DEFAULT_MURMUR_HASH | ENABLE_WY_HASH | ENABLE_XXHASH64;

  // hard code seed, 24bit max prime number
  static const int64_t HASH_SEED = 16777213;
  // NOTE(review): the value is 1,000,000,000 (1e9); the original remark here
  // said "about 120M", which does not obviously match -- confirm the intent.
  static const int64_t MAX_NEST_LOOP_RIGHT_ROW_COUNT = 1000000000;
  static bool TEST_NEST_LOOP_TO_RECURSIVE;

  // make PART_COUNT and MAX_PAGE_COUNT configurable by unittest
  static int64_t PART_COUNT;
  static int64_t MAX_PAGE_COUNT;
  state_operation_func_type state_operation_func_[JS_STATE_COUNT];
  state_function_func_type state_function_func_[JS_STATE_COUNT][FT_TYPE_COUNT];
  HJState hj_state_;
  HJProcessor hj_processor_;
  ObHashJoinBufMgr* buf_mgr_;
  ObHashJoinBatchMgr* batch_mgr_;
  ObHashJoinBatch* left_batch_;
  ObHashJoinBatch* right_batch_;
  common::ObHashFuncs tmp_hash_funcs_;
  common::ObArrayHelper<common::ObHashFunc> left_hash_funcs_;
  common::ObArrayHelper<common::ObHashFunc> right_hash_funcs_;
  common::ObArrayHelper<ObExpr*> left_join_keys_;
  common::ObArrayHelper<ObExpr*> right_join_keys_;
  ReadFunc going_func_;
  ReadFunc end_func_;
  int32_t part_level_;
  int32_t part_shift_;
  int64_t part_count_;
  bool force_hash_join_spill_;
  int8_t hash_join_processor_;
  int64_t tenant_id_;
  int64_t input_size_;
  int64_t total_extra_size_;
  int64_t predict_row_cnt_;
  ObSqlWorkAreaProfile profile_;
  ObSqlMemMgrProcessor sql_mem_processor_;
  ObJoinState state_;
  uint64_t cur_right_hash_value_;  // cur right row's hash_value
  bool right_has_matched_;         // if cur right row has matched
  bool tuple_need_join_;           // for left_semi, left_anti_semi
  bool first_get_row_;
  int64_t cur_bkid_;  // for left,anti
  int64_t remain_data_memory_size_;
  int64_t nth_nest_loop_;
  int64_t cur_nth_row_;
  PartHashJoinTable hash_table_;
  HashTableCell* cur_tuple_;       // null or last matched tuple
  common::ObNewRow cur_left_row_;  // like cur_row_ in operator, get row from rowstore
J
jg0 已提交
671
  lib::MemoryContext mem_context_;
O
oceanbase-admin 已提交
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692
  common::ObIAllocator* alloc_;  // for buckets
  ModulePageAllocator* bloom_filter_alloc_;
  ObGbyBloomFilter* bloom_filter_;
  common::ObBitSet<common::OB_DEFAULT_BITSET_SIZE, common::ModulePageAllocator, true> right_bit_set_;
  int64_t nth_right_row_;
  int64_t ltb_size_;
  int64_t l2_cache_size_;
  int64_t price_per_row_;
  int64_t max_partition_count_per_level_;
  int64_t cur_dumped_partition_;
  HJLoopState nest_loop_state_;
  bool is_last_chunk_;
  bool has_right_bitset_;
  ObHashJoinPartition* hj_part_array_;
  ObHashJoinPartition* right_hj_part_array_;
  // store row read from partition
  const ObHashJoinStoredJoinRow* left_read_row_;
  const ObHashJoinStoredJoinRow* right_read_row_;
  bool postprocessed_left_;
  bool has_fill_right_row_;
  bool has_fill_left_row_;
693
  ObChunkDatumStore::ShadowStoredRow right_last_row_;
O
oceanbase-admin 已提交
694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727
  bool need_return_;
  bool iter_end_;
  bool opt_cache_aware_;
  bool has_right_material_data_;
  bool enable_bloom_filter_;
  HashJoinHistogram* part_histograms_;
  int64_t cur_full_right_partition_;
  bool right_iter_end_;
  int64_t cur_bucket_idx_;
  int64_t max_bucket_idx_;
  bool enable_batch_;
  int64_t level1_bit_;
  int64_t level1_part_count_;
  int64_t level2_part_count_;
  PartitionSplitter right_splitter_;
  HashJoinHistogram* cur_left_hist_;
  HashJoinHistogram* cur_right_hist_;
  int64_t cur_probe_row_idx_;
  int64_t max_right_bucket_idx_;

  // statistics
  int64_t probe_cnt_;
  int64_t bitset_filter_cnt_;
  int64_t hash_link_cnt_;
  int64_t hash_equal_cnt_;
};

inline int ObHashJoinOp::init_mem_context(uint64_t tenant_id)
{
  int ret = common::OB_SUCCESS;
  if (OB_LIKELY(NULL == mem_context_)) {
    lib::ContextParam param;
    param.set_properties(lib::USE_TL_PAGE_OPTIONAL)
        .set_mem_attr(tenant_id, common::ObModIds::OB_ARENA_HASH_JOIN, common::ObCtxIds::WORK_AREA);
J
jg0 已提交
728
    if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
O
oceanbase-admin 已提交
729 730 731 732 733 734 735 736 737 738 739 740 741 742
      SQL_ENG_LOG(WARN, "create entity failed", K(ret));
    } else if (OB_ISNULL(mem_context_)) {
      SQL_ENG_LOG(WARN, "mem entity is null", K(ret));
    } else {
      alloc_ = &mem_context_->get_malloc_allocator();
    }
  }
  return ret;
}

}  // end namespace sql
}  // end namespace oceanbase

#endif /* SRC_SQL_ENGINE_JOIN_OB_HASH_JOIN_OP_H_ */