ob_macro_block_writer.h 9.7 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_STORAGE_BLOCKSSTABLE_MACRO_BLOCK_WRITER_H_
#define OCEANBASE_STORAGE_BLOCKSSTABLE_MACRO_BLOCK_WRITER_H_
#include "ob_micro_block_writer.h"
#include "ob_micro_block_index_writer.h"
#include "ob_micro_block_reader.h"
#include "lib/compress/ob_compressor.h"
#include "lib/io/ob_io_manager.h"
#include "lib/container/ob_array_wrap.h"
#include "share/schema/ob_table_schema.h"
#include "ob_row_reader.h"
#include "ob_column_map.h"
#include "ob_micro_block_reader.h"
#include "storage/ob_multi_version_col_desc_generate.h"
#include "ob_store_file.h"
#include "storage/blocksstable/ob_block_mark_deletion_maker.h"
#include "storage/blocksstable/ob_macro_block.h"
#include "storage/blocksstable/ob_lob_merge_writer.h"
#include "storage/blocksstable/ob_store_file_system.h"
#include "storage/blocksstable/ob_macro_block_reader.h"
#include "storage/blocksstable/ob_sparse_micro_block_reader.h"
#include "storage/ob_pg_mgr.h"
#include "ob_block_index_intermediate.h"

namespace oceanbase {
namespace blocksstable {

// macro block store struct
//  |- ObCommonHeader
//  |- ObSSTableMacroBlockHeader
//  |- column id list
//  |- column type list
//  |- column checksum
//  |- MicroBlock 1
//  |- MicroBlock 2
//  |- MicroBlock N
//  |- MicroBlock index
//  |- index endkey char stream
class ObMacroBlockWriter {
G
gm 已提交
51
public:
O
oceanbase-admin 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
  ObMacroBlockWriter();
  virtual ~ObMacroBlockWriter();
  void reset();
  int open(ObDataStoreDesc& data_store_desc, const ObMacroDataSeq& start_seq,
      const ObIArray<ObMacroBlockInfoPair>* lob_blocks = NULL, ObMacroBlockWriter* index_writer = NULL);
  int append_macro_block(const ObMacroBlockCtx& macro_block_ctx);
  int append_micro_block(const ObMicroBlock& micro_block);
  int append_row(const storage::ObStoreRow& row, const bool virtual_append = false);
  int close(storage::ObStoreRow* root = NULL, char* root_buf = NULL);
  inline ObMacroBlocksWriteCtx& get_macro_block_write_ctx()
  {
    return block_write_ctx_;
  }
  inline ObMacroBlocksWriteCtx& get_lob_macro_block_write_ctx()
  {
    return lob_writer_.get_macro_block_write_ctx();
  }
  inline ObMacroBlocksWriteCtx& get_index_macro_block_write_ctx()
  {
    return task_index_writer_->get_macro_block_write_ctx();
  }
O
obdev 已提交
73
  int dump_micro_block_writer_buffer();
O
oceanbase-admin 已提交
74 75 76
  TO_STRING_KV(K_(block_write_ctx));

  struct IndexMicroBlockBuilder {
G
gm 已提交
77
  public:
O
oceanbase-admin 已提交
78 79 80 81 82 83 84 85
    IndexMicroBlockBuilder()
    {}
    virtual ~IndexMicroBlockBuilder() = default;
    TO_STRING_KV(K(writer_.get_row_count()));
    ObMicroBlockWriter writer_;
    ObBlockIntermediateBuilder row_builder_;
  };
  struct IndexMicroBlockDesc {
G
gm 已提交
86
  public:
O
oceanbase-admin 已提交
87 88 89 90 91 92 93 94 95
    IndexMicroBlockDesc()
    {}
    virtual ~IndexMicroBlockDesc() = default;
    TO_STRING_KV(K(writer_.get_row_count()));
    ObMicroBlockWriter writer_;
    char last_key_buf_[common::OB_MAX_ROW_KEY_LENGTH];
    common::ObStoreRowkey last_key_;
  };
  struct IndexMicroBlockDescCompare final {
G
gm 已提交
96
  public:
O
oceanbase-admin 已提交
97 98 99 100 101 102 103 104 105 106
    IndexMicroBlockDescCompare()
    {}
    ~IndexMicroBlockDescCompare() = default;
    bool operator()(const IndexMicroBlockDesc* left, const IndexMicroBlockDesc* right)
    {
      int32_t cmp_ret = left->last_key_.compare(right->last_key_);
      return cmp_ret < 0;
    }
  };

G
gm 已提交
107
private:
O
oceanbase-admin 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
  int append_row(const storage::ObStoreRow& row, const int64_t split_size, const bool ignore_lob = false);
  int check_order(const storage::ObStoreRow& row);
  int build_micro_block(const bool force_split = false);
  int build_micro_block_desc(const ObMicroBlock& micro_block, ObMicroBlockDesc& micro_block_desc);
  int build_micro_block_desc_with_rewrite(const ObMicroBlock& micro_block, ObMicroBlockDesc& micro_block_desc);
  int build_micro_block_desc_with_reuse(const ObMicroBlock& micro_block, ObMicroBlockDesc& micro_block_desc);
  int write_micro_block(const ObMicroBlockDesc& micro_block_desc, const bool force_split = false);
  int check_micro_block_need_merge(const ObMicroBlock& micro_block, bool& need_merge);
  int merge_micro_block(const ObMicroBlock& micro_block);
  int flush_macro_block(ObMacroBlock& macro_block);
  int flush_index_macro_block(ObMacroBlock& macro_block);
  int try_switch_macro_block();
  int wait_io_finish();
  int check_write_complete(const MacroBlockId& macro_block_id);
  int can_mark_deletion(const ObStoreRowkey& start_key, const ObStoreRowkey& end_key, bool& can_mark_deletion);
  int save_last_key(const ObStoreRowkey& last_key);
  int save_pre_micro_last_key(const ObStoreRowkey& pre_micro_last_key);
  int add_row_checksum(const common::ObNewRow& row);
  int calc_micro_column_checksum(const int64_t column_cnt, ObIMicroBlockReader& reader, int64_t* column_checksum);
  int flush_reuse_macro_block(const ObMacroBlockCtx& macro_block_ctx);
  ObIMicroBlockReader* get_micro_block_reader(const int64_t row_store_type);
  inline bool enable_sparse_format() const
  {
    return data_store_desc_->enable_sparse_format();
  }
  int check_micro_block_checksum(const char* buf, const int64_t size,
      const ObIMicroBlockWriter* micro_writer /*do checksum for this micro writer*/);
  int check_micro_block(const char* compressed_buf, const int64_t compressed_size, const char* uncompressed_buf,
      const int64_t uncompressed_size, ObIMicroBlockWriter* micro_writer /*check for this micro writer*/);
  int build_column_map(const ObDataStoreDesc* data_desc, ObColumnMap& column_map);
  int open_bf_cache_writer(const ObDataStoreDesc& desc);
  int flush_bf_to_cache(ObMacroBloomFilterCacheWriter& bf_cache_writer, const int32_t row_count);

G
gm 已提交
141
private:
O
oceanbase-admin 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
  int build_index_micro_block(const int32_t height, const storage::ObStoreRow*& mid_row);
  int build_intermediate_row(const ObString& rowkey, const ObBlockIntermediateHeader& header,
      ObBlockIntermediateBuilder& builder, const storage::ObStoreRow*& row);
  int build_intermediate_row(const ObStoreRowkey& rowkey, const ObBlockIntermediateHeader& header,
      ObBlockIntermediateBuilder& builder, const storage::ObStoreRow*& row);
  int close_index_tree();
  int create_index_block_builder();
  int do_sort_micro_block();
  OB_INLINE int64_t get_current_macro_seq()
  {
    return current_macro_seq_;
  }
  int get_root_row(storage::ObStoreRow*& root, char* root_buf);
  int get_index_block_desc(IndexMicroBlockDesc*& index_micro_block_desc);
  int merge_root_micro_block();
  int save_root_micro_block();
  int update_index_tree(const int32_t height, const storage::ObStoreRow* intermediate_row);
  int write_micro_block(const ObMicroBlockDesc& micro_block_desc, int64_t& data_offset);

  int prepare_micro_block_reader(const char* buf, const int64_t size, ObIMicroBlockReader*& micro_reader);
  int print_micro_block_row(ObIMicroBlockReader* micro_reader);

G
gm 已提交
164
private:
O
oceanbase-admin 已提交
165 166 167 168 169 170 171 172
  static const int64_t DEFAULT_MACRO_BLOCK_COUNT = 128;
  static const int64_t DEFAULT_MICRO_BLOCK_TREE_HIGH = 4;
  static const int64_t DEFAULT_MICRO_BLOCK_WRITER_COUNT = 64;
  static const int64_t INDEX_MACRO_BLOCK_MAX_SEQ_NUM = 0x100000;  // 1048576
  typedef common::ObSEArray<MacroBlockId, DEFAULT_MACRO_BLOCK_COUNT> MacroBlockList;
  typedef common::ObSEArray<IndexMicroBlockBuilder*, DEFAULT_MICRO_BLOCK_TREE_HIGH> IndexMicroBlockBuildList;
  typedef common::ObSEArray<IndexMicroBlockDesc*, DEFAULT_MICRO_BLOCK_WRITER_COUNT> IndexMicroBlockDescList;

G
gm 已提交
173
private:
O
oceanbase-admin 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
  ObDataStoreDesc* data_store_desc_;
  ObDataStoreDesc* index_store_desc_;
  ObMicroBlockCompressor compressor_;
  IndexMicroBlockBuildList index_block_builders_;
  IndexMicroBlockDescList task_top_block_descs_;
  ObIMicroBlockWriter* micro_writer_;
  ObMicroBlockWriter flat_writer_;
  ObRowWriter row_writer_;
  char rowkey_buf_[common::OB_MAX_ROW_KEY_LENGTH];
  ObMicroBlockReader flat_reader_;
  ObSparseMicroBlockReader sparse_reader_;
  ObMacroBlockWriter* sstable_index_writer_;
  ObMacroBlockWriter* task_index_writer_;
  ObMacroBlock macro_blocks_[2];
  ObMacroBloomFilterCacheWriter bf_cache_writer_[2];  // associate with macro_blocks
  int64_t current_index_;
  int64_t current_macro_seq_;  // set by sstable layer;
  ObMacroBlocksWriteCtx block_write_ctx_;
  ObMacroBlockHandle macro_handle_;
  char last_key_buf_[common::OB_MAX_ROW_KEY_LENGTH];
  common::ObStoreRowkey last_key_;
  bool last_key_with_L_flag_;
  // for mark deletion
  ObBlockMarkDeletionMaker* mark_deletion_maker_;
  bool need_deletion_check_;
  bool pre_deletion_flag_;  // record the delete flag of prev micro block
  char pre_micro_last_key_buf_[common::OB_MAX_ROW_KEY_LENGTH];
  common::ObStoreRowkey pre_micro_last_key_;
  bool has_lob_;
  blocksstable::ObLobMergeWriter lob_writer_;
  int64_t* curr_micro_column_checksum_;
  common::ObArenaAllocator allocator_;
  ObColumnMap column_map_;
  ObColumnMap index_column_map_;
  blocksstable::ObMacroBlockReader macro_reader_;
  char obj_buf_[common::OB_ROW_MAX_COLUMNS_COUNT * sizeof(common::ObObj)];          // for reader to get row
  char checker_obj_buf_[common::OB_ROW_MAX_COLUMNS_COUNT * sizeof(common::ObObj)];  // for calc or varify checksum, can
                                                                                    // NOT use same buf of data row
  ObMicroBlockReader check_flat_reader_;
  ObSparseMicroBlockReader check_sparse_reader_;
  common::ObArray<uint32_t> micro_rowkey_hashs_;
  storage::ObSSTableRowkeyHelper* rowkey_helper_;
  ObSSTableMacroBlockChecker macro_block_checker_;
  common::SpinRWLock lock_;
};

}  // end namespace blocksstable
}  // end namespace oceanbase
#endif