ob_chunk_datum_store.h 29.8 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_BASIC_OB_DATUM_STORE2_H_
#define OCEANBASE_BASIC_OB_DATUM_STORE2_H_

#include "share/ob_define.h"
#include "lib/container/ob_se_array.h"
#include "lib/allocator/page_arena.h"
#include "lib/utility/ob_print_utils.h"
#include "lib/list/ob_dlist.h"
#include "common/row/ob_row.h"
#include "common/row/ob_row_iterator.h"
#include "share/datum/ob_datum.h"
#include "sql/engine/expr/ob_expr.h"
#include "storage/blocksstable/ob_tmp_file.h"
#include "sql/engine/basic/ob_sql_mem_callback.h"

namespace oceanbase {
namespace sql {

// Random access row store, support disk store.
// All row must have same cell count and  projector.
class ObChunkDatumStore {
  OB_UNIS_VERSION_V(1);

G
gm 已提交
36
public:
O
oceanbase-admin 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49
  /*
   * StoredRow memory layout
   * N Datum + extend_size(can be 0) + real data
   * | datum1 | datum2 | ... | datumN | extend_size = 0 | data1 | data2 | ... | dataN |
   *     |__________________________________________________|^      ^              ^
   *              |_________________________________________________|              |
   *                             |_________________________________________________|
   */
  struct StoredRow {
    StoredRow() : cnt_(0), row_size_(0)
    {}
    int to_expr(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx) const;
    int to_expr(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx, int64_t count) const;
50 51 52 53 54 55 56 57 58 59 60 61 62 63
    // Build a stored row by exprs.
    // @param [out] sr, result stored row
    // @param epxrs,
    // @param ctx
    // @param buf
    // @param buf_len, use Block::row_store_size() to detect the needed buffer size.
    // @param extra_size, extra store size
    // @param unswizzling
    // @return OB_SUCCESS or OB_BUF_NOT_ENOUGH if buf not enough
    static int build(StoredRow *&sr, const ObExprPtrIArray &exprs, ObEvalCtx &ctx, char *buf, const int64_t buf_len,
        const uint32_t extra_size = 0);
    static int build(StoredRow *&sr, const ObExprPtrIArray &exprs, ObEvalCtx &ctx, common::ObIAllocator &alloc,
        const uint32_t extra_size = 0);

O
oceanbase-admin 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96

    inline common::ObDatum* cells()
    {
      return reinterpret_cast<common::ObDatum*>(payload_);
    }
    inline const common::ObDatum* cells() const
    {
      return reinterpret_cast<const common::ObDatum*>(payload_);
    }
    inline void* get_extra_payload() const
    {
      return static_cast<void*>(const_cast<char*>(payload_ + sizeof(ObDatum) * cnt_));
    }
    int assign(const StoredRow* sr);
    void unswizzling(char* base = NULL);
    void swizzling(char* base = NULL);

    TO_STRING_KV(K_(cnt), K_(row_size), "cells", common::ObArrayWrap<common::ObDatum>(cells(), cnt_));

    uint32_t cnt_;
    uint32_t row_size_;
    char payload_[0];
  } __attribute__((packed));

  /*
   * Considering that many scenarios will temporarily save the next row of data,
   * so write a class to provide this way, Such as Sort, MergeDisintct, etc.
   * Features:
   * 1) Provide storage from ObIArray<ObExpr*> to StoredRow
   * 2) Provide reuse mode, memory can be reused
   * 3) Provide conversion from StoredRow to ObIArray<ObExpr*>
   */
  class LastStoredRow {
G
gm 已提交
97
  public:
98 99 100 101 102 103 104
    LastStoredRow(ObIAllocator& alloc)
        : store_row_(nullptr),
          alloc_(alloc),
          max_size_(0),
          reuse_(false),
          pre_alloc_row1_(nullptr),
          pre_alloc_row2_(nullptr)
O
oceanbase-admin 已提交
105 106 107 108 109 110 111 112 113 114 115
    {}
    ~LastStoredRow()
    {}

    int save_store_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx, const int64_t extra_size = 0)
    {
      int ret = OB_SUCCESS;
      bool reuse = reuse_;
      char* buf = NULL;
      int64_t row_size = 0;
      int64_t buffer_len = 0;
116
      StoredRow* new_row = NULL;
O
oceanbase-admin 已提交
117 118 119 120 121
      if (0 == exprs.count()) {
        // no column. scenario like distinct 1
      } else if (OB_FAIL(ObChunkDatumStore::row_copy_size(exprs, ctx, row_size))) {
        SQL_ENG_LOG(WARN, "failed to calc copy size", K(ret));
      } else {
122
        int64_t head_size = sizeof(StoredRow);
O
oceanbase-admin 已提交
123 124
        reuse = OB_ISNULL(store_row_) ? false : reuse && (max_size_ >= row_size + head_size + extra_size);
        if (reuse && OB_NOT_NULL(store_row_)) {
125 126
          // switch buffer for write
          store_row_ = (store_row_ == pre_alloc_row1_ ? pre_alloc_row2_ : pre_alloc_row1_);
O
oceanbase-admin 已提交
127 128 129 130
          buf = reinterpret_cast<char*>(store_row_);
          new_row = store_row_;
          buffer_len = max_size_;
        } else {
131
          // alloc 2 buffer with same length
O
oceanbase-admin 已提交
132
          buffer_len = (!reuse_ ? row_size : row_size * 2) + head_size + extra_size;
133 134 135
          char* buf1 = nullptr;
          char* buf2 = nullptr;
          if (OB_ISNULL(buf1 = reinterpret_cast<char*>(alloc_.alloc(buffer_len)))) {
O
oceanbase-admin 已提交
136 137
            ret = OB_ALLOCATE_MEMORY_FAILED;
            SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret));
138 139 140
          } else if (OB_ISNULL(buf2 = reinterpret_cast<char*>(alloc_.alloc(buffer_len)))) {
            ret = OB_ALLOCATE_MEMORY_FAILED;
            SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret));
141
          } else if (OB_ISNULL(pre_alloc_row1_ = new (buf1) StoredRow())) {
142 143
            ret = OB_ALLOCATE_MEMORY_FAILED;
            SQL_ENG_LOG(ERROR, "failed to new row", K(ret));
144
          } else if (OB_ISNULL(pre_alloc_row2_ = new (buf2) StoredRow())) {
O
oceanbase-admin 已提交
145 146
            ret = OB_ALLOCATE_MEMORY_FAILED;
            SQL_ENG_LOG(ERROR, "failed to new row", K(ret));
147 148
          } else {
            buf = buf1;
O
oceanbase-admin 已提交
149 150 151
          }
        }
        if (OB_SUCC(ret)) {
152 153
          if (OB_FAIL(StoredRow::build(store_row_, exprs, ctx, buf, buffer_len, extra_size))) {
            SQL_ENG_LOG(WARN, "failed to build stored row", K(ret), K(buffer_len), K(row_size));
O
oceanbase-admin 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166
          } else {
            max_size_ = buffer_len;
          }
        }
      }
      return ret;
    }
    int save_store_row(const ObChunkDatumStore::StoredRow& row, const int64_t extra_size = 0)
    {
      int ret = OB_SUCCESS;
      bool reuse = reuse_;
      char* buf = NULL;
      int64_t buffer_len = 0;
167
      StoredRow* new_row = NULL;
O
oceanbase-admin 已提交
168
      int64_t row_size = row.row_size_;
169
      int64_t head_size = sizeof(StoredRow);
O
oceanbase-admin 已提交
170 171 172 173 174 175 176 177 178 179
      reuse = OB_ISNULL(store_row_) ? false : reuse && (max_size_ >= row_size + head_size + extra_size);
      if (reuse && OB_NOT_NULL(store_row_)) {
        buf = reinterpret_cast<char*>(store_row_);
        new_row = store_row_;
        buffer_len = max_size_;
      } else {
        buffer_len = (!reuse_ ? row_size : row_size * 2) + head_size + extra_size;
        if (OB_ISNULL(buf = reinterpret_cast<char*>(alloc_.alloc(buffer_len)))) {
          ret = OB_ALLOCATE_MEMORY_FAILED;
          SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret));
180
        } else if (OB_ISNULL(new_row = new (buf) StoredRow())) {
O
oceanbase-admin 已提交
181 182 183 184 185 186
          ret = OB_ALLOCATE_MEMORY_FAILED;
          SQL_ENG_LOG(ERROR, "failed to new row", K(ret));
        }
      }
      if (OB_SUCC(ret)) {
        int64_t pos = head_size;
187 188
        if (OB_FAIL(new_row->assign(&row))) {
          SQL_ENG_LOG(WARN, "stored row assign failed", K(ret));
O
oceanbase-admin 已提交
189 190 191 192 193 194 195
        } else {
          max_size_ = buffer_len;
          store_row_ = new_row;
        }
      }
      return ret;
    }
196
    void set_store_row(StoredRow* in_store_row)
O
oceanbase-admin 已提交
197 198 199 200 201 202 203 204 205 206
    {
      store_row_ = in_store_row;
    }
    void reset()
    {
      store_row_ = nullptr;
      max_size_ = 0;
    }

    TO_STRING_KV(K_(max_size), K_(reuse), KPC_(store_row));
207
    StoredRow* store_row_;
O
oceanbase-admin 已提交
208 209 210
    ObIAllocator& alloc_;
    int64_t max_size_;
    bool reuse_;
211 212 213

  private:
    // To avoid writing memory overwrite, alloc 2 row for alternate writing
214 215
    StoredRow* pre_alloc_row1_;
    StoredRow* pre_alloc_row2_;
O
oceanbase-admin 已提交
216 217 218
  };

  class ShadowStoredRow {
G
gm 已提交
219
  public:
O
oceanbase-admin 已提交
220 221 222 223 224 225 226 227 228 229
    ShadowStoredRow() : alloc_(nullptr), store_row_(nullptr), saved_(false)
    {}
    ~ShadowStoredRow()
    {
      reset();
    }

    int init(common::ObIAllocator& allocator, int64_t datum_cnt)
    {
      int ret = OB_SUCCESS;
230
      int64_t buffer_len = datum_cnt * sizeof(ObDatum) + sizeof(StoredRow);
O
oceanbase-admin 已提交
231 232 233 234 235 236 237 238 239
      char* buf = nullptr;
      if (NULL != alloc_) {
        ret = common::OB_INIT_TWICE;
        SQL_ENG_LOG(WARN, "init twice", K(ret));
      } else if (OB_ISNULL(buf = reinterpret_cast<char*>(allocator.alloc(buffer_len)))) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret));
      } else {
        alloc_ = &allocator;
240
        store_row_ = new (buf) StoredRow();
O
oceanbase-admin 已提交
241
        store_row_->cnt_ = datum_cnt;
242
        store_row_->row_size_ = datum_cnt * sizeof(ObDatum) + sizeof(StoredRow);
O
oceanbase-admin 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
        saved_ = false;
      }
      return ret;
    }
    // copy exprs to store row
    int shadow_copy(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx)
    {
      int ret = OB_SUCCESS;
      if (OB_ISNULL(store_row_) || OB_UNLIKELY(store_row_->cnt_ != exprs.count())) {
        ret = OB_ERR_UNEXPECTED;
        SQL_ENG_LOG(WARN, "NULL datums or count mismatch", K(ret), KPC(store_row_), K(exprs.count()));
      } else {
        ObDatum* datum = nullptr;
        ObDatum* cells = store_row_->cells();
        for (int64_t i = 0; OB_SUCC(ret) && i < exprs.count(); i++) {
          if (OB_FAIL(exprs.at(i)->eval(ctx, datum))) {
            SQL_ENG_LOG(WARN, "failed to evaluate expr datum", K(ret), K(i));
          } else {
            cells[i] = *datum;
          }
        }
        saved_ = true;
      }
      return ret;
    }
    // restore store row for shadow row
    int restore(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx)
    {
      int ret = OB_SUCCESS;
      if (OB_ISNULL(store_row_)) {
        ret = common::OB_ERR_UNEXPECTED;
        SQL_ENG_LOG(WARN, "NULL store_row_", K(ret), KP(store_row_));
      } else if (saved_) {
        ret = store_row_->to_expr(exprs, ctx);
      }
      return ret;
    }
    // reset && release referenced memory
    void reset()
    {
      if (NULL != alloc_ && NULL != store_row_) {
        alloc_->free(store_row_);
      }
      alloc_ = NULL;
      store_row_ = NULL;
      saved_ = false;
    }

291
    StoredRow* get_store_row() const
O
oceanbase-admin 已提交
292 293 294 295 296
    {
      return store_row_;
    }
    TO_STRING_KV(KPC_(store_row));

G
gm 已提交
297
  private:
O
oceanbase-admin 已提交
298
    common::ObIAllocator* alloc_;
299
    StoredRow* store_row_;
O
oceanbase-admin 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
    bool saved_;
  };

  class BlockBuffer;
  struct Block {
    static const int64_t MAGIC = 0xbc054e02d8536315;
    static const int32_t ROW_HEAD_SIZE = sizeof(StoredRow);
    Block() : magic_(0), blk_size_(0), rows_(0)
    {}

    static int inline min_buf_size(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx, int64_t& size)
    {
      int ret = OB_SUCCESS;
      size = 0;
      if (OB_FAIL(row_store_size(exprs, ctx, size))) {
        SQL_ENG_LOG(WARN, "failed to calc store row size", K(ret));
      } else {
        size += BlockBuffer::HEAD_SIZE + sizeof(BlockBuffer);
      }
      return ret;
    }
    static int inline row_store_size(
        const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx, int64_t& size, uint32_t row_extend_size = 0)
    {
      int ret = OB_SUCCESS;
      size = 0;
      if (OB_FAIL(ObChunkDatumStore::row_copy_size(exprs, ctx, size))) {
        SQL_ENG_LOG(WARN, "failed to calc store row size", K(ret));
      } else {
        size += ROW_HEAD_SIZE + row_extend_size;
      }
      return ret;
    }
    static int64_t inline min_buf_size(const int64_t row_store_size)
    {
      return BlockBuffer::HEAD_SIZE + sizeof(BlockBuffer) + row_store_size;
    }

    // following interface for ObDatum only,unused for now
    static int64_t inline min_buf_size(common::ObDatum* datums, const int64_t cnt)
    {
      return BlockBuffer::HEAD_SIZE + sizeof(BlockBuffer) + row_store_size(datums, cnt);
    }
    static int64_t inline row_store_size(common::ObDatum* datums, const int64_t cnt, uint32_t row_extend_size = 0)
    {
      return ROW_HEAD_SIZE + row_extend_size + ObChunkDatumStore::row_copy_size(datums, cnt);
    }

    int append_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx* ctx, BlockBuffer* buf, int64_t row_extend_size,
        StoredRow** stored_row = nullptr);
    int add_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx, const int64_t row_size,
        uint32_t row_extend_size, StoredRow** stored_row = nullptr);
    int copy_stored_row(const StoredRow& stored_row, StoredRow** dst_sr);
    // copy block payload to unswizzling_payload
    // orginal payload memory unchanged
    int gen_unswizzling_payload(char* unswizzling_payload, uint32 size);
    int unswizzling();
    int swizzling(int64_t* col_cnt);
    inline bool magic_check()
    {
      return MAGIC == magic_;
    }
    int get_store_row(int64_t& cur_pos, const StoredRow*& sr);
    inline Block* get_next() const
    {
      return next_;
    }
    inline bool is_empty()
    {
      return get_buffer()->is_empty();
    }
    inline void set_block_size(uint32 blk_size)
    {
      blk_size_ = blk_size;
    }
    inline BlockBuffer* get_buffer()
    {
      return static_cast<BlockBuffer*>(static_cast<void*>(payload_ + blk_size_ - BlockBuffer::HEAD_SIZE));
    }
    inline int64_t data_size()
    {
      return get_buffer()->data_size();
    }
    inline uint32_t rows()
    {
      return rows_;
    }
    inline int64_t remain()
    {
      return get_buffer()->remain();
    }
    friend class BlockBuffer;
    TO_STRING_KV(K_(magic), K_(blk_size), K_(rows));
    union {
      int64_t magic_;  // for dump
      Block* next_;    // for block list in mem
    };
    uint32 blk_size_; /* current blk's size, for dump/read */
    uint32 rows_;
    char payload_[0];
  } __attribute__((packed));

  struct BlockList {
G
gm 已提交
403
  public:
O
oceanbase-admin 已提交
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
    BlockList() : head_(NULL), last_(NULL), size_(0)
    {}
    inline int64_t get_size() const
    {
      return size_;
    }
    inline bool is_empty() const
    {
      return size_ == 0;
    }
    inline Block* get_first() const
    {
      return head_;
    }
    inline void reset()
    {
      size_ = 0;
      head_ = NULL;
      last_ = NULL;
    }

    inline void add_last(Block* blk)
    {
      if (NULL == head_) {
        head_ = blk;
        last_ = blk;
        blk->next_ = NULL;
      } else {
        last_->next_ = blk;
        blk->next_ = NULL;
        last_ = blk;
      }
      size_++;
    }

    inline Block* remove_first()
    {
      Block* cur = head_;
      if (NULL != head_) {
        head_ = head_->next_;
        cur->next_ = NULL;
        size_--;
        if (0 == size_) {
          last_ = NULL;
        }
      }
      return cur;
    }
    TO_STRING_KV(K_(size), K_(head), K_(last), K_(*head), K_(last));

G
gm 已提交
454
  private:
O
oceanbase-admin 已提交
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
    Block* head_;
    Block* last_;
    int64_t size_;
  };

  /* contiguous memory:
   * |----------------|---Block
   * |next_           |-|-------------|
   * |cnt_            | |--HEAD_SIZE  |
   * |block_size_     |-|             |--block_size
   * |payload[]       |               |
   * |                |---------------|
   * |----------------|--BlockBuffer
   * |data->BlockHead |-|
   * |cur_pos         | |--TAIL_SIZE
   * |cap_=block_size |-|
   * |----------------|
   * */
  class BlockBuffer {
G
gm 已提交
474
  public:
O
oceanbase-admin 已提交
475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
    static const int64_t HEAD_SIZE = sizeof(Block); /* n_rows, check_sum */
    BlockBuffer() : data_(NULL), cur_pos_(0), cap_(0)
    {}

    int init(char* buf, const int64_t buf_size);
    inline int64_t remain() const
    {
      return cap_ - cur_pos_;
    }
    inline char* data()
    {
      return data_;
    }
    inline Block* get_block()
    {
      return block;
    }
    inline char* head() const
    {
      return data_ + cur_pos_;
    }
    inline int64_t capacity() const
    {
      return cap_;
    }
    inline int64_t mem_size() const
    {
      return cap_ + sizeof(BlockBuffer);
    }
    inline int64_t data_size() const
    {
      return cur_pos_;
    }
    inline bool is_inited() const
    {
      return NULL != data_;
    }
    inline bool is_empty() const
    {
      return HEAD_SIZE >= cur_pos_;
    }

    inline void reset()
    {
      cur_pos_ = 0;
      cap_ = 0;
      data_ = NULL;
    }
    inline void reuse()
    {
      cur_pos_ = 0;
      advance(HEAD_SIZE);
      block->rows_ = 0;
    }
    inline int advance(int64_t size);

    TO_STRING_KV(KP_(data), K_(cur_pos), K_(cap));

    friend ObChunkDatumStore;
    friend Block;

G
gm 已提交
536
  private:
O
oceanbase-admin 已提交
537 538 539 540 541 542 543 544 545 546
    union {
      char* data_;
      Block* block;
    };
    int64_t cur_pos_;
    int64_t cap_;
  };

  class ChunkIterator;
  class RowIterator {
G
gm 已提交
547
  public:
O
oceanbase-admin 已提交
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580
    friend class ObChunkDatumStore;
    RowIterator();
    virtual ~RowIterator()
    {
      reset();
    }
    int init(ChunkIterator* chunk_it);

    /* from StoredRow to NewRow */
    int get_next_row(const StoredRow*& sr);
    int get_next_block_row(const StoredRow*& sr);
    int convert_to_row(const StoredRow* sr, const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx);

    void reset()
    {
      reset_cursor();
    }
    bool is_valid() const
    {
      return store_ != NULL && cur_iter_blk_ != NULL;
    }
    inline bool cur_blk_has_next() const
    {
      return (cur_iter_blk_ != NULL && cur_row_in_blk_ < cur_iter_blk_->rows_);
    }
    inline bool has_next() const
    {
      return cur_iter_blk_ != NULL && (cur_iter_blk_->get_next() != NULL || cur_row_in_blk_ < cur_iter_blk_->rows_);
    }

    TO_STRING_KV(KP_(store), K_(*store), K_(cur_iter_blk), K_(cur_row_in_blk), K_(cur_pos_in_blk), K_(n_blocks),
        K_(cur_nth_block));

G
gm 已提交
581
  private:
O
oceanbase-admin 已提交
582 583 584 585 586 587 588 589 590 591
    explicit RowIterator(ObChunkDatumStore* row_store);
    void reset_cursor()
    {
      cur_iter_blk_ = NULL;
      cur_row_in_blk_ = 0;
      cur_pos_in_blk_ = 0;
      n_blocks_ = 0;
      cur_nth_block_ = 0;
    }

G
gm 已提交
592
  protected:
O
oceanbase-admin 已提交
593 594 595 596 597 598 599 600 601
    ObChunkDatumStore* store_;
    Block* cur_iter_blk_;
    int64_t cur_row_in_blk_;  // cur nth row in cur block for in-mem debug
    int64_t cur_pos_in_blk_;  // cur nth row in cur block
    int64_t n_blocks_;
    int64_t cur_nth_block_;
  };

  class ChunkIterator {
G
gm 已提交
602
  public:
O
oceanbase-admin 已提交
603 604
    enum IterEndState { PROCESSING = 0x00, MEM_ITER_END = 0x01, DISK_ITER_END = 0x02 };

G
gm 已提交
605
  public:
O
oceanbase-admin 已提交
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659
    friend class ObChunkDatumStore;
    ChunkIterator();
    virtual ~ChunkIterator();
    int init(ObChunkDatumStore* row_store, int64_t chunk_read_size = 0);
    int load_next_chunk(RowIterator& it);
    inline bool has_next_chunk()
    {
      return store_->n_blocks_ > 0 && (cur_nth_blk_ < store_->n_blocks_ - 1);
    }
    void set_chunk_read_size(int64_t chunk_read_size)
    {
      chunk_read_size_ = chunk_read_size;
    }
    inline int64_t get_chunk_read_size()
    {
      return chunk_read_size_;
    }
    inline int64_t get_row_cnt() const
    {
      return store_->get_row_cnt();
    }
    inline int64_t get_cur_chunk_row_cnt() const
    {
      return chunk_n_rows_;
    }
    inline int64_t get_chunk_read_size() const
    {
      return chunk_read_size_;
    }
    void reset();
    inline bool is_valid()
    {
      return store_ != NULL;
    }
    inline bool read_file_iter_end()
    {
      return iter_end_flag_ & DISK_ITER_END;
    }
    inline void set_read_file_iter_end()
    {
      iter_end_flag_ |= DISK_ITER_END;
    }
    inline bool read_mem_iter_end()
    {
      return iter_end_flag_ & MEM_ITER_END;
    }
    inline void set_read_mem_iter_end()
    {
      iter_end_flag_ |= MEM_ITER_END;
    }

    TO_STRING_KV(KP_(store), KP_(cur_iter_blk), KP_(cur_iter_blk_buf), K_(cur_chunk_n_blocks), K_(cur_iter_pos),
        K_(file_size), K_(chunk_read_size), KP_(chunk_mem));

G
gm 已提交
660
  private:
O
oceanbase-admin 已提交
661 662
    void reset_cursor(const int64_t file_size);

G
gm 已提交
663
  protected:
O
oceanbase-admin 已提交
664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686
    ObChunkDatumStore* store_;
    Block* cur_iter_blk_;
    BlockBuffer* cur_iter_blk_buf_; /*for reuse of cur_iter_blk_;
                                      cause Block::get_buffer() depends on blk_size_
                                      but blk_size_ will change with block reusing
                                    */
    Block* swap_iter_blk_;
    blocksstable::ObTmpFileIOHandle aio_read_handle_;
    blocksstable::ObTmpFileIOHandle swap_aio_read_handle_;
    blocksstable::ObTmpFileIOHandle* cur_aio_read_handle_;
    blocksstable::ObTmpFileIOHandle* next_aio_read_handle_;
    bool next_iter_end_;
    int64_t cur_nth_blk_;         // reading nth blk start from 1
    int64_t cur_chunk_n_blocks_;  // the number of blocks of current chunk
    int64_t cur_iter_pos_;        // pos in file
    int64_t file_size_;
    int64_t chunk_read_size_;
    char* chunk_mem_;
    int64_t chunk_n_rows_;
    int32_t iter_end_flag_;
  };

  class Iterator {
G
gm 已提交
687
  public:
O
oceanbase-admin 已提交
688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727
    friend class ObChunkDatumStore;
    Iterator() : start_iter_(false)
    {}
    virtual ~Iterator()
    {}
    int init(ObChunkDatumStore* row_store, int64_t chunk_read_size = 0);
    void set_chunk_read_size(int64_t chunk_read_size)
    {
      chunk_it_.set_chunk_read_size(chunk_read_size);
    }
    int get_next_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx, const StoredRow** sr = nullptr);
    int get_next_row(common::ObDatum** datums);
    int get_next_row(const StoredRow*& sr);
    int get_next_row_skip_const(ObEvalCtx& ctx, const common::ObIArray<ObExpr*>& exprs);
    int convert_to_row(const StoredRow* sr, const common::ObIArray<ObExpr*>& exprs, ObEvalCtx& ctx)
    {
      return row_it_.convert_to_row(sr, exprs, ctx);
    }
    // unsed
    int convert_to_row(const StoredRow* sr, common::ObDatum** datums);

    void reset()
    {
      row_it_.reset();
      chunk_it_.reset();
      start_iter_ = false;
    }
    inline bool has_next()
    {
      return chunk_it_.has_next_chunk() || (row_it_.is_valid() && row_it_.has_next());
    }
    bool is_valid()
    {
      return chunk_it_.is_valid();
    }
    inline int64_t get_chunk_read_size()
    {
      return chunk_it_.get_chunk_read_size();
    }

G
gm 已提交
728
  private:
O
oceanbase-admin 已提交
729 730
    explicit Iterator(ObChunkDatumStore* row_store);

G
gm 已提交
731
  protected:
O
oceanbase-admin 已提交
732 733 734 735 736
    bool start_iter_;
    ChunkIterator chunk_it_;
    RowIterator row_it_;
  };

G
gm 已提交
737
public:
O
oceanbase-admin 已提交
738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879
  const static int64_t BLOCK_SIZE = (64L << 10);
  static const int32_t DATUM_SIZE = sizeof(common::ObDatum);

  explicit ObChunkDatumStore(common::ObIAllocator* alloc = NULL);

  virtual ~ObChunkDatumStore()
  {
    reset();
  }

  int init(int64_t mem_limit, uint64_t tenant_id = common::OB_SERVER_TENANT_ID,
      int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID,
      const char* label = common::ObModIds::OB_SQL_CHUNK_ROW_STORE, bool enable_dump = true,
      uint32_t row_extra_size = 0);

  void set_allocator(common::ObIAllocator& alloc)
  {
    allocator_ = &alloc;
  }

  void reset();
  // Keeping one memory block, reader must call reuse() too.
  void reuse();

  /// begin iterator
  int begin(ChunkIterator& it, int64_t chunk_read_size = 0)
  {
    return it.init(this, chunk_read_size);
  }

  int begin(Iterator& it, int64_t chunk_read_size = 0)
  {
    return it.init(this, chunk_read_size);
  }

  int add_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx* ctx, StoredRow** stored_row = nullptr);
  int add_row(const StoredRow& sr, StoredRow** stored_row = nullptr);
  int add_row(const StoredRow& sr, ObEvalCtx* ctx, StoredRow** stored_row = nullptr);
  int finish_add_row(bool need_dump = true);

  // Try add row to row store if the memory not exceeded after row added.
  // return OB_SUCCESS too when row not added (%row_added flag set to false).
  int try_add_row(const common::ObIArray<ObExpr*>& exprs, ObEvalCtx* ctx, const int64_t memory_limit, bool& row_added);
  OB_INLINE bool is_inited() const
  {
    return inited_;
  }
  bool is_file_open() const
  {
    return io_.fd_ >= 0;
  }

  // void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; }
  // void set_mem_ctx_id(const int64_t ctx_id) { ctx_id_ = ctx_id; }
  void set_mem_limit(const int64_t limit)
  {
    mem_limit_ = limit;
  }
  void set_dumped(bool dumped)
  {
    enable_dump_ = dumped;
  }
  inline int64_t get_mem_limit()
  {
    return mem_limit_;
  }
  void set_block_size(const int64_t size)
  {
    default_block_size_ = size;
  }
  inline int64_t get_block_cnt() const
  {
    return n_blocks_;
  }
  inline int64_t get_block_list_cnt()
  {
    return blocks_.get_size();
  }
  inline int64_t get_row_cnt() const
  {
    return row_cnt_;
  }
  inline int64_t get_col_cnt() const
  {
    return col_count_;
  }
  inline int64_t get_mem_hold() const
  {
    return mem_hold_;
  }
  inline int64_t get_mem_used() const
  {
    return mem_used_;
  }
  inline int64_t get_file_size() const
  {
    return file_size_;
  }
  inline int64_t min_blk_size(const int64_t row_store_size)
  {
    int64_t size = std::max(std::max(static_cast<int64_t>(BLOCK_SIZE), default_block_size_), row_store_size);
    size = next_pow2(size);
    return size;
  }
  static int init_block_buffer(void* mem, const int64_t size, Block*& block);
  int add_block(Block* block, bool need_swizzling, bool* added = nullptr);
  int append_block(char* buf, int size, bool need_swizzling);
  void remove_added_blocks();
  bool has_dumped()
  {
    return has_dumped_;
  }
  inline int64_t get_row_cnt_in_memory() const
  {
    return row_cnt_ - dumped_row_cnt_;
  }
  inline int64_t get_row_cnt_on_disk() const
  {
    return dumped_row_cnt_;
  }
  void set_callback(ObSqlMemoryCallback* callback)
  {
    callback_ = callback;
  }
  int dump(bool reuse, bool all_dump);
  void set_dir_id(int64_t dir_id)
  {
    io_.dir_id_ = dir_id;
  }
  int alloc_dir_id();
  TO_STRING_KV(K_(tenant_id), K_(label), K_(ctx_id), K_(mem_limit), K_(row_cnt), K_(file_size));

  int append_datum_store(const ObChunkDatumStore& other_store);
  int assign(const ObChunkDatumStore& other_store);
  bool is_empty() const
  {
    return blocks_.is_empty();
  }

  int update_iterator(Iterator& org_it);
  int clean_block(Block* clean_block);

G
gm 已提交
880
private:
O
oceanbase-admin 已提交
881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
  OB_INLINE int add_row(
      const common::ObIArray<ObExpr*>& exprs, ObEvalCtx* ctx, const int64_t row_size, StoredRow** stored_row);
  static int get_timeout(int64_t& timeout_ms);
  void* alloc_blk_mem(const int64_t size, const bool for_iterator);
  void free_blk_mem(void* mem, const int64_t size = 0);
  void free_block(Block* item);
  void free_blk_list();
  bool shrink_block(int64_t size);
  int alloc_block_buffer(Block*& block, const int64_t min_size, const bool for_iterator);
  int alloc_block_buffer(Block*& block, const int64_t data_size, const int64_t min_size, const bool for_iterator);
  // new block is not needed if %min_size is zero. (finish add row)
  int switch_block(const int64_t min_size);
  int clean_memory_data(bool reuse);

  inline void use_block(Block* item)
  {
    cur_blk_ = item;
    cur_blk_buffer_ = cur_blk_->get_buffer();
    int64_t used = cur_blk_buffer_->capacity() + sizeof(BlockBuffer);
    mem_used_ += used;
  }
  inline int dump_one_block(BlockBuffer* item);

  int write_file(void* buf, int64_t size);
L
ls0 已提交
905 906
  int read_file(void *buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle &handle,
      const int64_t file_size, const int64_t cur_pos);
O
oceanbase-admin 已提交
907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930
  int aio_read_file(void* buf, const int64_t size, const int64_t offset, blocksstable::ObTmpFileIOHandle& handle);
  int aio_read_file(ChunkIterator& it, int64_t read_size);
  bool need_dump(int64_t extra_size);
  BlockBuffer* new_block();
  void set_io(int64_t size, char* buf)
  {
    io_.size_ = size;
    io_.buf_ = buf;
  }
  bool find_block_can_hold(const int64_t size, bool& need_shrink);
  int get_store_row(RowIterator& it, const StoredRow*& sr);
  int load_next_block(ChunkIterator& it);
  int load_next_chunk_blocks(ChunkIterator& it);
  inline void callback_alloc(int64_t size)
  {
    if (callback_ != nullptr)
      callback_->alloc(size);
  }
  inline void callback_free(int64_t size)
  {
    if (callback_ != nullptr)
      callback_->free(size);
  }

931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954
  static inline int row_copy_size(const common::ObIArray<ObExpr *> &exprs, ObEvalCtx &ctx, int64_t &size)
  {
    int ret = OB_SUCCESS;
    common::ObDatum *datum = nullptr;
    size = DATUM_SIZE * exprs.count();
    for (int64_t i = 0; i < exprs.count() && OB_SUCC(ret); ++i) {
      if (OB_FAIL(exprs.at(i)->eval(ctx, datum))) {
        SQL_ENG_LOG(WARN, "failed to eval expr datum", KPC(exprs.at(i)), K(ret));
      } else {
        size += datum->len_;
      }
    }
    return ret;
  }

  static inline int64_t row_copy_size(common::ObDatum *datums, const int64_t cnt)
  {
    int64_t size = DATUM_SIZE * cnt;
    for (int64_t i = 0; i < cnt; ++i) {
      size += datums[i].len_;
    }
    return size;
  }

G
gm 已提交
955
private:
O
oceanbase-admin 已提交
956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995
  bool inited_;
  uint64_t tenant_id_;
  const char* label_;
  int64_t ctx_id_;
  int64_t mem_limit_;

  Block* cur_blk_;
  BlockBuffer* cur_blk_buffer_;
  BlockList blocks_;            // ASSERT: all linked blocks has at least one row stored
  BlockList free_list_;         // empty blocks
  int64_t max_blk_size_;        // max block ever allocated
  int64_t min_blk_size_;        // min block ever allocated
  int64_t default_block_size_;  // default(min) block size; blocks larger then this will not be reused
  int64_t n_blocks_;
  int64_t row_cnt_;
  int64_t col_count_;

  blocksstable::ObTmpFileIOHandle aio_write_handle_;

  bool enable_dump_;
  bool has_dumped_;
  int64_t dumped_row_cnt_;

  // int fd_;
  blocksstable::ObTmpFileIOInfo io_;
  int64_t file_size_;
  int64_t n_block_in_file_;

  // BlockList blocks_;  // ASSERT: all linked blocks has at least one row stored
  int64_t mem_hold_;
  int64_t mem_used_;
  common::DefaultPageAllocator inner_allocator_;
  common::ObIAllocator* allocator_;

  uint32_t row_extend_size_;
  ObSqlMemoryCallback* callback_;

  DISALLOW_COPY_AND_ASSIGN(ObChunkDatumStore);
};

996 997
typedef ObChunkDatumStore::StoredRow ObStoredDatumRow;

O
oceanbase-admin 已提交
998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016
inline int ObChunkDatumStore::BlockBuffer::advance(int64_t size)
{
  int ret = common::OB_SUCCESS;
  if (size < -cur_pos_) {
    // overflow
    ret = common::OB_INVALID_ARGUMENT;
    SQL_ENG_LOG(WARN, "invalid argument", K(size), K_(cur_pos));
  } else if (size > remain()) {
    ret = common::OB_BUF_NOT_ENOUGH;
    SQL_ENG_LOG(WARN, "buffer not enough", K(size), "remain", remain());
  } else {
    cur_pos_ += size;
  }
  return ret;
}
}  // end namespace sql
}  // end namespace oceanbase

#endif  // OCEANBASE_BASIC_OB_DATUM_STORE2_H_