/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_DML_OB_TABLE_INSERT_UP_OP_H_
#define OCEANBASE_DML_OB_TABLE_INSERT_UP_OP_H_

#include "sql/engine/dml/ob_table_modify_op.h"
#include "sql/engine/dml/ob_conflict_checker.h"

namespace oceanbase
{
namespace sql
{
class ObTableInsertUpOpInput : public ObTableModifyOpInput
{
O
oceanbase-admin 已提交
25
  OB_UNIS_VERSION_V(1);
G
gm 已提交
26
public:
W
wangzelin.wzl 已提交
27 28 29 30 31
  ObTableInsertUpOpInput(ObExecContext &ctx, const ObOpSpec &spec)
      : ObTableModifyOpInput(ctx, spec)
  {
  }
  virtual int init(ObTaskInfo &task_info) override
O
oceanbase-admin 已提交
32 33 34 35 36
  {
    return ObTableModifyOpInput::init(task_info);
  }
};

W
wangzelin.wzl 已提交
37 38
class ObTableInsertUpSpec : public ObTableModifySpec
{
O
oceanbase-admin 已提交
39
  OB_UNIS_VERSION_V(1);
W
wangzelin.wzl 已提交
40
  typedef common::ObArrayWrap<ObInsertUpCtDef*> InsUpdCtDefArray;
G
gm 已提交
41
public:
W
wangzelin.wzl 已提交
42 43 44 45 46 47 48 49
  ObTableInsertUpSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type)
    : ObTableModifySpec(alloc, type),
      insert_up_ctdefs_(),
      conflict_checker_ctdef_(alloc),
      all_saved_exprs_(alloc),
      alloc_(alloc)
  {
  }
O
oceanbase-admin 已提交
50

W
wangzelin.wzl 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63
  //This interface is only allowed to be used in a single-table DML operator,
  //it is invalid when multiple tables are modified in one DML operator
  virtual int get_single_dml_ctdef(const ObDMLBaseCtDef *&dml_ctdef) const override
  {
    int ret = common::OB_SUCCESS;
    if (OB_UNLIKELY(insert_up_ctdefs_.count() != 1)) {
      ret = common::OB_ERR_UNEXPECTED;
      SQL_ENG_LOG(WARN, "table ctdef is invalid", K(ret), K(insert_up_ctdefs_.count()));
    } else {
      dml_ctdef = insert_up_ctdefs_.at(0)->ins_ctdef_;
    }
    return ret;
  }
O
oceanbase-admin 已提交
64

W
wangzelin.wzl 已提交
65 66 67 68 69 70
  InsUpdCtDefArray insert_up_ctdefs_;
  ObConflictCheckerCtdef conflict_checker_ctdef_;
  // insert_row + child_->output_
  ExprFixedArray all_saved_exprs_;
protected:
  common::ObIAllocator &alloc_;
O
oceanbase-admin 已提交
71 72
};

W
wangzelin.wzl 已提交
73 74
class ObTableInsertUpOp : public ObTableModifyOp
{
G
gm 已提交
75
public:
W
wangzelin.wzl 已提交
76 77 78 79 80 81 82 83 84 85 86
  ObTableInsertUpOp(ObExecContext &ctx, const ObOpSpec &spec, ObOpInput *input)
    : ObTableModifyOp(ctx, spec, input),
      insert_rows_(0),
      upd_changed_rows_(0),
      found_rows_(0),
      upd_rtctx_(eval_ctx_, ctx, *this),
      conflict_checker_(ctx.get_allocator(),
                        eval_ctx_,
                        MY_SPEC.conflict_checker_ctdef_),
      is_ignore_(false)

O
oceanbase-admin 已提交
87
  {
W
wangzelin.wzl 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
  }

  virtual ~ObTableInsertUpOp() {}

  virtual int inner_open() override;
  virtual int inner_rescan() override;
  virtual int inner_close() override;
  virtual int inner_get_next_row() override;

  int calc_auto_increment(const ObUpdCtDef &upd_ctdef);

  int update_auto_increment(const ObExpr &expr,
                            const uint64_t cid);

  // 执行所有尝试插入的 das task, fetch冲突行的主表主键
  int fetch_conflict_rowkey();
  int get_next_conflict_rowkey(DASTaskIter &task_iter);

  virtual void destroy()
  {
    // destroy
    conflict_checker_.destroy();
    insert_up_rtdefs_.release_array();
    insert_up_row_store_.~ObChunkDatumStore();
O
obdev 已提交
112
    upd_rtctx_.cleanup();
O
oceanbase-admin 已提交
113 114 115
    ObTableModifyOp::destroy();
  }

G
gm 已提交
116
protected:
O
oceanbase-admin 已提交
117

W
wangzelin.wzl 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
  // 物化所有要被replace into的行到replace_row_store_
  int load_batch_insert_up_rows(bool &is_iter_end, int64_t &insert_rows);

  int get_next_row_from_child();

  int do_insert_up();

  // 检查是否有duplicated key 错误发生
  bool check_is_duplicated();

  // 遍历replace_row_store_中的所有行,并且更新冲突map,
  int do_insert_up_cache();

  int set_heap_table_new_pk(const ObUpdCtDef &upd_ctdef,
                            ObUpdRtDef &upd_rtdef);

  // 遍历主表的hash map,确定最后提交delete + update + insert das_tasks
  // 提交顺序上必须先提交delete
  int prepare_final_insert_up_task();
O
oceanbase-admin 已提交
137

W
wangzelin.wzl 已提交
138 139 140
  int lock_one_row_to_das(const ObUpdCtDef &upd_ctdef,
                          ObUpdRtDef &upd_rtdef,
                          const ObDASTabletLoc *tablet_loc);
O
oceanbase-admin 已提交
141

W
wangzelin.wzl 已提交
142
  int insert_row_to_das();
O
oceanbase-admin 已提交
143

W
wangzelin.wzl 已提交
144
  int delete_upd_old_row_to_das();
O
oceanbase-admin 已提交
145

W
wangzelin.wzl 已提交
146
  int insert_upd_new_row_to_das();
O
oceanbase-admin 已提交
147

W
wangzelin.wzl 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
  int delete_one_upd_old_row_das(const ObUpdCtDef &upd_ctdef,
                                 ObUpdRtDef &upd_rtdef,
                                 const ObDASTabletLoc *tablet_loc);

  int insert_one_upd_new_row_das(const ObUpdCtDef &upd_ctdef,
                                 ObUpdRtDef &upd_rtdef,
                                 const ObDASTabletLoc *tablet_loc);

  int insert_row_to_das(const ObInsCtDef &ins_ctdef,
                        ObInsRtDef &ins_rtdef,
                        const ObDASTabletLoc *tablet_loc);


  int calc_update_tablet_loc(const ObUpdCtDef &upd_ctdef,
                             ObUpdRtDef &upd_rtdef,
                             ObDASTabletLoc *&old_tablet_loc,
                             ObDASTabletLoc *&new_tablet_loc);

  int calc_update_multi_tablet_id(const ObUpdCtDef &upd_ctdef,
                                  ObExpr &part_id_expr,
                                  common::ObTabletID &tablet_id);

  // 其实这里就是计算update 的old_row 的pkey
  int calc_upd_old_row_tablet_loc(const ObUpdCtDef &upd_ctdef,
                                  ObUpdRtDef &upd_rtdef,
                                  ObDASTabletLoc *&tablet_loc);

  int calc_upd_new_row_tablet_loc(const ObUpdCtDef &upd_ctdef,
                                  ObUpdRtDef &upd_rtdef,
                                  ObDASTabletLoc *&tablet_loc);

  // TODO @kaizhan.dkz 这个函数后续被删除
  int calc_insert_tablet_loc(const ObInsCtDef &ins_ctdef,
                             ObInsRtDef &ins_rtdef,
                             ObDASTabletLoc *&tablet_loc);

  int do_update(const ObConflictValue &constraint_value);

  int do_lock(const ObConflictValue &constraint_value);

  int do_insert(const ObConflictValue &constraint_value);

  int do_update_with_ignore();

  // 提交当前所有的 das task;
  int post_all_dml_das_task(ObDMLRtCtx &das_ctx, bool del_task_ahead);

  // batch的执行插入 process_row and then write to das,
  int try_insert_row();

  // batch的执行插入 process_row and then write to das,
  int update_row_to_das(bool need_do_trigger);

  int inner_open_with_das();

  int init_insert_up_rtdef();

  int deal_hint_part_selection(ObObjectID partition_id);
O
oceanbase-admin 已提交
206

G
gm 已提交
207
private:
W
wangzelin.wzl 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
  int check_insert_up_ctdefs_valid() const;

  const ObIArray<ObExpr *> &get_primary_table_insert_row();

  const ObIArray<ObExpr *> &get_primary_table_columns();

  const ObIArray<ObExpr *> &get_primary_table_upd_new_row();

  const ObIArray<ObExpr *> &get_primary_table_upd_old_row();

  int reset_das_env();

  void add_need_conflict_result_flag();

  int reuse();
O
oceanbase-admin 已提交
223

G
gm 已提交
224
protected:
W
wangzelin.wzl 已提交
225 226 227
  const static int64_t OB_DEFAULT_INSERT_UP_MEMORY_LIMIT = 2 * 1024 * 1024L;  // 2M in default
  int64_t insert_rows_;
  int64_t upd_changed_rows_;
O
oceanbase-admin 已提交
228
  int64_t found_rows_;
W
wangzelin.wzl 已提交
229 230 231 232 233
  ObDMLRtCtx upd_rtctx_;
  ObConflictChecker conflict_checker_;
  common::ObArrayWrap<ObInsertUpRtDef> insert_up_rtdefs_;
  ObChunkDatumStore insert_up_row_store_; //所有的insert_up的行的集合
  bool is_ignore_; // 暂时记录一下是否是ignore的insert_up SQL语句
O
oceanbase-admin 已提交
234
};
W
wangzelin.wzl 已提交
235 236 237
} // end namespace sql
} // end namespace oceanbase
#endif // OCEANBASE_DML_OB_TABLE_INSERT_UP_OP_H_