提交 7331175f 编写于 作者: Y YoungYang0820 提交者: LINGuanRen

use source data size for copy index sstable

上级 495b296f
......@@ -7762,7 +7762,7 @@ int ObRootService::observer_copy_local_index_sstable(const obrpc::ObServerCopyLo
dest_replica.member_ =
ObReplicaMember(r->server_, ObTimeUtility::current_time(), r->replica_type_, r->get_memstore_percent());
dest_replica.unit_id_ = r->unit_id_;
data_size = r->data_size_;
data_size = arg.data_size_ != 0 ? arg.data_size_ : r->data_size_;
} else if (r->server_ != data_src) {
// by pass
} else if (OB_FAIL(server_manager_.check_server_alive(r->server_, alive))) {
......
......@@ -2642,7 +2642,7 @@ bool ObCopySSTableBatchArg::is_valid() const
return is_valid;
}
OB_SERIALIZE_MEMBER(ObServerCopyLocalIndexSSTableArg, data_src_, dst_, pkey_, index_table_id_, cluster_id_);
OB_SERIALIZE_MEMBER(ObServerCopyLocalIndexSSTableArg, data_src_, dst_, pkey_, index_table_id_, cluster_id_, data_size_);
bool ObServerCopyLocalIndexSSTableArg::is_valid() const
{
......
......@@ -3485,12 +3485,12 @@ struct ObServerCopyLocalIndexSSTableArg {
public:
ObServerCopyLocalIndexSSTableArg()
: data_src_(), dst_(), pkey_(), index_table_id_(common::OB_INVALID_ID), cluster_id_(common::OB_INVALID_ID)
: data_src_(), dst_(), pkey_(), index_table_id_(common::OB_INVALID_ID), cluster_id_(common::OB_INVALID_ID), data_size_(0)
{}
public:
bool is_valid() const;
TO_STRING_KV(K_(data_src), K_(dst), K_(pkey), K_(index_table_id), K_(cluster_id));
TO_STRING_KV(K_(data_src), K_(dst), K_(pkey), K_(index_table_id), K_(cluster_id), K_(data_size));
public:
common::ObAddr data_src_;
......@@ -3498,6 +3498,7 @@ public:
common::ObPartitionKey pkey_;
uint64_t index_table_id_;
int64_t cluster_id_;
int64_t data_size_;
};
struct ObBackupBatchArg {
......
......@@ -994,6 +994,8 @@ int ObBuildIndexScheduleTask::send_copy_replica_rpc()
// if the source and destination are the same, it means that this replica builds the index sstable itself,
// just retry the scheduling process will get the right way to next state
ret = OB_EAGAIN;
} else if (OB_FAIL(get_data_size(arg.data_size_))) {
STORAGE_LOG(WARN, "fail to get data size", K(ret));
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(
extract_tenant_id(index_id_), schema_guard))) {
STORAGE_LOG(WARN, "fail to get schema guard", K(ret), K(schema_version_));
......@@ -1392,6 +1394,49 @@ int ObBuildIndexScheduleTask::schedule_dag()
return ret;
}
int ObBuildIndexScheduleTask::get_data_size(int64_t &data_size)
{
int ret = OB_SUCCESS;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res)
{
sqlclient::ObMySQLResult *result = NULL;
char ip[common::OB_MAX_SERVER_ADDR_SIZE] = "";
if (OB_INVALID_ID == index_id_ || !pkey_.is_valid() || !candidate_replica_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(index_id_), K(pkey_), K(candidate_replica_));
} else if (!candidate_replica_.ip_to_string(ip, sizeof(ip))) {
LOG_WARN("fail to convert ObAddr to ip", K(ret));
} else if (OB_FAIL(sql.assign_fmt(
"SELECT used_size, MAX(major_version) from %s "
"WHERE tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND sstable_id = %ld "
"AND svr_ip = '%s' AND svr_port = %d",
OB_ALL_VIRTUAL_STORAGE_STAT_TNAME,
extract_tenant_id(index_id_),
pkey_.get_table_id(),
pkey_.get_partition_id(),
index_id_,
ip,
candidate_replica_.get_port()))) {
STORAGE_LOG(WARN, "fail to assign sql", K(ret));
} else if (OB_FAIL(GCTX.sql_proxy_->read(res, sql.ptr()))) {
LOG_WARN("fail to execute sql", K(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "error unexpected, query result must not be NULL", K(ret));
} else if (OB_FAIL(result->next())) {
if (OB_LIKELY(OB_ITER_END == ret)) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to get next row", K(ret));
}
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "used_size", data_size, int64_t);
}
}
return ret;
}
int ObBuildIndexScheduleTask::process()
{
int ret = OB_SUCCESS;
......
......@@ -139,6 +139,7 @@ private:
int get_candidate_source_replica(const bool need_refresh = false);
int unique_index_checking(const bool is_leader);
int rollback_state(const int state);
int get_data_size(int64_t &data_size);
private:
static const int64_t COPY_BUILD_INDEX_DATA_TIMEOUT = 10 * 1000 * 1000LL; // 10s
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册