提交 96974662 编写于 作者: O obdev 提交者: ob-robot

rollback persistent unit

上级 01644a39
......@@ -2542,6 +2542,7 @@ int ObRootService::alter_resource_tenant(const obrpc::ObAlterResourceTenantArg &
const common::ObIArray<uint64_t> &delete_unit_group_id_array = arg.unit_group_id_array_;
share::schema::ObSchemaGetterGuard schema_guard;
uint64_t target_tenant_id = OB_INVALID_ID;
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), "tenant_id", OB_SYS_TENANT_ID);
......@@ -2554,9 +2555,13 @@ int ObRootService::alter_resource_tenant(const obrpc::ObAlterResourceTenantArg &
target_tenant_id, new_unit_num, delete_unit_group_id_array))) {
LOG_WARN("fail to alter resource tenant", KR(ret), K(target_tenant_id),
K(new_unit_num), K(delete_unit_group_id_array));
if (OB_TMP_FAIL(submit_reload_unit_manager_task())) {
LOG_ERROR("fail to reload unit_mgr, please try 'alter system reload unit'", KR(ret), KR(tmp_ret));
}
}
LOG_INFO("finish alter_resource_tenant", KR(ret), K(arg));
}
ROOTSERVICE_EVENT_ADD("root_service", "alter_resource_tenant", K(ret), K(arg));
return ret;
}
......@@ -2678,12 +2683,16 @@ int ObRootService::create_tenant(const ObCreateTenantArg &arg, UInt64 &tenant_id
{
LOG_INFO("receive create tenant arg", K(arg), "timeout_ts", THIS_WORKER.get_timeout_ts());
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(ddl_service_.create_tenant(arg, tenant_id))) {
LOG_WARN("fail to create tenant", KR(ret), K(arg));
}
if (OB_TMP_FAIL(submit_reload_unit_manager_task())) {
LOG_ERROR("fail to reload unit_mgr, please try 'alter system reload unit'", KR(ret), KR(tmp_ret));
}
} else {}
LOG_INFO("finish create tenant", KR(ret), K(tenant_id), K(arg), "timeout_ts", THIS_WORKER.get_timeout_ts());
return ret;
}
......
......@@ -5205,6 +5205,48 @@ int ObUnitManager::try_notify_tenant_server_unit_resource(
return ret;
}
// Roll back unit resources that were already notified to servers.
//
// For every unit in `units`, re-sends the tenant/server unit-resource
// notification with is_delete = true through `notify_proxy`, then waits for
// all replies and inspects the per-destination return codes.
//
// @param units                units whose notification should be undone
// @param pool                 resource pool the units belong to; its tenant_id_
//                             is used as the target tenant
// @param compat_mode          forwarded to try_notify_tenant_server_unit_resource
// @param if_not_grant         forwarded to try_notify_tenant_server_unit_resource
// @param skip_offline_server  forwarded to try_notify_tenant_server_unit_resource
// @param notify_proxy         proxy used to send the notifications; reuse() is
//                             called on it before anything is sent
// @return OB_SUCCESS, or the first error encountered. A failure to notify one
//         unit does not stop the remaining units from being notified. A
//         per-destination result of OB_TENANT_NOT_IN_SERVER is not an error.
int ObUnitManager::rollback_persistent_units(
    const common::ObArray<share::ObUnit> &units,
    const share::ObResourcePool &pool,
    const lib::Worker::CompatMode compat_mode,
    const bool if_not_grant,
    const bool skip_offline_server,
    ObNotifyTenantServerResourceProxy &notify_proxy)
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  const bool is_delete = true;
  ObArray<int> return_ret_array;
  notify_proxy.reuse();
  // Try every unit even if an earlier one failed; only the first error is kept in ret.
  for (int64_t i = 0; i < units.count(); i++) {
    const ObUnit &unit = units.at(i);
    if (OB_TMP_FAIL(try_notify_tenant_server_unit_resource(
        pool.tenant_id_, is_delete, notify_proxy,
        pool, compat_mode, unit, if_not_grant, skip_offline_server))) {
      ret = OB_SUCC(ret) ? tmp_ret : ret;
      LOG_WARN("fail to try notify server unit resource", KR(ret), KR(tmp_ret),
               K(pool), K(compat_mode), K(unit), K(if_not_grant), K(skip_offline_server));
    }
  }
  // Always wait, even when sending failed, so that outstanding requests are collected.
  if (OB_TMP_FAIL(notify_proxy.wait_all(return_ret_array))) {
    LOG_WARN("fail to wait notify resource", KR(ret), KR(tmp_ret));
    ret = OB_SUCC(ret) ? tmp_ret : ret;
  }
  if (OB_SUCC(ret)) {
    // The destination list is only indexed when it has exactly one entry per
    // result; otherwise a default-constructed address is logged instead of
    // reading get_dests() at a position that may not exist.
    const bool dests_match = (return_ret_array.count() == notify_proxy.get_dests().count());
    ObAddr unknown_addr;
    for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) {
      const int ret_i = return_ret_array.at(i);
      if (OB_SUCCESS != ret_i && OB_TENANT_NOT_IN_SERVER != ret_i) {
        const ObAddr &addr = dests_match ? notify_proxy.get_dests().at(i) : unknown_addr;
        ret = ret_i;
        LOG_WARN("fail to mark tenant removed", KR(ret), KR(ret_i), K(addr));
      }
    }
  }
  // Logged unconditionally, at WARN level, whether or not the rollback succeeded.
  LOG_WARN("rollback persistent unit", KR(ret), K(pool), K(units));
  return ret;
}
int ObUnitManager::get_tenant_unit_servers(
const uint64_t tenant_id,
const common::ObZone &zone,
......@@ -5246,7 +5288,7 @@ int ObUnitManager::get_tenant_unit_servers(
}
return ret;
}
// Error-injection point. allocate_pool_units_, change_pool_owner and
// migrate_unit evaluate it as `ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret`,
// so a non-zero value replaces ret and drives them into their
// rollback_persistent_units branch.
// NOTE(review): presumably only set by tests -- confirm against the ERRSIM framework.
ERRSIM_POINT_DEF(ERRSIM_UNIT_PERSISTENCE_ERROR);
// allocate unit on target zones for specified resource pool
//
......@@ -5307,6 +5349,7 @@ int ObUnitManager::allocate_pool_units_(
&obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);
ObArray<ObAddr> excluded_servers;
ObArray<ObUnit> units;
for (int64_t i = 0; OB_SUCC(ret) && i < zones.count(); ++i) { // for each zone
const ObZone &zone = zones.at(i);
excluded_servers.reuse();
......@@ -5357,15 +5400,33 @@ int ObUnitManager::allocate_pool_units_(
LOG_WARN("add_unit failed", K(unit), K(ret));
} else if (OB_FAIL(new_servers.push_back(server))) {
LOG_WARN("push_back failed", K(ret));
} else if (OB_FAIL(units.push_back(unit))) {
LOG_WARN("fail to push an element into units", KR(ret), K(unit));
}
}
}
}
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) {
if (OB_TMP_FAIL(notify_proxy.wait())) {
LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
if (is_valid_tenant_id(pool.tenant_id_)) {
ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret;
}
if (OB_FAIL(ret)) {
LOG_WARN("start to rollback unit persistence", KR(ret), K(units), K(pool));
if(OB_TMP_FAIL(rollback_persistent_units(
units,
pool,
compat_mode,
false/*if not grant*/,
false/*skip offline server*/,
notify_proxy))) {
LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret), K(units),
K(pool), K(compat_mode));
}
}
}
return ret;
}
......@@ -8301,9 +8362,13 @@ int ObUnitManager::change_pool_owner(
*srv_rpc_proxy_,
&obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);
share::ObResourcePool new_pool;
ObArray<share::ObResourcePool> pools;
ObArray<ObArray<ObUnit>> all_pool_units;
ObArray<ObUnit> pool_units;
for (int64_t i = 0; OB_SUCC(ret) && i < pool_names.count(); ++i) {
share::ObResourcePool *pool = NULL;
ObArray<ObUnit *> *units = nullptr;
pool_units.reset();
common::ObArray<ObUnit *> zone_sorted_unit_array;
if (OB_FAIL(inner_get_resource_pool_by_name(pool_names.at(i), pool))) {
LOG_WARN("get resource pool by name failed", "pool_name", pool_names.at(i), K(ret));
......@@ -8362,15 +8427,39 @@ int ObUnitManager::change_pool_owner(
// shall never be here
} else if (OB_FAIL(ut_operator_.update_unit(client, new_unit))) {
LOG_WARN("fail to update unit", KR(ret));
} else if (OB_FAIL(pool_units.push_back(*unit))) {
LOG_WARN("fail to push an element into pool_units", KR(ret));
}
}
}
if (FAILEDx(all_pool_units.push_back(pool_units))) {
LOG_WARN("fail to push an element into all_pool_units", KR(ret));
} else if (OB_FAIL(pools.push_back(new_pool))) {
LOG_WARN("fail to push an element into pools", KR(ret));
}
}
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) {
LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
if (grant) {
ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret;
}
if (OB_FAIL(ret) && grant && pools.count() == all_pool_units.count()) {
LOG_WARN("start to rollback unit persistence", KR(ret), K(pools), K(tenant_id));
for (int64_t i = 0; i < pools.count(); ++i) {
if (OB_TMP_FAIL(rollback_persistent_units(all_pool_units.at(i),
pools.at(i),
compat_mode,
if_not_grant,
skip_offline_server,
notify_proxy))) {
LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret), K(all_pool_units.at(i)),
K(pools.at(i)), K(compat_mode), K(if_not_grant), K(skip_offline_server));
}
}
}
}
return ret;
}
......@@ -8946,6 +9035,10 @@ int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const
LOG_WARN("unit->server same as migrate destination server",
"unit", *unit, K(dst), K(ret));
} else {
ObNotifyTenantServerResourceProxy notify_proxy(
*srv_rpc_proxy_,
&obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);
ObUnit new_unit = *unit;
src = unit->server_;
if (unit->migrate_from_server_.is_valid()) {
ret = OB_NOT_SUPPORTED;
......@@ -8955,7 +9048,6 @@ int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const
if (OB_SUCC(ret)) {
common::ObMySQLTransaction trans;
ObUnit new_unit = *unit;
new_unit.zone_ = zone;
if (granted) {
new_unit.migrate_from_server_ = unit->server_;
......@@ -8964,9 +9056,6 @@ int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const
new_unit.is_manual_migrate_ = is_manual;
const bool is_delete = false; // is_delete is false when migrate unit
int tmp_ret = OB_SUCCESS;
ObNotifyTenantServerResourceProxy notify_proxy(
*srv_rpc_proxy_,
&obrpc::ObSrvRpcProxy::notify_tenant_server_unit_resource);
if (OB_FAIL(try_notify_tenant_server_unit_resource(
pool->tenant_id_, is_delete, notify_proxy,
*pool, compat_mode, new_unit, false/*if not grant*/,
......@@ -8978,6 +9067,24 @@ int ObUnitManager::migrate_unit(const uint64_t unit_id, const ObAddr &dst, const
LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret;
if (OB_FAIL(ret)) {
LOG_WARN("start to rollback unit persistence", KR(ret), K(new_unit), K(pool->tenant_id_));
int tmp_ret = OB_SUCCESS;
ObArray<ObUnit> units;
if (OB_TMP_FAIL(units.push_back(new_unit))) {
LOG_WARN("fail to push an element into units", KR(ret), KR(tmp_ret), KPC(unit));
} else if (OB_TMP_FAIL(rollback_persistent_units(
units,
*pool,
compat_mode,
false/*if not grant*/,
false/*skip offline server*/,
notify_proxy))) {
LOG_WARN("fail to rollback unit persistence", KR(ret), KR(tmp_ret),
K(units), KPC(pool), K(compat_mode));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(trans.start(proxy_, OB_SYS_TENANT_ID))) {
......
......@@ -1012,6 +1012,13 @@ protected:
const share::ObUnit &unit,
const bool if_not_grant,
const bool skip_offline_server);
// Re-notifies every unit in `units` with the delete flag set, using
// pool.tenant_id_ as the tenant, then waits on `notify_proxy` for all replies.
// compat_mode, if_not_grant and skip_offline_server are passed through to
// try_notify_tenant_server_unit_resource. Returns the first error seen;
// OB_TENANT_NOT_IN_SERVER replies are not treated as errors.
int rollback_persistent_units(
const common::ObArray<share::ObUnit> &units,
const share::ObResourcePool &pool,
const lib::Worker::CompatMode compat_mode,
const bool if_not_grant,
const bool skip_offline_server,
ObNotifyTenantServerResourceProxy &notify_proxy);
int sum_servers_resources(ObUnitPlacementStrategy::ObServerResource &server_resource,
const share::ObUnitConfig &unit_config);
protected:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册