From d84bce42aac5674c944d9fda4fa8e755a51177e0 Mon Sep 17 00:00:00 2001 From: KyrielightWei Date: Fri, 19 Apr 2024 09:13:57 +0000 Subject: [PATCH] Prohibit dup table local read on a election expired leader --- mittest/multi_replica/CMakeLists.txt | 1 + .../env/ob_multi_replica_test_base.cpp | 21 +- .../env/ob_multi_replica_test_base.h | 1 + .../multi_replica/env/ob_multi_replica_util.h | 14 +- .../multi_replica/env/ob_simple_replica.cpp | 2 +- ...test_max_commit_ts_read_from_dup_table.cpp | 432 ++++++++++++++++++ src/observer/omt/ob_worker_processor.cpp | 6 +- src/storage/tx/ob_dup_table_tablets.cpp | 4 +- src/storage/tx/ob_dup_table_util.cpp | 23 +- src/storage/tx/ob_dup_table_util.h | 2 + src/storage/tx/ob_trans_service_v4.cpp | 8 + 11 files changed, 496 insertions(+), 18 deletions(-) create mode 100644 mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp diff --git a/mittest/multi_replica/CMakeLists.txt b/mittest/multi_replica/CMakeLists.txt index a5d35666f1..65476d0e41 100644 --- a/mittest/multi_replica/CMakeLists.txt +++ b/mittest/multi_replica/CMakeLists.txt @@ -26,4 +26,5 @@ ob_unittest_multi_replica(test_ob_dup_table_restart) ob_unittest_multi_replica(test_ob_dup_table_leader_switch) ob_unittest_multi_replica(test_ob_dup_table_tablet_gc) ob_unittest_multi_replica(test_ob_standby_read_basic) +ob_unittest_multi_replica(test_max_commit_ts_read_from_dup_table) ob_unittest_multi_replica(test_mds_replay_from_ctx_table) diff --git a/mittest/multi_replica/env/ob_multi_replica_test_base.cpp b/mittest/multi_replica/env/ob_multi_replica_test_base.cpp index 310aac1669..b9743c5168 100644 --- a/mittest/multi_replica/env/ob_multi_replica_test_base.cpp +++ b/mittest/multi_replica/env/ob_multi_replica_test_base.cpp @@ -582,7 +582,7 @@ int ObMultiReplicaTestBase::init_test_replica_(const int zone_id) int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_doc) { int ret = OB_SUCCESS; - FILE *fp = fopen(event_file_path_.c_str(), "r"); + FILE *fp = fopen(event_file_path_.c_str(), "rb"); if (fp == NULL) { if (json_doc.IsObject()) { fprintf(stdout, "Fail to open file! file_path = %s\n", event_file_path_.c_str()); @@ -591,11 +591,18 @@ int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_do return ret; } - char read_buffer[2 * 1024 * 1024]; + char read_buffer[4 * 1024]; rapidjson::FileReadStream rs(fp, read_buffer, sizeof(read_buffer)); json_doc.ParseStream(rs); + if (json_doc.HasParseError()) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "[ObMultiReplicaTestBase] Parse EVENT JSON ERROR", K(ret), + K(json_doc.GetParseError())); + fprintf(stdout, "Parse Event Json Error\n"); + } + fclose(fp); return OB_SUCCESS; @@ -643,7 +650,7 @@ int ObMultiReplicaTestBase::wait_event_finish(const std::string &event_name, ret = OB_TIMEOUT; break; } else { - ob_usleep(retry_interval_ms * 1000); + usleep(retry_interval_ms * 1000); } } else { break; @@ -670,7 +677,7 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name, if (OB_SUCC(ret)) { FILE *fp = fopen(event_file_path_.c_str(), "w"); - char write_buffer[2 * 1024 * 1024]; + char write_buffer[4 * 1024]; rapidjson::FileWriteStream file_w_stream(fp, write_buffer, sizeof(write_buffer)); rapidjson::PrettyWriter prettywriter(file_w_stream); json_doc.AddMember(rapidjson::StringRef(event_name.c_str(), event_name.size()), @@ -680,8 +687,8 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name, fclose(fp); } - fprintf(stdout, "[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s\n", - event_name.c_str(), event_content.c_str()); + fprintf(stdout, "[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s\n", + event_name.c_str(), event_content.c_str()); SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event", K(event_name.c_str()), K(event_content.c_str())); return ret; @@ -898,7 +905,7 @@ int ::oceanbase::omt::ObWorkerProcessor::process_err_test() if (ATOMIC_LOAD(&::oceanbase::unittest::ObMultiReplicaTestBase::block_msg_)) { ret = OB_EAGAIN; - SERVER_LOG(INFO, "[ObMultiReplicaTestBase] block msg process", K(ret)); + SERVER_LOG(INFO, "[ERRSIM] block msg process", K(ret)); } return ret; diff --git a/mittest/multi_replica/env/ob_multi_replica_test_base.h b/mittest/multi_replica/env/ob_multi_replica_test_base.h index 5381a3c790..d55e3c55cb 100644 --- a/mittest/multi_replica/env/ob_multi_replica_test_base.h +++ b/mittest/multi_replica/env/ob_multi_replica_test_base.h @@ -72,6 +72,7 @@ public: observer::ObSimpleServerReplica &get_curr_simple_server() { return *replica_; } static int read_cur_json_document_(rapidjson::Document & json_doc); + static int wait_event_finish(const std::string &event_name, std::string &event_content, int64_t wait_timeout_ms, diff --git a/mittest/multi_replica/env/ob_multi_replica_util.h b/mittest/multi_replica/env/ob_multi_replica_util.h index 9b47a4ed8f..8e0d4a72fe 100644 --- a/mittest/multi_replica/env/ob_multi_replica_util.h +++ b/mittest/multi_replica/env/ob_multi_replica_util.h @@ -234,23 +234,26 @@ namespace unittest common::ObString trace_id; \ common::ObString query_sql; \ int64_t request_time = 0; \ + int64_t snapshot = 0; \ int64_t ret_code = OB_SUCCESS; \ int64_t retry_cnt = 0; \ ASSERT_EQ(true, conn != nullptr); \ - std::string sql_str = \ - "select TX_ID, TRACE_ID, REQUEST_TIME, RET_CODE, RETRY_CNT, QUERY_SQL from " \ - "oceanbase.V$OB_SQL_AUDIT where QUERY_SQL like " \ - + std::string(" \"") + std::string(sql) + std::string("\" order by REQUEST_TIME DESC"); \ + std::string sql_str = "select TX_ID, SNAPSHOT_VERSION, TRACE_ID, REQUEST_TIME, RET_CODE, " \ + "RETRY_CNT, QUERY_SQL from " \ + "oceanbase.V$OB_SQL_AUDIT where QUERY_SQL like " \ + + std::string(" \"") + std::string(sql) \ + + std::string("\" order by REQUEST_TIME DESC"); \ READ_SQL_BY_CONN(conn, process_result, sql_str.c_str()); \ ASSERT_EQ(OB_SUCCESS, process_result->next()); \ ASSERT_EQ(OB_SUCCESS, process_result->get_int("TX_ID", tx_id)); \ + ASSERT_EQ(OB_SUCCESS, process_result->get_int("SNAPSHOT_VERSION", snapshot)); \ ASSERT_EQ(OB_SUCCESS, process_result->get_varchar("TRACE_ID", trace_id)); \ ASSERT_EQ(OB_SUCCESS, process_result->get_int("REQUEST_TIME", request_time)); \ ASSERT_EQ(OB_SUCCESS, process_result->get_int("RET_CODE", ret_code)); \ ASSERT_EQ(OB_SUCCESS, process_result->get_int("RETRY_CNT", retry_cnt)); \ ASSERT_EQ(OB_SUCCESS, process_result->get_varchar("QUERY_SQL", query_sql)); \ SERVER_LOG(INFO, "[ObMultiReplicaTestBase] query sql_audit for tx_id", K(trace_id), K(tx_id), \ - K(request_time), K(ret_code), K(retry_cnt), K(query_sql)); \ + K(snapshot), K(request_time), K(ret_code), K(retry_cnt), K(query_sql)); \ } #define PREPARE_CONN_ENV(conn) \ @@ -476,6 +479,7 @@ public: } }; + } // namespace unittest } // namespace oceanbase diff --git a/mittest/multi_replica/env/ob_simple_replica.cpp b/mittest/multi_replica/env/ob_simple_replica.cpp index 26459c0d30..83bfe7535a 100644 --- a/mittest/multi_replica/env/ob_simple_replica.cpp +++ b/mittest/multi_replica/env/ob_simple_replica.cpp @@ -149,7 +149,7 @@ int ObSimpleServerReplica::simple_init() + ",memory_limit=" + std::string(memory_limit_) + ",cache_wash_threshold=1G,net_thread_count=4,cpu_count=16,schema_history_expire_time=" "1d,workers_per_cpu_quota=10,datafile_disk_percentage=2,__min_full_resource_pool_" - "memory=1073741824,system_memory=5G,trace_log_slow_query_watermark=100ms,datafile_" + "memory=2147483648,system_memory=5G,trace_log_slow_query_watermark=100ms,datafile_" "size=10G,stack_size=512K"; opts.optstr_ = optstr_.c_str(); // opts.devname_ = "eth0"; diff --git a/mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp b/mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp new file mode 100644 index 0000000000..eb4c3327c1 --- /dev/null +++ b/mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp @@ -0,0 +1,432 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#define USING_LOG_PREFIX SERVER +#define protected public +#define private public + +#include "env/ob_fast_bootstrap.h" +#include "env/ob_multi_replica_util.h" +#include "lib/mysqlclient/ob_mysql_result.h" +#include "storage/tx/ob_dup_table_lease.h" +#include "storage/tx/ob_tx_loop_worker.h" +#include "storage/tx/ob_tx_replay_executor.h" + +using namespace oceanbase::transaction; +using namespace oceanbase::storage; + +#define CUR_TEST_CASE_NAME ObDupTableMaxCommitTsRead + +DEFINE_MULTI_ZONE_TEST_CASE_CLASS + +MULTI_REPLICA_TEST_MAIN_FUNCTION(test_max_commit_ts_read_from_dup_table); + +namespace oceanbase +{ +namespace transaction +{ + +static bool STOP_TX_REPLAY = false; +static bool BLOCK_DUP_TABLE_LEADER_REVOKE = false; +static bool RETURN_NULL_GTS_CACHE = false; +static sqlclient::ObISQLConnection *static_conn = nullptr; +static sqlclient::ObMySQLResult *static_result = nullptr; +static int64_t final_row_count = 0; + +int ObTxReplayExecutor::errsim_tx_replay_() +{ + int ret = OB_SUCCESS; + + if (STOP_TX_REPLAY) { + ret = OB_EAGAIN; + } + + if (OB_FAIL(ret)) { + TRANS_LOG(INFO, "[ERRSIM] errsim tx replay in test", K(ret)); + } + return ret; +} + +int ObDupTableLSHandler::errsim_leader_revoke_() +{ + int ret = OB_SUCCESS; + + while (BLOCK_DUP_TABLE_LEADER_REVOKE) { + usleep(1000 * 1000); + TRANS_LOG(INFO, "[ERRSIM] errsim wait leader revoke", K(ret)); + } + + return ret; +} + +} // namespace transaction + +namespace unittest +{ + +using namespace oceanbase::transaction; +using namespace oceanbase::storage; + +struct TableBasicArg +{ + uint64_t tenant_id_; + + int64_t dup_ls_id_num_; + int64_t dup_table_id_; + ObSEArray dup_tablet_id_array_; + + int64_t normal_ls_id_num_; + int64_t normal_table_id_; + ObSEArray normal_tablet_id_array_; + + TO_STRING_KV(K(tenant_id_), + K(dup_ls_id_num_), + K(dup_table_id_), + K(normal_ls_id_num_), + K(normal_table_id_), + K(dup_tablet_id_array_), + K(normal_tablet_id_array_)); + + OB_UNIS_VERSION(1); +}; + +OB_SERIALIZE_MEMBER(TableBasicArg, + tenant_id_, + dup_ls_id_num_, + dup_table_id_, + dup_tablet_id_array_, + normal_ls_id_num_, + normal_table_id_, + normal_tablet_id_array_); + +static TableBasicArg static_basic_arg_; + +const std::string test_dup_table_name = "test_dup_1"; +const std::string test_normal_table_name = "test_normal_1"; +const int64_t DEFAULT_LOAD_ROW_CNT = 10; + +TEST_F(GET_ZONE_TEST_CLASS_NAME(1), create_test_env) +{ + int ret = OB_SUCCESS; + + CREATE_TEST_TENANT(test_tenant_id); + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] create test tenant success", K(test_tenant_id)); + + common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, get_curr_simple_server().get_sql_proxy()); + WRITE_SQL_BY_CONN(sys_conn, "alter system set _private_buffer_size = '1B';"); + std::string ls_id_str = std::to_string(1); + std::string target_ip = local_ip_ + ":" + std::to_string(rpc_ports_[1]); + std::string switch_leader_sql = "alter system switch replica leader ls=" + ls_id_str + " server='" + + target_ip + "' tenant='tt1';"; + + WRITE_SQL_BY_CONN(sys_conn, switch_leader_sql.c_str()); + + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + + std::string primary_zone_sql = "ALTER TENANT " + std::string(DEFAULT_TEST_TENANT_NAME) + + " set primary_zone='zone1; zone2; zone3';"; + WRITE_SQL_BY_CONN(test_conn, primary_zone_sql.c_str()); + + unittest::TestEnvTool::create_table_for_test_env( + test_conn, test_dup_table_name.c_str(), 10, true /*is_dup_table*/, + static_basic_arg_.dup_ls_id_num_, static_basic_arg_.dup_table_id_, + static_basic_arg_.dup_tablet_id_array_); + + unittest::TestEnvTool::create_table_for_test_env( + test_conn, test_normal_table_name.c_str(), 10, false /*is_dup_table*/, + static_basic_arg_.normal_ls_id_num_, static_basic_arg_.normal_table_id_, + static_basic_arg_.normal_tablet_id_array_); + + GET_LS(test_tenant_id, static_basic_arg_.dup_ls_id_num_, ls_handle); + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- before wait dup tablet discover", K(ret), + K(static_basic_arg_)); + RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count() + == static_basic_arg_.dup_tablet_id_array_.count(), + 20 * 1000 * 1000, 100 * 1000); + RETRY_UNTIL_TIMEOUT( + ls_handle.get_ls()->dup_table_ls_handler_.tablets_mgr_ptr_->get_readable_tablet_set_count() + >= 1, + 20 * 1000 * 1000, 100 * 1000); + RETRY_UNTIL_TIMEOUT( + ls_handle.get_ls() + ->dup_table_ls_handler_.tablets_mgr_ptr_->get_need_confirm_tablet_set_count() + == 0, + 20 * 1000 * 1000, 100 * 1000); + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] -------- after wait dup tablet discover", K(ret), + K(static_basic_arg_), + K(ls_handle.get_ls()->dup_table_ls_handler_.get_dup_tablet_count())); + ASSERT_EQ(OB_SUCCESS, ret /*has_dup_tablet*/); + + WRITE_SQL_BY_CONN(test_conn, "set autocommit = false;"); + WRITE_SQL_BY_CONN(test_conn, "begin;"); + + for (int i = 1; i <= DEFAULT_LOAD_ROW_CNT; i++) { + std::string insert_dup_sql_str = + "INSERT INTO " + test_dup_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)"; + std::string insert_normal_sql_str = + "INSERT INTO " + test_normal_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)"; + WRITE_SQL_BY_CONN(test_conn, insert_dup_sql_str.c_str()); + WRITE_SQL_BY_CONN(test_conn, insert_normal_sql_str.c_str()); + } + WRITE_SQL_BY_CONN(test_conn, "commit;"); + + static_basic_arg_.tenant_id_ = test_tenant_id; + std::string tmp_str; + ASSERT_EQ(OB_SUCCESS, EventArgSerTool::serialize_arg(static_basic_arg_, tmp_str)); + finish_event("CREATE_TEST_TABLE", tmp_str); +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(2), normal_follower_max_commit_ts_read) +{ + std::string tmp_event_val; + ASSERT_EQ(OB_SUCCESS, wait_event_finish("CREATE_TEST_TABLE", tmp_event_val, 30 * 60 * 1000)); + ASSERT_EQ(OB_SUCCESS, + EventArgSerTool::deserialize_arg(static_basic_arg_, tmp_event_val)); + + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + + std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name; + + common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + + READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str()); + ASSERT_EQ(OB_SUCCESS, table_info_result->next()); + + int64_t dup_table_row_count = 0; + const int64_t col_index = 0; + ASSERT_EQ(OB_SUCCESS, table_info_result->get_int(col_index, dup_table_row_count)); + ASSERT_EQ(dup_table_row_count, DEFAULT_LOAD_ROW_CNT); + + finish_event("NORMAL_FOLLOWER_LOCAL_READ", ""); +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(1), switch_follwer_forcedly_to_zone2_and_stop_replay) +{ + + int ret = OB_SUCCESS; + + std::string tmp_event_val; + ASSERT_EQ(OB_SUCCESS, + wait_event_finish("NORMAL_FOLLOWER_LOCAL_READ", tmp_event_val, 30 * 60 * 1000)); + // refresh location cache + { + std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name; + + common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + + READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str()); + ASSERT_EQ(OB_SUCCESS, table_info_result->next()); + } + + ASSERT_EQ(OB_SUCCESS, + wait_event_finish("PREPARE_TO_UPDATE_ON_NEW_LEADER", tmp_event_val, 30 * 60 * 1000)); + STOP_TX_REPLAY = true; + BLOCK_DUP_TABLE_LEADER_REVOKE = true; + block_msg_ = true; + + // usleep(6*1000*1000); + finish_event("STOP_ZONE1", ""); +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(2), update_on_new_leader) +{ + int ret = OB_SUCCESS; + + std::string tmp_event_val; + + finish_event("PREPARE_TO_UPDATE_ON_NEW_LEADER", ""); + ASSERT_EQ(OB_SUCCESS, wait_event_finish("STOP_ZONE1", tmp_event_val, 30 * 60 * 1000)); + + { + std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name; + + common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + + READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str()); + ASSERT_EQ(OB_SUCCESS, table_info_result->next()); + } + + { + GET_LS(static_basic_arg_.tenant_id_, static_basic_arg_.dup_ls_id_num_, ls_handle); + RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->dup_table_ls_handler_.is_master(), 20 * 1000 * 1000, + 100 * 1000); + ASSERT_EQ(OB_SUCCESS, ret); + RETRY_UNTIL_TIMEOUT(ls_handle.get_ls()->ls_tx_svr_.mgr_->in_leader_serving_state(), + 20 * 1000 * 1000, 100 * 1000); + ASSERT_EQ(OB_SUCCESS, ret); + } + + sleep(2); + common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2(); + { + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + + WRITE_SQL_BY_CONN(test_conn, "set autocommit = false;"); + WRITE_SQL_BY_CONN(test_conn, "set ob_trx_timeout = 1000000000;") + + WRITE_SQL_BY_CONN(test_conn, "begin;"); + + int64_t tmp_tx_id = 0; + for (int i = DEFAULT_LOAD_ROW_CNT + 1; i <= DEFAULT_LOAD_ROW_CNT + 3; i++) { + std::string insert_dup_sql_str = + "INSERT INTO " + test_dup_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)"; + // std::string insert_normal_sql_str = + // "INSERT INTO " + test_normal_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)"; + WRITE_SQL_BY_CONN(test_conn, insert_dup_sql_str.c_str()); + // GET_TX_ID_FROM_SQL_AUDIT(test_conn, insert_dup_sql_str, tmp_tx_id); + // WRITE_SQL_BY_CONN(test_conn, insert_normal_sql_str.c_str()); + } + + // GET_RUNNGING_TRX_ID(test_conn, tmp_tx_id) + WRITE_SQL_BY_CONN(test_conn, "commit;"); + } + + std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name; + { + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + + READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str()); + ASSERT_EQ(OB_SUCCESS, table_info_result->next()); + + int64_t dup_table_row_count = 0; + const int64_t col_index = 0; + ASSERT_EQ(OB_SUCCESS, table_info_result->get_int(col_index, dup_table_row_count)); + ASSERT_EQ(dup_table_row_count, DEFAULT_LOAD_ROW_CNT + 3); + } + + finish_event("UPDATE_ON_NEW_LEADER", ""); + // ob_abort(); +} + +void read_in_new_thread() +{ + int ret = OB_SUCCESS; + std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name + + " where id_x=" + std::to_string(DEFAULT_LOAD_ROW_CNT + 2); + int64_t tmp_tx_id = 0; + { + share::ObTenantSwitchGuard tenant_guard; + ObTsSourceInfoGuard info_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(static_basic_arg_.tenant_id_)); + MTL(ObTxLoopWorker *)->stop(); + usleep(1 * 1000 * 1000); + + // share::SCN max_commit_ts_5_min = + // share::SCN::minus(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false), + // 3 * 1000 * 1000 * 1000L); + // TRANS_LOG(INFO, "[ObMultiReplicaTestBase] print max commit ts", K(ret), + // K(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false)), + // K(max_commit_ts_5_min)); + // MTL(ObTransService *)->get_tx_version_mgr().max_commit_ts_ = max_commit_ts_5_min; + // ((ObTsMgr*)(MTL(ObTransService + // *)->ts_mgr_))->get_ts_source_info_opt_(static_basic_arg_.tenant_id_, info_guard, true, true); + // int64_t tmp_gts = 0 ; + // ASSERT_EQ(OB_SUCCESS, max_commit_ts_5_min.convert_for_gts(tmp_gts)); + // info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.gts_ = + // max_commit_ts_5_min.get_val_for_gts(); + // info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.srr_ = + // MonotonicTs::current_time(); + // info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.srr_.mts_ += + // 1*1000*1000; + + RETURN_NULL_GTS_CACHE = true; + const int64_t col_index = 0; + READ_SQL_BY_CONN(static_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str()); + if (OB_FAIL(table_info_result->next())) { + + TRANS_LOG(WARN, "[ObMultiReplicaTestBase] get next in new thread failed", K(ret), + K(final_row_count)); + final_row_count = -1; + + } else if (OB_FAIL(table_info_result->get_int(col_index, final_row_count))) { + TRANS_LOG(WARN, "[ObMultiReplicaTestBase] get count(*) in new thread failed", K(ret), + K(final_row_count)); + final_row_count = -1; + } + } + TRANS_LOG(INFO, "[ObMultiReplicaTestBase] after read in new thread", K(ret), K(final_row_count), + K(SELECT_SQL_ON_DUP_TABLE.c_str())); + GET_TX_ID_FROM_SQL_AUDIT(static_conn, SELECT_SQL_ON_DUP_TABLE, tmp_tx_id); +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(1), read_from_old_leader_zone1) +{ + int ret = OB_SUCCESS; + std::string tmp_event_val; + ASSERT_EQ(OB_SUCCESS, + wait_event_finish("UPDATE_ON_NEW_LEADER", tmp_event_val, 30 * 60 * 1000, 5 * 1000)); + block_msg_ = false; + common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + // refresh location cache + { + std::string SELECT_SQL_ON_DUP_TABLE = "select count(*) from " + test_dup_table_name; + + common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + + READ_SQL_BY_CONN(test_conn, table_info_result, SELECT_SQL_ON_DUP_TABLE.c_str()); + ASSERT_EQ(OB_SUCCESS, table_info_result->next()); + } + + TRANS_LOG(INFO, "[ObMultiReplicaTestBase] 3 - Stop blocking msg during the read operation", + K(ret), K(final_row_count)); + ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy); + static_conn = test_conn; + WRITE_SQL_BY_CONN(test_conn, "set ob_query_timeout=20000000;"); + + TRANS_LOG(INFO, "[ObMultiReplicaTestBase] 2 - Stop blocking msg during the read operation", + K(ret), K(final_row_count)); + std::thread read_thread(read_in_new_thread); + std::thread::id tid = read_thread.get_id(); + uint64_t read_thread_id = *(uint64_t *)(&(tid)); + TRANS_LOG(INFO, "[ObMultiReplicaTestBase] 1 - Stop blocking msg during the read operation", + K(ret), K(read_thread_id), K(final_row_count)); + + share::ObTenantSwitchGuard tenant_guard; + ObTsSourceInfoGuard info_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(static_basic_arg_.tenant_id_)); + int64_t start_ts = ObTimeUtility::fast_current_time(); + while (ObTimeUtility::fast_current_time() - start_ts >= 3 * 1000 * 1000) { + share::SCN max_commit_ts_5_min = + share::SCN::minus(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false), + 3 * 1000 * 1000 * 1000L); + MTL(ObTransService *)->get_tx_version_mgr().max_commit_ts_ = max_commit_ts_5_min; + info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.gts_ = + max_commit_ts_5_min.get_val_for_gts(); + usleep(3 * 1000); + } + + TRANS_LOG(INFO, "[ObMultiReplicaTestBase] Stop blocking msg during the read operation", K(ret), + K(read_thread_id), K(final_row_count)); + + read_thread.join(); + + ASSERT_EQ(final_row_count, 1); + + // usleep(1000 * 1000 * 1000); +} + +} // namespace unittest + +} // namespace oceanbase diff --git a/src/observer/omt/ob_worker_processor.cpp b/src/observer/omt/ob_worker_processor.cpp index 8ebcb6a606..71b187cfab 100644 --- a/src/observer/omt/ob_worker_processor.cpp +++ b/src/observer/omt/ob_worker_processor.cpp @@ -59,8 +59,12 @@ OB_NOINLINE int ObWorkerProcessor::process_err_test() #ifdef ERRSIM ret = EN_WORKER_PROCESS_REQUEST; - LOG_DEBUG("process err_test", K(ret)); #endif + + if(OB_FAIL(ret)) + { + LOG_WARN("process err_test", K(ret)); + } return ret; } diff --git a/src/storage/tx/ob_dup_table_tablets.cpp b/src/storage/tx/ob_dup_table_tablets.cpp index 4bf863c7e7..9cc30d6978 100644 --- a/src/storage/tx/ob_dup_table_tablets.cpp +++ b/src/storage/tx/ob_dup_table_tablets.cpp @@ -1114,8 +1114,10 @@ OB_NOINLINE int ObLSDupTabletsMgr::process_prepare_ser_err_test_() ret = EN_DUP_TABLE_LOG_PREPARE_SERIALIZE; #endif + if(OB_FAIL(ret)) + { DUP_TABLE_LOG(INFO, "errsim prepare serialize err test", K(ret),K(ls_id_)); - + } return ret; } diff --git a/src/storage/tx/ob_dup_table_util.cpp b/src/storage/tx/ob_dup_table_util.cpp index 93aa8f1b56..0bd871e1a4 100644 --- a/src/storage/tx/ob_dup_table_util.cpp +++ b/src/storage/tx/ob_dup_table_util.cpp @@ -1048,8 +1048,8 @@ bool ObDupTableLSHandler::is_dup_table_lease_valid() if (!is_inited() || OB_ISNULL(lease_mgr_ptr_)) { is_dup_lease_ls = false; } else if (ls_state_helper_.is_leader()) { - is_dup_lease_ls = true; - DUP_TABLE_LOG(INFO, "the lease is always valid for a dup ls leader", K(is_dup_lease_ls), + is_dup_lease_ls = false; + DUP_TABLE_LOG(INFO, "None valid lease on dup ls leader", K(is_dup_lease_ls), KPC(this)); } else { is_dup_lease_ls = lease_mgr_ptr_->is_follower_lease_valid(); @@ -1373,6 +1373,16 @@ int ObDupTableLSHandler::switch_to_leader() return ret; } +OB_NOINLINE int ObDupTableLSHandler::errsim_leader_revoke_() +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(ret)) { + DUP_TABLE_LOG(WARN, "errsim leader revoke", K(ret), K(ls_id_)); + } + return ret; +} + int ObDupTableLSHandler::leader_revoke_(const bool is_forcedly) { int ret = OB_SUCCESS; @@ -1380,12 +1390,19 @@ int ObDupTableLSHandler::leader_revoke_(const bool is_forcedly) bool is_logging = false; + if (OB_SUCC(ret)) { + if (OB_FAIL(errsim_leader_revoke_())) { + DUP_TABLE_LOG(WARN, "errsim for dup table leader revoke", K(ret), K(ls_id_), K(is_forcedly)); + } + } + if (is_inited_) { if (OB_NOT_NULL(log_operator_)) { log_operator_->rlock_for_log(); is_logging = log_operator_->check_is_busy_without_lock(); } - if (OB_NOT_NULL(tablets_mgr_ptr_) && OB_TMP_FAIL(tablets_mgr_ptr_->leader_revoke(is_logging))) { + if (OB_SUCC(ret) && OB_NOT_NULL(tablets_mgr_ptr_)) + if(OB_TMP_FAIL(tablets_mgr_ptr_->leader_revoke(is_logging))) { DUP_TABLE_LOG(WARN, "tablets_mgr switch to follower failed", K(ret), K(tmp_ret), KPC(this)); if (!is_forcedly) { ret = tmp_ret; diff --git a/src/storage/tx/ob_dup_table_util.h b/src/storage/tx/ob_dup_table_util.h index 30e478f019..3577cdd2f9 100644 --- a/src/storage/tx/ob_dup_table_util.h +++ b/src/storage/tx/ob_dup_table_util.h @@ -237,6 +237,8 @@ private: int recover_ckpt_into_memory_(); + int errsim_leader_revoke_(); + private: share::ObLSID ls_id_; diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 0d4b20e591..3b83a42d8f 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1051,6 +1051,14 @@ int ObTransService::get_read_store_ctx(const ObTxReadSnapshot &snapshot, } } +if (OB_SUCC(ret)) { + if (snapshot.snapshot_ls_role_ == common::ObRole::FOLLOWER + && snapshot.snapshot_acquire_addr_ != GCTX.self_addr()) { + TRANS_LOG(INFO, "get read store_ctx by a follower's max_commit_ts", K(ret), K(snapshot), + K(ls_id), K(store_ctx)); + } +} + // setup tx_table_guard ObTxTableGuard tx_table_guard; if (OB_SUCC(ret) && -- GitLab