// tianmu_handler.cpp
/* Copyright (c) 2022 StoneAtom, Inc. All rights reserved.
   Use is subject to license terms

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335 USA
*/

#include "tianmu_handler.h"

#include <iostream>

#include "mysql/plugin.h"

#include "common/assert.h"
#include "common/exception.h"
#include "core/compilation_tools.h"
#include "core/compiled_query.h"
#include "core/temp_table.h"
#include "core/tools.h"
#include "core/transaction.h"
#include "core/value.h"
#include "system/configuration.h"
#include "util/fs.h"

#define MYSQL_SERVER 1

namespace Tianmu {
namespace handler {

40
const Alter_inplace_info::HA_ALTER_FLAGS ha_tianmu::TIANMU_SUPPORTED_ALTER_ADD_DROP_ORDER =
41
    Alter_inplace_info::ADD_COLUMN | Alter_inplace_info::DROP_COLUMN | Alter_inplace_info::ALTER_STORED_COLUMN_ORDER;
42
const Alter_inplace_info::HA_ALTER_FLAGS ha_tianmu::TIANMU_SUPPORTED_ALTER_COLUMN_NAME =
H
hustjieke 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
    Alter_inplace_info::ALTER_COLUMN_DEFAULT | Alter_inplace_info::ALTER_COLUMN_NAME;
/////////////////////////////////////////////////////////////////////
//
// NOTICE: ALL EXCEPTIONS SHOULD BE CAUGHT in the handler API!!!
//         MySQL doesn't use exception.
//
/////////////////////////////////////////////////////////////////////

/*
 Example of simple lock controls. The "share" it creates is a structure we will
 pass to each rcbase handler. Do you have to have one of these? Well, you have
 pieces that are used for locking, and they are needed to function.
 */

/*
 Query-cache permission callback for Tianmu tables.
 Returns TRUE (caching allowed) only when neither OPTION_NOT_AUTOCOMMIT nor
 OPTION_BEGIN is set for the session, i.e. outside an explicit/non-autocommit
 transaction; otherwise returns FALSE. The table-name arguments are ignored.
 */
my_bool rcbase_query_caching_of_table_permitted(THD *thd, [[maybe_unused]] char *full_name,
                                                [[maybe_unused]] uint full_name_len,
                                                [[maybe_unused]] ulonglong *unused) {
  if (!thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
    return ((my_bool)TRUE);
  return ((my_bool)FALSE);
}

/*
 Converts the current value of MySQL field `f` into a core::Value.

 - SQL NULL yields a default-constructed core::Value.
 - TINY/SHORT/LONG/INT24/LONGLONG are stored with SetInt(val_int()).
 - DECIMAL/FLOAT/DOUBLE are stored with SetDouble(val_real()).
 - NEWDECIMAL is stored as an integer: val_real() scaled by 10^dec and rounded.
 - Temporal types are packed into a types::DT (fractional seconds are not copied);
   TIMESTAMP values are first converted from the session time zone to UTC.
 - String and blob types are stored as their raw bytes.

 Throws common::Exception for any other type (SET, ENUM, GEOMETRY, NULL, BIT, ...).
 */
static core::Value GetValueFromField(Field *f) {
  core::Value v;

  if (f->is_null())
    return v;

  // Packs the date and clock components of `t` into a DT and returns its packed value.
  auto pack_dt = [](const MYSQL_TIME &t) {
    types::DT dt = {};
    dt.year = t.year;
    dt.month = t.month;
    dt.day = t.day;
    dt.hour = t.hour;
    dt.minute = t.minute;
    dt.second = t.second;
    return dt.val;
  };

  switch (f->type()) {
    case MYSQL_TYPE_TINY:
    case MYSQL_TYPE_SHORT:
    case MYSQL_TYPE_LONG:
    case MYSQL_TYPE_INT24:
    case MYSQL_TYPE_LONGLONG:
      v.SetInt(f->val_int());
      break;
    case MYSQL_TYPE_DECIMAL:
    case MYSQL_TYPE_FLOAT:
    case MYSQL_TYPE_DOUBLE:
      v.SetDouble(f->val_real());
      break;
    case MYSQL_TYPE_NEWDECIMAL: {
      // Fixed-point value kept as an integer scaled by 10^dec.
      auto dec_f = dynamic_cast<Field_new_decimal *>(f);
      v.SetInt(std::lround(dec_f->val_real() * types::PowOfTen(dec_f->dec)));
      break;
    }
    case MYSQL_TYPE_TIMESTAMP: {
      MYSQL_TIME my_time;
      std::memset(&my_time, 0, sizeof(my_time));
      f->get_time(&my_time);
      // Convert from the session time zone to UTC; the zero timestamp is left untouched.
      if (!common::IsTimeStampZero(my_time)) {
        my_bool myb;
        my_time_t secs_utc = current_txn_->Thd()->variables.time_zone->TIME_to_gmt_sec(&my_time, &myb);
        common::GMTSec2GMTTime(&my_time, secs_utc);
      }
      v.SetInt(pack_dt(my_time));
      break;
    }
    case MYSQL_TYPE_TIME:
    case MYSQL_TYPE_TIME2:
    case MYSQL_TYPE_DATE:
    case MYSQL_TYPE_DATETIME:
    case MYSQL_TYPE_NEWDATE:
    case MYSQL_TYPE_TIMESTAMP2:
    case MYSQL_TYPE_DATETIME2: {
      MYSQL_TIME my_time;
      std::memset(&my_time, 0, sizeof(my_time));
      f->get_time(&my_time);
      v.SetInt(pack_dt(my_time));
      break;
    }
    case MYSQL_TYPE_YEAR: {
      // YEAR is read as an integer and stored as a DT with only the year component set.
      types::DT dt = {};
      dt.year = f->val_int();
      v.SetInt(dt.val);
      break;
    }
    case MYSQL_TYPE_VARCHAR:
    case MYSQL_TYPE_TINY_BLOB:
    case MYSQL_TYPE_MEDIUM_BLOB:
    case MYSQL_TYPE_LONG_BLOB:
    case MYSQL_TYPE_BLOB:
    case MYSQL_TYPE_VAR_STRING:
    case MYSQL_TYPE_STRING: {
      String str;
      f->val_str(&str);
      v.SetString(const_cast<char *>(str.ptr()), str.length());
      break;
    }
    case MYSQL_TYPE_SET:
    case MYSQL_TYPE_ENUM:
    case MYSQL_TYPE_GEOMETRY:
    case MYSQL_TYPE_NULL:
    case MYSQL_TYPE_BIT:
    default:
      throw common::Exception("unsupported mysql type " + std::to_string(f->type()));
  }
  return v;
}

160
ha_tianmu::ha_tianmu(handlerton *hton, TABLE_SHARE *table_arg) : handler(hton, table_arg) {
H
hustjieke 已提交
161 162 163
  ref_length = sizeof(uint64_t);
}

164
const char **ha_tianmu::bas_ext() const {
165
  static const char *ha_rcbase_exts[] = {common::TIANMU_EXT, 0};
H
hustjieke 已提交
166 167 168 169 170 171 172
  return ha_rcbase_exts;
}

namespace {
/*
 Returns one flag per column of `table`, in table->field order. A flag is true when the
 column is set in read_set or write_set.

 For DELETE / DELETE multi / UPDATE / UPDATE multi statements every flag is true.
 Row-format binlog records every column of the modified row, and those values come from
 the engine, so all columns must be filled.
 */
std::vector<bool> GetAttrsUseIndicator(TABLE *table) {
  enum_sql_command sql_command = SQLCOM_END;
  if (table->in_use && table->in_use->lex)
    sql_command = table->in_use->lex->sql_command;
  const bool check_tianmu_delete_or_update = (sql_command == SQLCOM_DELETE) || (sql_command == SQLCOM_DELETE_MULTI) ||
                                             (sql_command == SQLCOM_UPDATE) || (sql_command == SQLCOM_UPDATE_MULTI);

  std::vector<bool> attr_uses;
  attr_uses.reserve(table->s->fields);
  int col_id = 0;
  for (Field **field = table->field; *field; ++field, ++col_id) {
    attr_uses.push_back(check_tianmu_delete_or_update || bitmap_is_set(table->read_set, col_id) ||
                        bitmap_is_set(table->write_set, col_id));
  }
  return attr_uses;
}
}  // namespace

/*
 Returns true when tianmu_sysvar_insert_delayed is enabled and the current statement
 is INSERT or INSERT ... SELECT without ON DUPLICATE KEY UPDATE.
 store_lock() and external_lock() treat such statements specially.
 */
static bool is_delay_insert(THD *thd) {
  return tianmu_sysvar_insert_delayed &&
         (thd_sql_command(thd) == SQLCOM_INSERT || thd_sql_command(thd) == SQLCOM_INSERT_SELECT) &&
         thd->lex->duplicates != DUP_UPDATE;
}

/*
 The idea with handler::store_lock() is the following:

 The statement decided which locks we should need for the table
 for updates/deletes/inserts we get WRITE locks, for SELECT... we get
 read locks.

 Before adding the lock into the table lock handler (see thr_lock.c)
 mysqld calls store lock with the requested locks.  Store lock can now
 modify a write lock to a read lock (or some other lock), ignore the
 lock (if we don't want to use MySQL table locks at all) or add locks
 for many tables (like we do when we are using a MERGE handler).

 Berkeley DB, for example, changes all WRITE locks to TL_WRITE_ALLOW_WRITE
 (which signals that we are doing WRITES, but we are still allowing other
 readers and writers).

 When releasing locks, store_lock() are also called. In this case one
 usually doesn't have to do anything.

 In some exceptional cases MySQL may send a request for a TL_IGNORE;
 This means that we are requesting the same lock as last time and this
 should also be ignored. (This may happen when someone does a flush
 table when we have opened a part of the tables, in which case mysqld
 closes and reopens the tables and tries to get the same locks at last
 time).  In the future we will probably try to remove this.

 Called from lock.cc by get_lock_data().
 */
234
THR_LOCK_DATA **ha_tianmu::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type) {
H
hustjieke 已提交
235 236 237 238 239 240 241
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) {
    if (is_delay_insert(thd))
      lock_type = TL_READ;
    else
      lock_type = TL_WRITE_CONCURRENT_INSERT;
  }

242 243 244
  if (lock_type != TL_IGNORE && lock_.type == TL_UNLOCK)
    lock_.type = lock_type;
  *to++ = &lock_;
H
hustjieke 已提交
245 246 247 248 249 250 251 252 253 254 255 256 257 258
  return to;
}

/*
 First you should go read the section "locking functions for mysql" in
 lock.cc to understand this.
 This creates a lock on the table. If you are implementing a storage engine
 that can handle transactions, look at ha_berkeley.cc to see how you will
 want to go about doing this. Otherwise you should consider calling flock()
 here.

 Called from lock.cc by lock_external() and unlock_external(). Also called
 from sql_table.cc by copy_data_between_tables().
 */
259
int ha_tianmu::external_lock(THD *thd, int lock_type) {
H
hustjieke 已提交
260 261 262 263 264
  DBUG_ENTER(__PRETTY_FUNCTION__);

  int ret = 1;

  // static const char *ss[] = { "READ LOCK", "WRITE LOCK", "UNLOCK", };
265
  // rclog << lock << "external lock table " << table_name_ << " type: " <<
H
hustjieke 已提交
266 267
  // ss[lock_type] << " command: " << thd->lex->sql_command << unlock;

268 269
  if (thd->lex->sql_command == SQLCOM_LOCK_TABLES)
    DBUG_RETURN(HA_ERR_WRONG_COMMAND);
H
hustjieke 已提交
270

271
  if (is_delay_insert(thd) && table_share->tmp_table == NO_TMP_TABLE && lock_type == F_WRLCK) {
272 273
    DBUG_RETURN(0);
  }
H
hustjieke 已提交
274 275 276

  try {
    if (lock_type == F_UNLCK) {
277 278
      if (thd->lex->sql_command == SQLCOM_UNLOCK_TABLES)
        current_txn_->ExplicitUnlockTables();
H
hustjieke 已提交
279

280 281
      if (thd->killed)
        ha_rcengine_->Rollback(thd, true);
282
      if (current_txn_) {
283
        current_txn_->RemoveTable(share_);
284 285
        if (current_txn_->Empty()) {
          ha_rcengine_->ClearTx(thd);
H
hustjieke 已提交
286 287 288
        }
      }
    } else {
289
      auto tx = ha_rcengine_->GetTx(thd);
290 291
      if (thd->lex->sql_command == SQLCOM_LOCK_TABLES)
        tx->ExplicitLockTables();
H
hustjieke 已提交
292 293

      if (lock_type == F_RDLCK) {
294
        tx->AddTableRD(share_);
H
hustjieke 已提交
295
      } else {
296 297
        tx->AddTableWR(share_);
        trans_register_ha(thd, false, rcbase_hton, nullptr);
298
        if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
299
          trans_register_ha(thd, true, rcbase_hton, nullptr);
H
hustjieke 已提交
300 301 302 303
      }
    }
    ret = 0;
  } catch (std::exception &e) {
304 305
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "Tianmu internal error", MYF(0));
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s.", e.what());
H
hustjieke 已提交
306 307
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
308
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
309 310 311
  }

  // destroy the tx on failure
312 313
  if (ret != 0)
    ha_rcengine_->ClearTx(thd);
H
hustjieke 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410

  DBUG_RETURN(ret);
}

namespace {
/*
 Checks whether a row with the same primary key as the row in table->record[0]
 already exists in the table index `indextab`.

 Each primary-key part is encoded as bytes:
 - integer, temporal and YEAR values as 8-byte integers;
 - FLOAT/DOUBLE/old DECIMAL and NEWDECIMAL as 8-byte doubles;
 - strings and blobs as their raw bytes.
 The encoded parts are passed to GetRowByKey(). On a hit, `row` receives the matching
 row id and true is returned.

 Throws common::Exception when a key part has an unsupported type.
 */
inline bool has_dup_key(std::shared_ptr<index::RCTableIndex> &indextab, TABLE *table, size_t &row) {
  KEY *key = table->key_info + table->s->primary_key;

  // Owning storage for the encoded key parts. Views into the per-iteration locals below
  // (`v`, `dt`) would dangle before GetRowByKey() reads them, and on a composite key every
  // part could alias the same stack slot.
  std::vector<std::string> key_parts;
  key_parts.reserve(key->actual_key_parts);

  // Copies `len` raw bytes starting at `data` as one encoded key part.
  auto append_bytes = [&key_parts](const void *data, size_t len) {
    key_parts.emplace_back(static_cast<const char *>(data), len);
  };

  for (uint i = 0; i < key->actual_key_parts; i++) {
    uint col = key->key_part[i].field->field_index;
    Field *f = table->field[col];
    switch (f->type()) {
      case MYSQL_TYPE_TINY:
      case MYSQL_TYPE_SHORT:
      case MYSQL_TYPE_LONG:
      case MYSQL_TYPE_INT24:
      case MYSQL_TYPE_LONGLONG: {
        int64_t v = f->val_int();
        append_bytes(&v, sizeof(int64_t));
        break;
      }
      case MYSQL_TYPE_DECIMAL:
      case MYSQL_TYPE_FLOAT:
      case MYSQL_TYPE_DOUBLE: {
        double v = f->val_real();
        append_bytes(&v, sizeof(double));
        break;
      }
      case MYSQL_TYPE_NEWDECIMAL: {
        // NOTE(review): encoded as a double holding the rounded scaled value, while GetValueFromField()
        // stores NEWDECIMAL via SetInt(); confirm this matches the encoding used when the index is built.
        auto dec_f = dynamic_cast<Field_new_decimal *>(f);
        double v = std::lround(dec_f->val_real() * types::PowOfTen(dec_f->dec));
        append_bytes(&v, sizeof(double));
        break;
      }
      case MYSQL_TYPE_TIME:
      case MYSQL_TYPE_TIME2:
      case MYSQL_TYPE_DATE:
      case MYSQL_TYPE_DATETIME:
      case MYSQL_TYPE_NEWDATE:
      case MYSQL_TYPE_TIMESTAMP2:
      case MYSQL_TYPE_DATETIME2: {
        MYSQL_TIME my_time;
        std::memset(&my_time, 0, sizeof(my_time));
        f->get_time(&my_time);
        types::DT dt(my_time);
        append_bytes(&dt.val, sizeof(int64_t));
        break;
      }

      case MYSQL_TYPE_TIMESTAMP: {
        MYSQL_TIME my_time;
        std::memset(&my_time, 0, sizeof(my_time));
        f->get_time(&my_time);
        // The UTC conversion below does not preserve microseconds, so restore them afterwards.
        auto saved = my_time.second_part;
        // convert to UTC
        if (!common::IsTimeStampZero(my_time)) {
          my_bool myb;
          my_time_t secs_utc = current_thd->variables.time_zone->TIME_to_gmt_sec(&my_time, &myb);
          common::GMTSec2GMTTime(&my_time, secs_utc);
        }
        my_time.second_part = saved;
        types::DT dt(my_time);
        append_bytes(&dt.val, sizeof(int64_t));
        break;
      }
      case MYSQL_TYPE_YEAR: {
        // YEAR is read as an integer and encoded as a DT with only the year component set.
        types::DT dt = {};
        dt.year = f->val_int();
        append_bytes(&dt.val, sizeof(int64_t));
        break;
      }
      case MYSQL_TYPE_VARCHAR:
      case MYSQL_TYPE_TINY_BLOB:
      case MYSQL_TYPE_MEDIUM_BLOB:
      case MYSQL_TYPE_LONG_BLOB:
      case MYSQL_TYPE_BLOB:
      case MYSQL_TYPE_VAR_STRING:
      case MYSQL_TYPE_STRING: {
        String str;
        f->val_str(&str);
        append_bytes(str.ptr(), str.length());
        break;
      }
      case MYSQL_TYPE_SET:
      case MYSQL_TYPE_ENUM:
      case MYSQL_TYPE_GEOMETRY:
      case MYSQL_TYPE_NULL:
      case MYSQL_TYPE_BIT:
      default:
        throw common::Exception("unsupported mysql type " + std::to_string(f->type()));
    }
  }

  // GetRowByKey() takes string views; build them only now that key_parts will no longer reallocate.
  std::vector<std::string_view> records(key_parts.begin(), key_parts.end());
  common::ErrorCode ret = indextab->GetRowByKey(current_txn_, records, row);
  return (ret == common::ErrorCode::SUCCESS);
}
}  // namespace

/*
 write_row() inserts a row. No extra() hint is given currently if a bulk load
 is happening. buf is a byte array of data. You can use the field
 information to extract the data from the native byte array type.
 Example of this would be:
 for (Field **field=table->field ; *field ; field++)
 {
 ...
 }

 See ha_tina.cc for an example of extracting all of the data as strings.
 ha_berkeley.cc has an example of how to store it intact by "packing" it
 for ha_berkeley's own native storage type.

 See the note for update_row() on auto_increments and timestamps. This
 case also applied to write_row().

 Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
 sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
 */
436
int ha_tianmu::write_row([[maybe_unused]] uchar *buf) {
H
hustjieke 已提交
437 438 439 440
  int ret = 1;
  DBUG_ENTER(__PRETTY_FUNCTION__);
  try {
    if (ha_thd()->lex->duplicates == DUP_UPDATE) {
441
      if (auto indextab = ha_rcengine_->GetTableIndex(table_name_)) {
H
hustjieke 已提交
442
        if (size_t row; has_dup_key(indextab, table, row)) {
443
          dupkey_pos_ = row;
H
hustjieke 已提交
444 445 446 447
          DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
        }
      }
    }
448
    ret = ha_rcengine_->InsertRow(table_name_, current_txn_, table, share_);
H
hustjieke 已提交
449 450 451
  } catch (common::OutOfMemoryException &e) {
    DBUG_RETURN(ER_LOCK_WAIT_TIMEOUT);
  } catch (common::DatabaseException &e) {
452
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what());
H
hustjieke 已提交
453 454
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
  } catch (common::FormatException &e) {
455
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s Row: %ld, field %u.", e.what(),
456
               e.m_row_no, e.m_field_no);
H
hustjieke 已提交
457 458
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
  } catch (common::FileException &e) {
459
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what());
H
hustjieke 已提交
460 461
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
  } catch (common::Exception &e) {
462
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what());
H
hustjieke 已提交
463 464 465
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
466
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what());
H
hustjieke 已提交
467 468
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
469
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
  }
  DBUG_RETURN(ret);
}

/*
 Yes, update_row() does what you expect, it updates a row. old_data will have
 the previous row record in it, while new_data will have the newest data in
 it.
 Keep in mind that the server can do updates based on ordering if an ORDER BY
 clause was used. Consecutive ordering is not guaranteed.
 Currently new_data will not have an updated auto_increment record, or
 an updated timestamp field. You can do these for example by doing these:
 if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
 table->timestamp_field->set_time();
 if (table->next_number_field && record == table->record[0])
 update_auto_increment();

 Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
 */
489
int ha_tianmu::update_row(const uchar *old_data, uchar *new_data) {
H
hustjieke 已提交
490 491 492 493 494 495 496 497
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int ret = HA_ERR_INTERNAL_ERROR;
  auto org_bitmap = dbug_tmp_use_all_columns(table, table->write_set);

  std::shared_ptr<void> defer(nullptr,
                              [org_bitmap, this](...) { dbug_tmp_restore_column_map(table->write_set, org_bitmap); });

  try {
498
    auto tab = current_txn_->GetTableByPath(table_name_);
H
hustjieke 已提交
499 500 501 502 503 504 505 506 507 508 509 510 511

    for (uint i = 0; i < table->s->fields; i++) {
      if (!bitmap_is_set(table->write_set, i)) {
        continue;
      }
      auto field = table->field[i];
      if (field->real_maybe_null()) {
        if (field->is_null_in_record(old_data) && field->is_null_in_record(new_data)) {
          continue;
        }

        if (field->is_null_in_record(new_data)) {
          core::Value null;
512
          tab->UpdateItem(current_position_, i, null);
H
hustjieke 已提交
513 514 515 516 517 518 519 520 521 522
          continue;
        }
      }
      auto o_ptr = field->ptr - table->record[0] + old_data;
      auto n_ptr = field->ptr - table->record[0] + new_data;
      if (field->is_null_in_record(old_data) || std::memcmp(o_ptr, n_ptr, field->pack_length()) != 0) {
        my_bitmap_map *org_bitmap2 = dbug_tmp_use_all_columns(table, table->read_set);
        std::shared_ptr<void> defer(
            nullptr, [org_bitmap2, this](...) { dbug_tmp_restore_column_map(table->read_set, org_bitmap2); });
        core::Value v = GetValueFromField(field);
523
        tab->UpdateItem(current_position_, i, v);
H
hustjieke 已提交
524 525
      }
    }
526
    ha_rcengine_->IncTianmuStatUpdate();
H
hustjieke 已提交
527 528
    DBUG_RETURN(0);
  } catch (common::DatabaseException &e) {
529
    TIANMU_LOG(LogCtl_Level::ERROR, "Update exception: %s.", e.what());
H
hustjieke 已提交
530
  } catch (common::FileException &e) {
531
    TIANMU_LOG(LogCtl_Level::ERROR, "Update exception: %s.", e.what());
H
hustjieke 已提交
532 533
  } catch (common::DupKeyException &e) {
    ret = HA_ERR_FOUND_DUPP_KEY;
534
    TIANMU_LOG(LogCtl_Level::ERROR, "Update exception: %s.", e.what());
H
hustjieke 已提交
535
  } catch (common::Exception &e) {
536
    TIANMU_LOG(LogCtl_Level::ERROR, "Update exception: %s.", e.what());
H
hustjieke 已提交
537
  } catch (std::exception &e) {
538
    TIANMU_LOG(LogCtl_Level::ERROR, "Update exception: %s.", e.what());
H
hustjieke 已提交
539
  } catch (...) {
540
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
  }
  DBUG_RETURN(ret);
}

/*
 This will delete a row. buf will contain a copy of the row to be deleted.
 The server will call this right after the current row has been read (from
 either a previous rnd_next() or index call).
 If you keep a pointer to the last row or can access a primary key it will
 make doing the deletion quite a bit easier.
 Keep in mind that the server does not guarantee consecutive deletions. ORDER BY
 clauses can be used.

 Called in sql_acl.cc and sql_udf.cc to manage internal table information.
 Called in sql_delete.cc, sql_insert.cc, and sql_select.cc. In sql_select it is
 used for removing duplicates while in insert it is used for REPLACE calls.
 */
558
int ha_tianmu::delete_row([[maybe_unused]] const uchar *buf) {
H
hustjieke 已提交
559
  DBUG_ENTER(__PRETTY_FUNCTION__);
560 561 562 563 564
  int ret = HA_ERR_INTERNAL_ERROR;
  auto org_bitmap = dbug_tmp_use_all_columns(table, table->write_set);

  std::shared_ptr<void> defer(nullptr,
                              [org_bitmap, this](...) { dbug_tmp_restore_column_map(table->write_set, org_bitmap); });
H
hustjieke 已提交
565

566
  try {
567
    auto tab = current_txn_->GetTableByPath(table_name_);
568 569

    for (uint i = 0; i < table->s->fields; i++) {
570
      tab->DeleteItem(current_position_, i);
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585
    }
    DBUG_RETURN(0);
  } catch (common::DatabaseException &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
  } catch (common::FileException &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
  } catch (common::Exception &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
  } catch (std::exception &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
  } catch (...) {
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
  }
  DBUG_RETURN(ret);
}
H
hustjieke 已提交
586 587 588 589 590 591 592 593 594 595 596
/*
 Used to delete all rows in a table. Both for cases of truncate and
 for cases where the optimizer realizes that all rows will be
 removed as a result of a SQL statement.

 Called from item_sum.cc by Item_func_group_concat::clear(),
 Item_sum_count_distinct::clear(), and Item_func_group_concat::clear().
 Called from sql_delete.cc by mysql_delete().
 Called from sql_select.cc by JOIN::reinit.
 Called from sql_union.cc by st_select_lex_unit::exec().
 */
597
int ha_tianmu::delete_all_rows() {
H
hustjieke 已提交
598
  DBUG_ENTER(__PRETTY_FUNCTION__);
599 600
  int ret = 1;
  try {
601
    ha_rcengine_->TruncateTable(table_name_, ha_thd());
602 603 604 605 606 607 608
    ret = 0;
  } catch (std::exception &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
  } catch (...) {
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
  }
  DBUG_RETURN(ret);
H
hustjieke 已提交
609 610
}

611
int ha_tianmu::rename_table(const char *from, const char *to) {
H
hustjieke 已提交
612
  try {
613
    ha_rcengine_->RenameTable(current_txn_, from, to, ha_thd());
H
hustjieke 已提交
614 615 616
    return 0;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
617
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
618 619
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
620
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
621 622 623 624
  }
  return 1;
}

625
void ha_tianmu::update_create_info([[maybe_unused]] HA_CREATE_INFO *create_info) {}
H
hustjieke 已提交
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671

/*
 ::info() is used to return information to the optimizer.
 see my_base.h for the complete description

 Currently this table handler doesn't implement most of the fields
 really needed. SHOW also makes use of this data
 Another note, you will probably want to have the following in your
 code:
 if (records < 2)
 records = 2;
 The reason is that the server will optimize for cases of only a single
 record. If in a table scan you don't know the number of records
 it will probably be better to set records to two so you can return
 as many records as you need.
 Along with records a few more variables you may wish to set are:
 records
 deleted
 data_file_length
 index_file_length
 delete_length
 check_time
 Take a look at the public variables in handler.h for more information.

 Called in:
 filesort.cc
 ha_heap.cc
 item_sum.cc
 opt_sum.cc
 sql_delete.cc
 sql_delete.cc
 sql_derived.cc
 sql_select.cc
 sql_select.cc
 sql_select.cc
 sql_select.cc
 sql_select.cc
 sql_show.cc
 sql_show.cc
 sql_show.cc
 sql_show.cc
 sql_table.cc
 sql_union.cc
 sql_update.cc

 */
672
int ha_tianmu::info(uint flag) {
H
hustjieke 已提交
673 674 675
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int ret = 1;
  try {
676
    std::scoped_lock guard(global_mutex_);
H
hustjieke 已提交
677 678
    if (flag & HA_STATUS_VARIABLE) {
      std::shared_ptr<core::RCTable> tab;
679
      if (current_txn_ != nullptr) {
680
        tab = current_txn_->GetTableByPath(table_name_);
H
hustjieke 已提交
681
      } else
682
        tab = ha_rcengine_->GetTableRD(table_name_);
683
      stats.records = (ha_rows)(tab->NumOfValues() - tab->NumOfDeleted());
H
hustjieke 已提交
684 685 686 687
      stats.data_file_length = 0;
      stats.mean_rec_length = 0;
      if (stats.records > 0) {
        std::vector<core::AttrInfo> attr_info(tab->GetAttributesInfo());
688
        uint no_attrs = tab->NumOfAttrs();
689
        for (uint j = 0; j < no_attrs; j++) stats.data_file_length += attr_info[j].comp_size;  // compressed size
H
hustjieke 已提交
690 691 692 693
        stats.mean_rec_length = ulong(stats.data_file_length / stats.records);
      }
    }

694 695
    if (flag & HA_STATUS_CONST)
      stats.create_time = share_->GetCreateTime();
H
hustjieke 已提交
696

697 698
    if (flag & HA_STATUS_TIME)
      stats.update_time = share_->GetUpdateTime();
H
hustjieke 已提交
699 700 701

    if (flag & HA_STATUS_ERRKEY) {
      errkey = 0;  // TODO: for now only support one pk index
702
      my_store_ptr(dup_ref, ref_length, dupkey_pos_);
H
hustjieke 已提交
703 704 705 706 707
    }

    ret = 0;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
708
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
709 710
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
711
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
712 713 714 715 716 717
  }

  DBUG_RETURN(ret);
}

/* this should return 0 for concurrent insert to work */
718
my_bool tianmu_check_status([[maybe_unused]] void *param) { return 0; }
H
hustjieke 已提交
719 720 721 722 723 724 725 726 727 728

/*
 Used for opening tables. The name will be the name of the file.
 A table is opened when it needs to be opened. For instance
 when a request comes in for a select on the table (tables are not
 open and closed for each request, they are cached).

 Called from handler.cc by handler::ha_open(). The server opens all tables by
 calling ha_open() which then calls the handler specific open().
 */
729
int ha_tianmu::open(const char *name, [[maybe_unused]] int mode, [[maybe_unused]] uint test_if_locked) {
H
hustjieke 已提交
730 731
  DBUG_ENTER(__PRETTY_FUNCTION__);

732
  table_name_ = name;
H
hustjieke 已提交
733 734 735 736 737 738 739 740
  int ret = 1;

  try {
    // TODO:
    // Probably we don't need to search from the map each time.
    // Keeping the share together with mysql handler cache makes
    // more sense that would mean once a table is opened the TableShare
    // would be kept.
741 742
    if (!(share_ = ha_rcengine_->GetTableShare(table_share)))
      DBUG_RETURN(ret);
H
hustjieke 已提交
743

744 745
    thr_lock_data_init(&share_->thr_lock, &lock_, nullptr);
    share_->thr_lock.check_status = tianmu_check_status;
H
hustjieke 已提交
746
    // have primary key, use table index
747 748 749
    if (table->s->primary_key != MAX_INDEXES)
      ha_rcengine_->AddTableIndex(name, table, ha_thd());
    ha_rcengine_->AddMemTable(table, share_);
H
hustjieke 已提交
750 751
    ret = 0;
  } catch (common::Exception &e) {
752 753
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "Error from Tianmu engine", MYF(0));
    TIANMU_LOG(LogCtl_Level::ERROR, "A tianmu exception is caught: %s", e.what());
H
hustjieke 已提交
754 755
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
756
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
757 758
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
759
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
760 761 762 763 764 765 766
  }

  info(HA_STATUS_CONST);

  DBUG_RETURN(ret);
}

767
int ha_tianmu::free_share() {
768
  share_.reset();
H
hustjieke 已提交
769 770 771 772 773 774 775 776 777 778 779 780 781
  return 0;
}

/*
 Closes a table. We call the free_share() function to free any resources
 that we have allocated in the "shared" structure.

 Called from sql_base.cc, sql_select.cc, and table.cc.
 In sql_select.cc it is only used to close up temporary tables or during
 the process where a temporary table is converted over to being a
 myisam table.
 For sql_base.cc look at close_data_tables().
 */
782
int ha_tianmu::close() {
H
hustjieke 已提交
783 784 785 786
  DBUG_ENTER(__PRETTY_FUNCTION__);
  DBUG_RETURN(free_share());
}

787
// Loads the row `rowid` of this table, as seen by the current transaction, via
// FillRowByRowid() and remembers it as the current position.
// Returns 0 on success. Returns HA_ERR_KEY_NOT_FOUND when there is no
// transaction, the table cannot be found, or an error was reported.
// NOTE(review): FillRowByRowid() is given `table`, not `buf`; presumably it fills
// table->record[0] — confirm for callers passing a different buffer.
int ha_tianmu::fill_row_by_id([[maybe_unused]] uchar *buf, uint64_t rowid) {
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int rc = HA_ERR_KEY_NOT_FOUND;
  try {
    // Guard against being called before a transaction was attached.
    if (current_txn_) {
      auto tab = current_txn_->GetTableByPath(table_name_);
      if (tab) {
        // Make every column writable while filling; restore on every exit path.
        my_bitmap_map *org_bitmap = dbug_tmp_use_all_columns(table, table->write_set);
        std::shared_ptr<void> defer(
            nullptr, [org_bitmap, this](...) { dbug_tmp_restore_column_map(table->write_set, org_bitmap); });

        tab->FillRowByRowid(table, rowid);
        current_position_ = rowid;
        rc = 0;
      }
    }
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
  } catch (...) {
    // MySQL does not use exceptions: nothing may escape the handler API.
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
  }
  DBUG_RETURN(rc);
}

808
// Makes `index` the active index for the following index_* calls.
// The `sorted` hint is ignored. Always returns 0.
int ha_tianmu::index_init(uint index, [[maybe_unused]] bool sorted) {
  DBUG_ENTER(__PRETTY_FUNCTION__);
  const int ok = 0;
  active_index = index;
  DBUG_RETURN(ok);
}

814
int ha_tianmu::index_end() {
H
hustjieke 已提交
815 816 817 818 819 820 821 822 823 824
  DBUG_ENTER(__PRETTY_FUNCTION__);
  active_index = MAX_KEY;
  DBUG_RETURN(0);
}

/*
 Positions an index cursor to the index specified in the handle. Fetches the
 row if available. If the key value is null, begin at the first key of the
 index.
 */
825 826 827
int ha_tianmu::index_read([[maybe_unused]] uchar *buf, [[maybe_unused]] const uchar *key,
                          [[maybe_unused]] uint key_len __attribute__((unused)),
                          enum ha_rkey_function find_flag __attribute__((unused))) {
H
hustjieke 已提交
828 829 830
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int rc = HA_ERR_KEY_NOT_FOUND;
  try {
831
    auto index = ha_rcengine_->GetTableIndex(table_name_);
H
hustjieke 已提交
832 833 834 835 836 837
    if (index && (active_index == table_share->primary_key)) {
      std::vector<std::string_view> keys;
      key_convert(key, key_len, index->KeyCols(), keys);
      // support equality fullkey lookup over primary key, using full tuple
      if (find_flag == HA_READ_KEY_EXACT) {
        uint64_t rowid;
838
        if (index->GetRowByKey(current_txn_, keys, rowid) == common::ErrorCode::SUCCESS) {
H
hustjieke 已提交
839 840 841
          rc = fill_row_by_id(buf, rowid);
        }
      } else if (find_flag == HA_READ_AFTER_KEY || find_flag == HA_READ_KEY_OR_NEXT) {
842
        auto iter = current_txn_->KVTrans().KeyIter();
843
        common::Operator op = (find_flag == HA_READ_AFTER_KEY) ? common::Operator::O_MORE : common::Operator::O_MORE_EQ;
H
hustjieke 已提交
844 845 846 847 848 849 850 851
        iter->ScanToKey(index, keys, op);
        uint64_t rowid;
        iter->GetRowid(rowid);
        rc = fill_row_by_id(buf, rowid);

      } else {
        // not support HA_READ_PREFIX_LAST_OR_PREV HA_READ_PREFIX_LAST
        rc = HA_ERR_WRONG_COMMAND;
852
        TIANMU_LOG(LogCtl_Level::ERROR, "Error: index_read not support prefix search");
H
hustjieke 已提交
853 854 855 856 857
      }

    } else {
      // other index not support
      rc = HA_ERR_WRONG_INDEX;
858
      TIANMU_LOG(LogCtl_Level::ERROR, "Error: index_read only support primary key");
H
hustjieke 已提交
859 860 861 862
    }

  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
863
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
864 865 866 867 868 869 870 871
  }

  DBUG_RETURN(rc);
}

/*
 Used to read forward through the index.
 */
872
int ha_tianmu::index_next([[maybe_unused]] uchar *buf) {
H
hustjieke 已提交
873 874 875
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int rc = HA_ERR_END_OF_FILE;
  try {
876
    auto iter = current_txn_->KVTrans().KeyIter();
H
hustjieke 已提交
877 878 879 880 881 882 883 884 885 886
    ++(*iter);
    if (iter->IsValid()) {
      uint64_t rowid;
      iter->GetRowid(rowid);
      rc = fill_row_by_id(buf, rowid);
    } else {
      rc = HA_ERR_KEY_NOT_FOUND;
    }
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
887
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
888 889 890 891 892 893 894
  }
  DBUG_RETURN(rc);
}

/*
 Used to read backwards through the index.
 */
895
int ha_tianmu::index_prev([[maybe_unused]] uchar *buf) {
H
hustjieke 已提交
896 897 898
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int rc = HA_ERR_END_OF_FILE;
  try {
899
    auto iter = current_txn_->KVTrans().KeyIter();
H
hustjieke 已提交
900 901 902 903 904 905 906 907 908 909
    --(*iter);
    if (iter->IsValid()) {
      uint64_t rowid;
      iter->GetRowid(rowid);
      rc = fill_row_by_id(buf, rowid);
    } else {
      rc = HA_ERR_KEY_NOT_FOUND;
    }
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
910
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
911 912 913 914 915 916 917 918 919 920
  }
  DBUG_RETURN(rc);
}

/*
 index_first() asks for the first key in the index.

 Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
 and sql_select.cc.
 */
921
int ha_tianmu::index_first([[maybe_unused]] uchar *buf) {
H
hustjieke 已提交
922 923 924
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int rc = HA_ERR_END_OF_FILE;
  try {
925
    auto index = ha_rcengine_->GetTableIndex(table_name_);
926
    if (index && current_txn_) {
H
hustjieke 已提交
927
      uint64_t rowid;
928
      auto iter = current_txn_->KVTrans().KeyIter();
H
hustjieke 已提交
929 930 931 932 933 934 935 936 937 938
      iter->ScanToEdge(index, true);
      if (iter->IsValid()) {
        iter->GetRowid(rowid);
        rc = fill_row_by_id(buf, rowid);
      } else {
        rc = HA_ERR_KEY_NOT_FOUND;
      }
    }
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
939
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
940 941 942 943 944 945 946 947 948 949
  }
  DBUG_RETURN(rc);
}

/*
 index_last() asks for the last key in the index.

 Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
 and sql_select.cc.
 */
950
int ha_tianmu::index_last([[maybe_unused]] uchar *buf) {
H
hustjieke 已提交
951 952 953
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int rc = HA_ERR_END_OF_FILE;
  try {
954
    auto index = ha_rcengine_->GetTableIndex(table_name_);
955
    if (index && current_txn_) {
H
hustjieke 已提交
956
      uint64_t rowid;
957
      auto iter = current_txn_->KVTrans().KeyIter();
H
hustjieke 已提交
958 959 960 961 962 963 964 965 966 967
      iter->ScanToEdge(index, false);
      if (iter->IsValid()) {
        iter->GetRowid(rowid);
        rc = fill_row_by_id(buf, rowid);
      } else {
        rc = HA_ERR_KEY_NOT_FOUND;
      }
    }
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
968
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
969 970 971 972 973 974 975 976 977 978 979 980 981
  }
  DBUG_RETURN(rc);
}

/*
 rnd_init() is called when the system wants the storage engine to do a table
 scan.
 See the example in the introduction at the top of this file to see when
 rnd_init() is called.

 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
 sql_table.cc, and sql_update.cc.
 */
982
int ha_tianmu::rnd_init(bool scan) {
H
hustjieke 已提交
983 984 985 986
  DBUG_ENTER(__PRETTY_FUNCTION__);

  int ret = 1;
  try {
987 988 989
    if (query_ && !result_ && table_ptr_->NumOfObj() != 0) {
      cq_->Result(tmp_table_);  // it is ALWAYS -2 though....
      result_ = true;
H
hustjieke 已提交
990 991

      try {
992 993
        core::FunctionExecutor fe(std::bind(&core::Query::LockPackInfoForUse, std::ref(query_)),
                                  std::bind(&core::Query::UnlockPackInfoFromUse, std::ref(query_)));
H
hustjieke 已提交
994

995
        core::TempTable *push_down_result = query_->Preexecute(*cq_, nullptr, false);
996
        if (!push_down_result || push_down_result->NumOfTables() != 1)
H
hustjieke 已提交
997 998 999 1000
          throw common::InternalException("core::Query execution returned no result object");

        core::Filter *filter(push_down_result->GetMultiIndexP()->GetFilter(0));
        if (!filter)
1001 1002
          filter_ptr_.reset(
              new core::Filter(push_down_result->GetMultiIndexP()->OrigSize(0), table_ptr_->Getpackpower()));
H
hustjieke 已提交
1003
        else
1004
          filter_ptr_.reset(new core::Filter(*filter));
H
hustjieke 已提交
1005

1006 1007 1008
        table_ptr_ = push_down_result->GetTableP(0);
        table_new_iter_ = ((core::RCTable *)table_ptr_)->Begin(GetAttrsUseIndicator(table), *filter);
        table_new_iter_end_ = ((core::RCTable *)table_ptr_)->End();
H
hustjieke 已提交
1009
      } catch (common::Exception const &e) {
1010
        rc_control_ << system::lock << "Error in push-down execution, push-down execution aborted: " << e.what()
1011
                    << system::unlock;
1012
        TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in push-down execution: %s", e.what());
H
hustjieke 已提交
1013
      }
1014 1015
      query_.reset();
      cq_.reset();
H
hustjieke 已提交
1016
    } else {
1017 1018 1019
      if (scan && filter_ptr_.get()) {
        table_new_iter_ = ((core::RCTable *)table_ptr_)->Begin(GetAttrsUseIndicator(table), *filter_ptr_);
        table_new_iter_end_ = ((core::RCTable *)table_ptr_)->End();
H
hustjieke 已提交
1020 1021
      } else {
        std::shared_ptr<core::RCTable> rctp;
1022
        ha_rcengine_->GetTableIterator(table_name_, table_new_iter_, table_new_iter_end_, rctp,
1023
                                       GetAttrsUseIndicator(table), table->in_use);
1024 1025
        table_ptr_ = rctp.get();
        filter_ptr_.reset();
H
hustjieke 已提交
1026 1027 1028
      }
    }
    ret = 0;
1029 1030 1031
    blob_buffers_.resize(0);
    if (table_ptr_ != nullptr)
      blob_buffers_.resize(table_ptr_->NumOfDisplaybleAttrs());
H
hustjieke 已提交
1032 1033
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1034
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1035 1036
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1037
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1038 1039 1040 1041 1042
  }

  DBUG_RETURN(ret);
}

1043
int ha_tianmu::rnd_end() {
H
hustjieke 已提交
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057
  DBUG_ENTER(__PRETTY_FUNCTION__);
  reset();
  DBUG_RETURN(0);
}

/*
 This is called for each row of the table scan. When you run out of records
 you should return HA_ERR_END_OF_FILE. Fill buff up with the row information.
 The Field structure for the table is the key to getting data into buf
 in a manner that will allow the server to understand it.

 Called from filesort.cc, records.cc, sql_handler.cc, sql_select.cc,
 sql_table.cc, and sql_update.cc.
 */
1058
int ha_tianmu::rnd_next(uchar *buf) {
H
hustjieke 已提交
1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070
  DBUG_ENTER(__PRETTY_FUNCTION__);

  int ret = HA_ERR_END_OF_FILE;
  try {
    table->status = 0;
    if (fill_row(buf) == HA_ERR_END_OF_FILE) {
      table->status = STATUS_NOT_FOUND;
      DBUG_RETURN(ret);
    }
    ret = 0;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1071
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1072 1073
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1074
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1075 1076 1077 1078 1079 1080 1081 1082 1083
  }

  DBUG_RETURN(ret);
}

/*
 position() is called after each call to rnd_next() if the data needs
 to be ordered. You can do something like the following to store
 the position:
1084
 my_store_ptr(ref, ref_length, current_position_);
H
hustjieke 已提交
1085 1086

 The server uses ref to store data. ref_length in the above case is
1087
 the size needed to store current_position_. ref is just a byte array
H
hustjieke 已提交
1088
 that the server will maintain. If you are using offsets to mark rows, then
1089
 current_position_ should be the offset. If it is a primary key like in
H
hustjieke 已提交
1090 1091 1092 1093
 BDB, then it needs to be a primary key.

 Called from filesort.cc, sql_select.cc, sql_delete.cc and sql_update.cc.
 */
1094
void ha_tianmu::position([[maybe_unused]] const uchar *record) {
H
hustjieke 已提交
1095 1096
  DBUG_ENTER(__PRETTY_FUNCTION__);

1097
  my_store_ptr(ref, ref_length, current_position_);
H
hustjieke 已提交
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108

  DBUG_VOID_RETURN;
}

/*
 This is like rnd_next, but you are given a position to use
 to determine the row. The position will be of the type that you stored in
 ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
 or position you saved when position() was called.
 Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
 */
1109
int ha_tianmu::rnd_pos(uchar *buf, uchar *pos) {
H
hustjieke 已提交
1110 1111 1112 1113 1114 1115
  DBUG_ENTER(__PRETTY_FUNCTION__);

  int ret = HA_ERR_END_OF_FILE;
  try {
    uint64_t position = my_get_ptr(pos, ref_length);

1116 1117 1118
    filter_ptr_ = std::make_unique<core::Filter>(position + 1, share_->PackSizeShift());
    filter_ptr_->Reset();
    filter_ptr_->Set(position);
H
hustjieke 已提交
1119

1120 1121 1122 1123
    auto tab_ptr = ha_rcengine_->GetTx(table->in_use)->GetTableByPath(table_name_);
    table_new_iter_ = tab_ptr->Begin(GetAttrsUseIndicator(table), *filter_ptr_);
    table_new_iter_end_ = tab_ptr->End();
    table_ptr_ = tab_ptr.get();
H
hustjieke 已提交
1124

1125
    table_new_iter_.MoveToRow(position);
H
hustjieke 已提交
1126
    table->status = 0;
1127
    blob_buffers_.resize(table->s->fields);
H
hustjieke 已提交
1128 1129 1130 1131 1132 1133 1134
    if (fill_row(buf) == HA_ERR_END_OF_FILE) {
      table->status = STATUS_NOT_FOUND;
      DBUG_RETURN(ret);
    }
    ret = 0;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1135
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1136 1137
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1138
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
  }

  DBUG_RETURN(ret);
}

/*
 extra() is called whenever the server wishes to send a hint to
 the storage engine. The myisam engine implements the most hints.
 ha_innodb.cc has the most exhaustive list of these hints.
 */
1149
int ha_tianmu::extra(enum ha_extra_function operation) {
H
hustjieke 已提交
1150 1151
  DBUG_ENTER(__PRETTY_FUNCTION__);
  /* This preemptive delete might cause problems here.
1152
   * Other place where it can be put is ha_tianmu::external_lock().
H
hustjieke 已提交
1153 1154
   */
  if (operation == HA_EXTRA_NO_CACHE) {
1155 1156
    cq_.reset();
    query_.reset();
H
hustjieke 已提交
1157 1158 1159 1160
  }
  DBUG_RETURN(0);
}

1161
// Statement start hook. For write lock types it:
//  - registers the Tianmu handlerton with the statement transaction;
//  - also registers it with the session transaction when autocommit is off or
//    an explicit transaction (OPTION_BEGIN) is open;
//  - binds this handler to the session's Tianmu transaction and passes the
//    table share to AddTableWRIfNeeded().
// Returns 0 on success, 1 after an error has been reported with my_message().
// Previously 0 was returned even then, so the statement continued without the
// transaction registration.
int ha_tianmu::start_stmt(THD *thd, thr_lock_type lock_type) {
  int ret = 0;
  try {
    if (lock_type == TL_WRITE_CONCURRENT_INSERT || lock_type == TL_WRITE_DEFAULT || lock_type == TL_WRITE) {
      trans_register_ha(thd, false, rcbase_hton, nullptr);
      if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
        trans_register_ha(thd, true, rcbase_hton, nullptr);
      }
      current_txn_ = ha_rcengine_->GetTx(thd);
      current_txn_->AddTableWRIfNeeded(share_);
    }
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
    ret = 1;
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
    ret = 1;
  }
  return ret;
}

/*
 Ask rcbase handler about permission to cache table during query registration.
 If current thread is in non-autocommit, we don't permit any mysql query
 caching.
 */
1186 1187
my_bool ha_tianmu::register_query_cache_table(THD *thd, char *table_key, size_t key_length,
                                              qc_engine_callback *call_back, [[maybe_unused]] ulonglong *engine_data) {
H
hustjieke 已提交
1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205
  *call_back = rcbase_query_caching_of_table_permitted;
  return rcbase_query_caching_of_table_permitted(thd, table_key, key_length, 0);
}

/*
 Used to delete a table. By the time delete_table() has been called all
 opened references to this table will have been closed (and your globally
 shared references released. The variable name will just be the name of
 the table. You will need to remove any files you have created at this point.

 If you do not implement this, the default delete_table() is called from
 handler.cc and it will delete all files with the file extentions returned
 by bas_ext().

 Called from handler.cc by delete_table and  ha_create_table(). Only used
 during create if the table_flag HA_DROP_BEFORE_CREATE was specified for
 the storage engine.
 */
1206
int ha_tianmu::delete_table(const char *name) {
H
hustjieke 已提交
1207 1208 1209
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int ret = 1;
  try {
1210
    ha_rcengine_->DeleteTable(name, ha_thd());
H
hustjieke 已提交
1211 1212 1213
    ret = 0;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1214
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1215 1216
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1217
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
  }

  DBUG_RETURN(ret);
}

/*
 Given a starting key, and an ending key estimate the number of rows that
 will exist between the two. end_key may be empty which in case determine
 if start_key matches any rows.

 Called from opt_range.cc by check_quick_keys().
 */
1230 1231
ha_rows ha_tianmu::records_in_range([[maybe_unused]] uint inx, [[maybe_unused]] key_range *min_key,
                                    [[maybe_unused]] key_range *max_key) {
H
hustjieke 已提交
1232
  DBUG_ENTER(__PRETTY_FUNCTION__);
1233
  DBUG_RETURN(stats.records);  // low number to force index usage
H
hustjieke 已提交
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245
}

/*
 create() is called to create a database. The variable name will have the name
 of the table. When create() is called you do not need to worry about opening
 the table. Also, the FRM file will have already been created so adjusting
 create_info will not do you any good. You can overwrite the frm file at this
 point if you wish to change the table definition, but there are no methods
 currently provided for doing that.

 Called from handle.cc by ha_create_table().
 */
1246
int ha_tianmu::create(const char *name, TABLE *table_arg, [[maybe_unused]] HA_CREATE_INFO *create_info) {
H
hustjieke 已提交
1247 1248
  DBUG_ENTER(__PRETTY_FUNCTION__);
  try {
1249 1250 1251 1252 1253
    // fix issue 487: bug for create table #mysql50#q.q should return failure and actually return success
    const size_t table_name_len = strlen(name);
    if (name[table_name_len - 1] == '/') {
      TIANMU_LOG(LogCtl_Level::ERROR, "Table name is empty");
      DBUG_RETURN(ER_WRONG_TABLE_NAME);
1254 1255
    }

1256
    ha_rcengine_->CreateTable(name, table_arg);
H
hustjieke 已提交
1257 1258 1259 1260 1261 1262
    DBUG_RETURN(0);
  } catch (common::AutoIncException &e) {
    my_message(ER_WRONG_AUTO_KEY, e.what(), MYF(0));
  } catch (common::UnsupportedDataTypeException &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
  } catch (fs::filesystem_error &e) {
1263 1264
    TIANMU_LOG(LogCtl_Level::ERROR, "filesystem_error on table creation '%s'", e.what());
    fs::remove_all(std::string(name) + common::TIANMU_EXT);
H
hustjieke 已提交
1265 1266
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1267
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1268 1269
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1270
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1271 1272 1273 1274 1275
  }

  DBUG_RETURN(1);
}

1276
int ha_tianmu::truncate() {
1277 1278
  DBUG_ENTER(__PRETTY_FUNCTION__);
  int ret = 1;
H
hustjieke 已提交
1279
  try {
1280
    ha_rcengine_->TruncateTable(table_name_, ha_thd());
1281
    ret = 0;
H
hustjieke 已提交
1282
  } catch (std::exception &e) {
1283
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1284
  } catch (...) {
1285
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1286
  }
1287
  DBUG_RETURN(ret);
H
hustjieke 已提交
1288 1289
}

1290
uint ha_tianmu::max_supported_key_part_length(HA_CREATE_INFO *create_info) const {
  // The tianmu_sysvar_large_prefix system variable chooses between the large
  // and the small per-column index length limit; create_info is not consulted.
  return tianmu_sysvar_large_prefix ? Tianmu::common::TIANMU_MAX_INDEX_COL_LEN_LARGE
                                    : Tianmu::common::TIANMU_MAX_INDEX_COL_LEN_SMALL;
}

1297
// Converts the row under table_new_iter_ into MySQL record format in `buf`,
// remembers its row id as current_position_, and advances the iterator.
// Returns HA_ERR_END_OF_FILE when the iterator is exhausted, 0 otherwise.
//
// ConvertToField() writes through table->field[], i.e. into record[0]. When
// `buf` is a different buffer, record[0] is saved first, the converted row is
// copied into `buf`, and record[0] is restored. The restore, together with the
// write-set restore, runs on every exit path, so an exception thrown during
// conversion leaves neither record[0] clobbered nor the write set widened.
int ha_tianmu::fill_row(uchar *buf) {
  if (table_new_iter_ == table_new_iter_end_)
    return HA_ERR_END_OF_FILE;

  const size_t rec_len = table->s->reclength;
  std::unique_ptr<uchar[]> saved_record;

  my_bitmap_map *org_bitmap = dbug_tmp_use_all_columns(table, table->write_set);
  std::shared_ptr<void> defer(nullptr, [&](...) {
    if (saved_record)
      std::memcpy(table->record[0], saved_record.get(), rec_len);
    dbug_tmp_restore_column_map(table->write_set, org_bitmap);
  });

  // We should pack the row into `buf`, but the fields write into record[0].
  // Work around it when `buf` is not record[0].
  if (buf != table->record[0]) {
    saved_record = std::make_unique<uchar[]>(rec_len);
    std::memcpy(saved_record.get(), table->record[0], rec_len);
  }

  for (uint col_id = 0; col_id < table->s->fields; col_id++)
    core::Engine::ConvertToField(table->field[col_id], *(table_new_iter_.GetData(col_id)), &blob_buffers_[col_id]);

  if (saved_record)
    std::memcpy(buf, table->record[0], rec_len);

  current_position_ = table_new_iter_.GetCurrentRowId();
  table_new_iter_++;

  return 0;
}

1328
char *ha_tianmu::update_table_comment(const char *comment) {
H
hustjieke 已提交
1329 1330 1331
  char *ret = const_cast<char *>(comment);
  try {
    uint length = (uint)std::strlen(comment);
1332
    char *str = nullptr;
H
hustjieke 已提交
1333 1334 1335 1336 1337 1338 1339 1340
    uint extra_len = 0;

    if (length > 64000 - 3) {
      return ((char *)comment);  // string too long
    }

    //  get size & ratio
    int64_t sum_c = 0, sum_u = 0;
1341
    std::vector<core::AttrInfo> attr_info = ha_rcengine_->GetTableAttributesInfo(table_name_, table_share);
H
hustjieke 已提交
1342 1343 1344 1345
    for (uint j = 0; j < attr_info.size(); j++) {
      sum_c += attr_info[j].comp_size;
      sum_u += attr_info[j].uncomp_size;
    }
1346
    char buf[256] = {0};
H
hustjieke 已提交
1347 1348 1349 1350
    double ratio = (sum_c > 0 ? double(sum_u) / double(sum_c) : 0);
    int count = std::sprintf(buf, "Overall compression ratio: %.3f, Raw size=%ld MB", ratio, sum_u >> 20);
    extra_len += count;

1351
    str = (char *)my_malloc(PSI_NOT_INSTRUMENTED, length + extra_len + 3, MYF(0));
H
hustjieke 已提交
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365
    if (str) {
      char *pos = str + length;
      if (length) {
        std::memcpy(str, comment, length);
        *pos++ = ';';
        *pos++ = ' ';
      }

      std::memcpy(pos, buf, extra_len);
      pos[extra_len] = 0;
    }
    ret = str;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1366
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1367 1368
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1369
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1370 1371 1372 1373 1374
  }

  return ret;
}

1375
bool ha_tianmu::explain_message(const Item *a_cond, String *buf) {
H
hustjieke 已提交
1376
  DBUG_ENTER(__PRETTY_FUNCTION__);
1377
  if (current_txn_->Explain()) {
H
hustjieke 已提交
1378
    cond_push(a_cond);
1379
    std::string str = current_txn_->GetExplainMsg();
H
hustjieke 已提交
1380 1381 1382 1383 1384
    buf->append(str.c_str(), str.length());
  }
  DBUG_RETURN(TRUE);
}

1385
int ha_tianmu::set_cond_iter() {
H
hustjieke 已提交
1386
  int ret = 1;
1387 1388 1389
  if (query_ && !result_ && table_ptr_->NumOfObj() != 0) {
    cq_->Result(tmp_table_);  // it is ALWAYS -2 though....
    result_ = true;
H
hustjieke 已提交
1390 1391

    try {
1392 1393
      core::FunctionExecutor fe(std::bind(&core::Query::LockPackInfoForUse, std::ref(query_)),
                                std::bind(&core::Query::UnlockPackInfoFromUse, std::ref(query_)));
H
hustjieke 已提交
1394

1395
      core::TempTable *push_down_result = query_->Preexecute(*cq_, nullptr, false);
1396
      if (!push_down_result || push_down_result->NumOfTables() != 1)
H
hustjieke 已提交
1397 1398 1399 1400
        throw common::InternalException("core::Query execution returned no result object");

      core::Filter *filter(push_down_result->GetMultiIndexP()->GetFilter(0));
      if (!filter)
1401 1402
        filter_ptr_.reset(
            new core::Filter(push_down_result->GetMultiIndexP()->OrigSize(0), table_ptr_->Getpackpower()));
H
hustjieke 已提交
1403
      else
1404
        filter_ptr_.reset(new core::Filter(*filter));
H
hustjieke 已提交
1405

1406 1407 1408
      table_ptr_ = push_down_result->GetTableP(0);
      table_new_iter_ = ((core::RCTable *)table_ptr_)->Begin(GetAttrsUseIndicator(table), *filter_ptr_);
      table_new_iter_end_ = ((core::RCTable *)table_ptr_)->End();
H
hustjieke 已提交
1409 1410
      ret = 0;
    } catch (common::Exception const &e) {
1411
      rc_control_ << system::lock << "Error in push-down execution, push-down execution aborted: " << e.what()
1412
                  << system::unlock;
1413
      TIANMU_LOG(LogCtl_Level::ERROR, "Error in push-down execution, push-down execution aborted: %s", e.what());
H
hustjieke 已提交
1414
    }
1415 1416
    query_.reset();
    cq_.reset();
H
hustjieke 已提交
1417 1418 1419 1420
  }
  return ret;
}

1421
// Tries to push the WHERE condition `a_cond` down into a Tianmu query.
// On the first call (no query_ yet) it:
//  - obtains a full-table iterator;
//  - creates a core::Query and core::CompiledQuery over the single table;
//  - registers the "<table or view name>:<alias>" alias;
//  - adds one LISTING column per field whose read_set bit is checked.
// It then compiles `a_cond` into a copy of the compiled query. On success it
// installs that copy and calls set_cond_iter() to execute it and reposition
// the iterator.
// Returns nullptr (ret = 0) once the condition has been compiled and applied.
// Returns `a_cond` unchanged when:
//  - a result command already exists (result_);
//  - BuildConditions() fails (query_ is then dropped);
//  - an exception is reported.
// NOTE(review): presumably the return value tells the server which part of the
// condition it must still evaluate, per the handler::cond_push contract —
// confirm.
const Item *ha_tianmu::cond_push(const Item *a_cond) {
H
hustjieke 已提交
1422 1423 1424 1425
  Item const *ret = a_cond;
  Item *cond = const_cast<Item *>(a_cond);

  try {
1426
    // First push for this scan: build the single-table query skeleton.
    if (!query_) {
H
hustjieke 已提交
1427
      std::shared_ptr<core::RCTable> rctp;
1428
      ha_rcengine_->GetTableIterator(table_name_, table_new_iter_, table_new_iter_end_, rctp,
1429
                                     GetAttrsUseIndicator(table), table->in_use);
1430 1431 1432 1433
      table_ptr_ = rctp.get();
      query_.reset(new core::Query(current_txn_));
      cq_.reset(new core::CompiledQuery);
      result_ = false;
H
hustjieke 已提交
1434

1435
      query_->AddTable(rctp);
H
hustjieke 已提交
1436
      core::TabID t_out;
1437 1438 1439
      cq_->TableAlias(t_out,
                      core::TabID(0));  // we apply it to the only table in this query
      cq_->TmpTable(tmp_table_, t_out);
H
hustjieke 已提交
1440 1441 1442 1443 1444 1445 1446

      // Alias key: referencing view name (if any) or table name, then ":" and the table alias.
      std::string ext_alias;
      if (table->pos_in_table_list->referencing_view)
        ext_alias = std::string(table->pos_in_table_list->referencing_view->table_name);
      else
        ext_alias = std::string(table->s->table_name.str);
      ext_alias += std::string(":") + std::string(table->alias);
1447
      query_->table_alias2index_ptr.insert(std::make_pair(ext_alias, std::make_pair(t_out.n, table)));
H
hustjieke 已提交
1448 1449 1450

      int col_no = 0;
      core::AttrID col, vc;
1451
      core::TabID tab(tmp_table_);
H
hustjieke 已提交
1452 1453 1454 1455 1456 1457

      // Add a LISTING column to the temp table for each field whose read_set bit
      // is checked. dbug_tmp_use_all_columns() temporarily widens read_set in
      // debug builds.
      // NOTE(review): col_no only advances when a bit is set, so once a field's
      // bit is clear the same bit index is tested for every later field, and
      // col.n is the count of columns added so far, not the field index.
      // Verify this is intended for sparse read sets.
      my_bitmap_map *org_bitmap = dbug_tmp_use_all_columns(table, table->read_set);
      for (Field **field = table->field; *field; field++) {
        core::AttrID at;
        if (bitmap_is_set(table->read_set, col_no)) {
          col.n = col_no++;
1458 1459 1460
          cq_->CreateVirtualColumn(vc.n, tmp_table_, t_out, col);
          cq_->AddColumn(at, tmp_table_, core::CQTerm(vc.n), common::ColOperation::LISTING, (*field)->field_name,
                         false);
H
hustjieke 已提交
1461 1462 1463 1464 1465
        }
      }
      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
    }

1466 1467
    if (result_)
      return a_cond;  // if result_ there is already a result command in
H
hustjieke 已提交
1468 1469
                      // compilation

    // Compile the condition into a copy so a failure leaves cq_ untouched.
1470
    std::unique_ptr<core::CompiledQuery> tmp_cq(new core::CompiledQuery(*cq_));
H
hustjieke 已提交
1471
    core::CondID cond_id;
1472 1473
    if (!query_->BuildConditions(cond, cond_id, tmp_cq.get(), tmp_table_, core::CondType::WHERE_COND, false)) {
      query_.reset();
H
hustjieke 已提交
1474 1475
      return a_cond;
    }
1476 1477 1478 1479
    tmp_cq->AddConds(tmp_table_, cond_id, core::CondType::WHERE_COND);
    tmp_cq->ApplyConds(tmp_table_);
    cq_.reset(tmp_cq.release());
    // reset  table_new_iter_ with push condition
H
hustjieke 已提交
1480 1481 1482 1483
    // set_cond_iter()'s return code is ignored; the condition counts as pushed either way.
    set_cond_iter();
    ret = 0;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1484
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1485 1486
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1487
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1488 1489 1490 1491
  }
  return ret;
}

1492
int ha_tianmu::reset() {
H
hustjieke 已提交
1493 1494 1495 1496
  DBUG_ENTER(__PRETTY_FUNCTION__);

  int ret = 1;
  try {
1497 1498 1499 1500 1501 1502 1503 1504
    table_new_iter_ = core::RCTable::Iterator();
    table_new_iter_end_ = core::RCTable::Iterator();
    table_ptr_ = nullptr;
    filter_ptr_.reset();
    query_.reset();
    cq_.reset();
    result_ = false;
    blob_buffers_.resize(0);
H
hustjieke 已提交
1505 1506 1507
    ret = 0;
  } catch (std::exception &e) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0));
1508
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
H
hustjieke 已提交
1509 1510
  } catch (...) {
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0));
1511
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
H
hustjieke 已提交
1512 1513 1514 1515 1516
  }

  DBUG_RETURN(ret);
}

1517 1518
enum_alter_inplace_result ha_tianmu::check_if_supported_inplace_alter([[maybe_unused]] TABLE *altered_table,
                                                                      Alter_inplace_info *ha_alter_info) {
  DBUG_ENTER(__PRETTY_FUNCTION__);

  // Decide how the server may execute this ALTER TABLE:
  //  - only ADD/DROP/reorder of columns, or exactly the column-rename flag set -> in place, exclusive lock;
  //  - column type / pack-length / default changes -> HA_ALTER_INPLACE_NOT_SUPPORTED
  //    (per MySQL handler.h, the server then falls back to the COPY algorithm);
  //  - anything else -> HA_ALTER_ERROR (the ALTER is rejected).
  const Alter_inplace_info::HA_ALTER_FLAGS requested = ha_alter_info->handler_flags;
  const bool only_add_drop_order = !(requested & ~TIANMU_SUPPORTED_ALTER_ADD_DROP_ORDER);
  const bool only_column_rename = (requested == TIANMU_SUPPORTED_ALTER_COLUMN_NAME);

  if (only_add_drop_order || only_column_rename)
    DBUG_RETURN(HA_ALTER_INPLACE_EXCLUSIVE_LOCK);

  const Alter_inplace_info::HA_ALTER_FLAGS copy_fallback = Alter_inplace_info::ALTER_STORED_COLUMN_TYPE |
                                                           Alter_inplace_info::ALTER_COLUMN_EQUAL_PACK_LENGTH |
                                                           Alter_inplace_info::ALTER_COLUMN_DEFAULT;
  if (requested & copy_fallback)
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);

  DBUG_RETURN(HA_ALTER_ERROR);
}

1537
bool ha_tianmu::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
  // Execute the in-place part of an ALTER TABLE. Returns false on success and true on failure,
  // in which case "Unable to inplace alter table" is reported to the client.
  try {
    const Alter_inplace_info::HA_ALTER_FLAGS requested = ha_alter_info->handler_flags;
    const bool add_drop_order_only = (requested & ~TIANMU_SUPPORTED_ALTER_ADD_DROP_ORDER) == 0;

    if (add_drop_order_only) {
      // Hand the engine both column layouts (old table share vs. altered table share).
      // NOTE(review): PrepareAlterTable presumably stages the new layout that
      // commit_inplace_alter_table later swaps in from "<table>.tmp" — confirm.
      std::vector<Field *> old_fields(table_share->field, table_share->field + table_share->fields);
      std::vector<Field *> new_fields(altered_table->s->field, altered_table->s->field + altered_table->s->fields);
      ha_rcengine_->PrepareAlterTable(table_name_, new_fields, old_fields, ha_thd());
      return false;
    }

    if (requested == TIANMU_SUPPORTED_ALTER_COLUMN_NAME) {
      // Column rename / default flags only: no engine work needed at this stage.
      return false;
    }
  } catch (std::exception &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
  } catch (...) {
    TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
  }

  // Reached for unsupported flag combinations and after any caught exception.
  my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "Unable to inplace alter table", MYF(0));
  return true;
}

1558 1559
bool ha_tianmu::commit_inplace_alter_table([[maybe_unused]] TABLE *altered_table, Alter_inplace_info *ha_alter_info,
                                           bool commit) {
  // Finish (commit == true) or abandon (commit == false) an in-place ALTER TABLE.
  // Returns false on success, true on failure.
  //
  // Commit of an ADD/DROP/reorder alter swaps directories:
  //   "<table><TIANMU_EXT>" -> "<table>.backup", then "<table>.tmp" -> "<table><TIANMU_EXT>",
  // and afterwards removes the old layout plus any column data directories only it referenced.
  if (!commit) {
    // NOTE(review): the "<table>.tmp" directory used below is not removed on rollback;
    // confirm whether PrepareAlterTable's output should be cleaned up here.
    TIANMU_LOG(LogCtl_Level::INFO, "Alter table failed : %s%s", table_name_.c_str(), " rollback");
    return true;
  }

  const Alter_inplace_info::HA_ALTER_FLAGS flags = ha_alter_info->handler_flags;

  // Column rename / default flags only: no directory swap required.
  if (flags == TIANMU_SUPPORTED_ALTER_COLUMN_NAME) {
    return false;
  }
  if (flags & ~TIANMU_SUPPORTED_ALTER_ADD_DROP_ORDER) {
    // HA_ALTER_FLAGS is a 64-bit integer type; cast so the conversion specifier always matches.
    TIANMU_LOG(LogCtl_Level::INFO, "Altered table not support type %llu", static_cast<unsigned long long>(flags));
    return true;
  }

  fs::path tmp_dir(table_name_ + ".tmp");
  fs::path tab_dir(table_name_ + common::TIANMU_EXT);
  fs::path bak_dir(table_name_ + ".backup");

  // Phase 1: swap the prepared layout in. Any failure here must leave the original
  // table directory in place, so the table stays usable.
  try {
    fs::rename(tab_dir, bak_dir);
  } catch (fs::filesystem_error &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "file system error: %s %s|%s", e.what(), e.path1().string().c_str(),
               e.path2().string().c_str());
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "Failed to commit alter table", MYF(0));
    return true;
  }
  try {
    fs::rename(tmp_dir, tab_dir);
  } catch (fs::filesystem_error &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "file system error: %s %s|%s", e.what(), e.path1().string().c_str(),
               e.path2().string().c_str());
    // The original directory is currently named ".backup"; move it back or the table is lost.
    try {
      fs::rename(bak_dir, tab_dir);
    } catch (fs::filesystem_error &restore_error) {
      TIANMU_LOG(LogCtl_Level::ERROR, "file system error: %s %s|%s", restore_error.what(),
                 restore_error.path1().string().c_str(), restore_error.path2().string().c_str());
    }
    my_message(static_cast<int>(common::ErrorCode::UNKNOWN_ERROR), "Failed to commit alter table", MYF(0));
    return true;
  }

  // Phase 2: the new layout is live. Failures from here on only leave garbage on disk, so they
  // are logged but do not fail the commit (reporting failure would desync the server's view of
  // the table from what is on disk).
  // Entries under COLUMN_DIR that are symlinks point at column data directories; delete the
  // targets referenced only by the old layout, then the old layout itself.
  try {
    std::unordered_set<std::string> live_targets;
    for (auto &entry : fs::directory_iterator(tab_dir / common::COLUMN_DIR)) {
      if (!fs::is_symlink(entry.path()))
        continue;  // read_symlink() throws on non-symlinks
      auto target = fs::read_symlink(entry.path()).string();
      if (!target.empty())
        live_targets.insert(target);
    }

    for (auto &entry : fs::directory_iterator(bak_dir / common::COLUMN_DIR)) {
      if (!fs::is_symlink(entry.path()))
        continue;  // plain directories are removed together with bak_dir below
      auto target = fs::read_symlink(entry.path()).string();
      if (target.empty() || live_targets.count(target) != 0)
        continue;
      fs::remove_all(target);
      TIANMU_LOG(LogCtl_Level::INFO, "removing %s", target.c_str());
    }
    fs::remove_all(bak_dir);
  } catch (fs::filesystem_error &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "file system error: %s %s|%s", e.what(), e.path1().string().c_str(),
               e.path2().string().c_str());
  } catch (std::exception &e) {
    TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught: %s", e.what());
  }
  return false;
}
/*
 key: mysql format, may be union key, need changed to kvstore key format

 */
1610 1611
void ha_tianmu::key_convert(const uchar *key, uint key_len, std::vector<uint> cols,
                            std::vector<std::string_view> &keys) {
H
hustjieke 已提交
1612 1613 1614 1615 1616 1617 1618 1619
  key_restore(table->record[0], (uchar *)key, &table->key_info[active_index], key_len);

  Field **field = table->field;
  std::vector<std::string> records;
  for (auto &i : cols) {
    Field *f = field[i];
    size_t length;
    if (f->is_null()) {
1620
      throw common::Exception("Priamry key part can not be nullptr");
H
hustjieke 已提交
1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632
    }
    if (f->flags & BLOB_FLAG)
      length = dynamic_cast<Field_blob *>(f)->get_length();
    else
      length = f->row_pack_length();

    std::unique_ptr<char[]> buf(new char[length]);
    char *ptr = buf.get();

    switch (f->type()) {
      case MYSQL_TYPE_TINY: {
        int64_t v = f->val_int();
1633 1634 1635 1636
        if (v > TIANMU_TINYINT_MAX)
          v = TIANMU_TINYINT_MAX;
        else if (v < TIANMU_TINYINT_MIN)
          v = TIANMU_TINYINT_MIN;
H
hustjieke 已提交
1637 1638 1639 1640 1641
        *(int64_t *)ptr = v;
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_SHORT: {
        int64_t v = f->val_int();
1642 1643 1644 1645
        if (v > TIANMU_SMALLINT_MAX)
          v = TIANMU_SMALLINT_MAX;
        else if (v < TIANMU_SMALLINT_MIN)
          v = TIANMU_SMALLINT_MIN;
H
hustjieke 已提交
1646 1647 1648 1649 1650 1651 1652
        *(int64_t *)ptr = v;
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_LONG: {
        int64_t v = f->val_int();
        if (v > std::numeric_limits<int>::max())
          v = std::numeric_limits<int>::max();
1653 1654
        else if (v < TIANMU_INT_MIN)
          v = TIANMU_INT_MIN;
H
hustjieke 已提交
1655 1656 1657 1658 1659
        *(int64_t *)ptr = v;
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_INT24: {
        int64_t v = f->val_int();
1660 1661 1662 1663
        if (v > TIANMU_MEDIUMINT_MAX)
          v = TIANMU_MEDIUMINT_MAX;
        else if (v < TIANMU_MEDIUMINT_MIN)
          v = TIANMU_MEDIUMINT_MIN;
H
hustjieke 已提交
1664 1665 1666 1667 1668
        *(int64_t *)ptr = v;
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_LONGLONG: {
        int64_t v = f->val_int();
1669 1670 1671 1672
        if (v > common::TIANMU_BIGINT_MAX)
          v = common::TIANMU_BIGINT_MAX;
        else if (v < common::TIANMU_BIGINT_MIN)
          v = common::TIANMU_BIGINT_MIN;
H
hustjieke 已提交
1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756
        *(int64_t *)ptr = v;
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_DECIMAL:
      case MYSQL_TYPE_FLOAT:
      case MYSQL_TYPE_DOUBLE: {
        double v = f->val_real();
        *(int64_t *)ptr = *reinterpret_cast<int64_t *>(&v);
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_NEWDECIMAL: {
        auto dec_f = dynamic_cast<Field_new_decimal *>(f);
        *(int64_t *)ptr = std::lround(dec_f->val_real() * types::PowOfTen(dec_f->dec));
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_TIME:
      case MYSQL_TYPE_TIME2:
      case MYSQL_TYPE_DATE:
      case MYSQL_TYPE_DATETIME:
      case MYSQL_TYPE_NEWDATE:
      case MYSQL_TYPE_TIMESTAMP2:
      case MYSQL_TYPE_DATETIME2: {
        MYSQL_TIME my_time;
        std::memset(&my_time, 0, sizeof(my_time));
        f->get_time(&my_time);
        types::DT dt(my_time);
        *(int64_t *)ptr = dt.val;
        ptr += sizeof(int64_t);
      } break;

      case MYSQL_TYPE_TIMESTAMP: {
        MYSQL_TIME my_time;
        std::memset(&my_time, 0, sizeof(my_time));
        f->get_time(&my_time);
        auto saved = my_time.second_part;
        // convert to UTC
        if (!common::IsTimeStampZero(my_time)) {
          my_bool myb;
          my_time_t secs_utc = current_thd->variables.time_zone->TIME_to_gmt_sec(&my_time, &myb);
          common::GMTSec2GMTTime(&my_time, secs_utc);
        }
        my_time.second_part = saved;
        types::DT dt(my_time);
        *(int64_t *)ptr = dt.val;
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_YEAR:  // what the hell?
      {
        types::DT dt = {};
        dt.year = f->val_int();
        *(int64_t *)ptr = dt.val;
        ptr += sizeof(int64_t);
      } break;
      case MYSQL_TYPE_VARCHAR:
      case MYSQL_TYPE_TINY_BLOB:
      case MYSQL_TYPE_MEDIUM_BLOB:
      case MYSQL_TYPE_LONG_BLOB:
      case MYSQL_TYPE_BLOB:
      case MYSQL_TYPE_VAR_STRING:
      case MYSQL_TYPE_STRING: {
        String str;
        f->val_str(&str);
        *(uint32_t *)ptr = str.length();
        ptr += sizeof(uint32_t);
        std::memcpy(ptr, str.ptr(), str.length());
        ptr += str.length();
      } break;
      case MYSQL_TYPE_SET:
      case MYSQL_TYPE_ENUM:
      case MYSQL_TYPE_GEOMETRY:
      case MYSQL_TYPE_NULL:
      case MYSQL_TYPE_BIT:
      default:
        throw common::Exception("unsupported mysql type " + std::to_string(f->type()));
        break;
    }
    records.emplace_back((const char *)buf.get(), ptr - buf.get());
  }

  for (auto &elem : records) {
    keys.emplace_back(elem.data(), elem.size());
  }
}

1757
}  // namespace handler
1758
}  // namespace Tianmu