diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index c6259c185fb5faa0f3b23de1f7d2e28c9f47ed9a..8c278640a37792b886e6a1a583b710051d596237 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -7538,52 +7538,14 @@ int ObRootService::generate_stop_server_log_in_sync_dest_server_array( return ret; } -int ObRootService::generate_log_in_sync_dest_server_buff(common::ObIAllocator& allocator, - const common::ObIArray& dest_server_array, common::ObString& dest_server_buff) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else if (OB_UNLIKELY(dest_server_array.count() <= 0)) { - ret = OB_SERVER_NOT_ALIVE; // dest_server_array is empty, although it can be processed, but still throw error - LOG_WARN("all server not alive", K(ret)); - } else { - char* buf = NULL; - const int64_t buf_len = dest_server_array.count() * (MAX_IP_ADDR_LENGTH + 2) + 1; - if (OB_UNLIKELY(nullptr == (buf = static_cast(allocator.alloc(buf_len))))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc buf", K(ret), K(buf)); - } else { - int64_t pos = 0; - bool first = true; - MEMSET(static_cast(buf), 0, buf_len); - for (int64_t index = 0; OB_SUCC(ret) && index < dest_server_array.count(); ++index) { - const common::ObAddr& server = dest_server_array.at(index); - if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%s", first ? "" : ","))) { - LOG_WARN("fail to do databuff printf", K(ret)); - } else { - first = false; - pos += server.to_string(buf + pos, buf_len - pos); - } - } - if (OB_SUCC(ret)) { - dest_server_buff.assign_ptr(buf, pos); - } - } - } - return ret; -} - int ObRootService::check_is_log_sync(bool& is_log_sync, const common::ObIArray& stop_server) { int ret = OB_SUCCESS; - ObArenaAllocator alloc; is_log_sync = true; common::ObZone zone; /*empty zone*/ common::ObArray alive_server_array; common::ObArray dest_server_array; - common::ObString dest_server_buff; + ObSqlString sql; if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); @@ -7596,10 +7558,9 @@ int ObRootService::check_is_log_sync(bool& is_log_sync, const common::ObIArrayget_schema_guard(schema_guard))) { LOG_WARN("fail to get schema guard", K(ret)); - } else if (OB_FAIL(sql.assign_fmt("SELECT table_id, partition_idx FROM %s WHERE is_in_sync = 0 and " - "is_offline = 0 and replica_type != 16 " - "and concat(svr_ip, \":\", svr_port) in (%s)", - OB_ALL_VIRTUAL_CLOG_STAT_TNAME, - dest_server_buff.ptr()))) { - LOG_WARN("assign_fmt failed", K(ret)); } else if (OB_FAIL(sql_proxy_.read(res, sql.ptr()))) { LOG_WARN("execute sql failed", K(ret), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { @@ -7836,6 +7791,48 @@ int ObRootService::admin_switch_replica_role(const obrpc::ObAdminSwitchReplicaRo return ret; } +int ObRootService::generate_log_in_sync_sql( + const common::ObIArray &dest_server_array, common::ObSqlString &sql) +{ + int ret = OB_SUCCESS; + sql.reset(); + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_UNLIKELY(dest_server_array.count() <= 0)) { + ret = OB_SERVER_NOT_ALIVE; // all server not alive + LOG_WARN("all server not alive", KR(ret)); + } else if (OB_FAIL(sql.assign_fmt("SELECT table_id, partition_idx FROM %s WHERE is_in_sync = 0 and " + "is_offline = 0 and replica_type != 16 " + "and (svr_ip, svr_port) in (", + OB_ALL_VIRTUAL_CLOG_STAT_TNAME))) { + LOG_WARN("assign_fmt failed", KR(ret), K(sql)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < dest_server_array.count(); ++i) { + const common::ObAddr &addr = dest_server_array.at(i); + char ip_str[MAX_IP_PORT_LENGTH] = ""; + if (0 != i) { + if (OB_FAIL(sql.append(","))) { + LOG_WARN("failed to append fmt sql", KR(ret), K(i), K(sql)); + } + } + if (OB_FAIL(ret)) { + } else if (!addr.ip_to_string(ip_str, MAX_IP_PORT_LENGTH)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to ip to string", KR(ret), K(ip_str), K(addr)); + } else if (OB_FAIL(sql.append_fmt("(\"%s\", %d)", ip_str, addr.get_port()))) { + LOG_WARN("failed to append fmt sql", KR(ret), K(i), K(ip_str), K(addr), K(sql)); + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(sql.append(")"))) { + LOG_WARN("failed to append fmt sql", KR(ret), K(sql)); + } + return ret; +} + + int ObRootService::admin_switch_rs_role(const obrpc::ObAdminSwitchRSRoleArg& arg) { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_root_service.h b/src/rootserver/ob_root_service.h index 14e2c75cd816d2e50552c5a2ab8abfda5b05c20a..250387dd0b265e653ee9f5bad8bafebc2cbc4ec4 100644 --- a/src/rootserver/ob_root_service.h +++ b/src/rootserver/ob_root_service.h @@ -1166,16 +1166,15 @@ public: int logical_restore_partitions(const obrpc::ObRestorePartitionsArg& arg); int check_is_log_sync(bool& is_log_sync, const common::ObIArray& stop_server); + int generate_log_in_sync_sql(const common::ObIArray &dest_server_array, common::ObSqlString &sql); int generate_stop_server_log_in_sync_dest_server_array(const common::ObIArray& alive_server_array, const common::ObIArray& excluded_server_array, common::ObIArray& dest_server_array); - int generate_log_in_sync_dest_server_buff(common::ObIAllocator& allocator, - const common::ObIArray& dest_server_array, common::ObString& dest_server_buff); int log_nop_operation(const obrpc::ObDDLNopOpreatorArg& arg); int broadcast_schema(const obrpc::ObBroadcastSchemaArg& arg); int check_other_rs_exist(bool& is_other_rs_exist); ObRsGtsManager& get_rs_gts_manager() - { + { return rs_gts_manager_; } ObFreezeInfoManager& get_freeze_manager()