diff --git a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp
index f3c3a790ca400d53d284905388f1a05432d5f84f..62dbfadf9d5003565b19ee886a0b1df4ae0b64d6 100644
--- a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp
+++ b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp
@@ -319,9 +319,16 @@ int ObPxTenantTargetMonitor::update_peer_target_used(const ObAddr &server, int64
     if (ret != OB_HASH_NOT_EXIST) {
       LOG_WARN("get refactored failed", K(ret));
     } else {
-      target_usage.set_peer_used(peer_used);
-      if (OB_FAIL(global_target_usage_.set_refactored(server, target_usage))) {
-        LOG_WARN("set refactored failed", K(ret));
+      ObLockGuard lock_guard(spin_lock_);
+      if (OB_FAIL(global_target_usage_.get_refactored(server, target_usage))) {
+        if (ret != OB_HASH_NOT_EXIST) {
+          LOG_WARN("get refactored failed", K(ret));
+        } else {
+          target_usage.set_peer_used(peer_used);
+          if (OB_FAIL(global_target_usage_.set_refactored(server, target_usage))) {
+            LOG_WARN("set refactored failed", K(ret));
+          }
+        }
       }
     }
   } else {
@@ -361,6 +368,7 @@ int ObPxTenantTargetMonitor::get_global_target_usage(const hash::ObHashMap
+  ObLockGuard lock_guard(spin_lock_);
   global_target_usage_.clear();
   if (OB_FAIL(global_target_usage_.set_refactored(server_, ServerTargetUsage()))) {
     LOG_WARN("set refactored failed", K(ret));
diff --git a/src/sql/optimizer/ob_px_resource_analyzer.cpp b/src/sql/optimizer/ob_px_resource_analyzer.cpp
index a3390278a1553aa98295bfdaa7362738245db8a8..4c5412bd53654edefead42d18b34a248e7594d4c 100644
--- a/src/sql/optimizer/ob_px_resource_analyzer.cpp
+++ b/src/sql/optimizer/ob_px_resource_analyzer.cpp
@@ -593,7 +593,10 @@ int ObPxResourceAnalyzer::schedule_dfo(
   const int64_t group = 1;
   groups += group;
   ObHashSet &addr_set = dfo.location_addr_;
-  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, dfo.get_dop()))) {
+  // We assume each SQC in the DFO should be allocated the same thread count.
+  // This may not be true, but we cannot determine the real count per SQC, so let it be for now.
+  const int64_t dop_per_addr = 0 == addr_set.size() ? dfo.get_dop() : (dfo.get_dop() + addr_set.size() - 1) / addr_set.size();
+  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, dop_per_addr))) {
     LOG_WARN("increase current thread map failed", K(ret));
   } else if (OB_FAIL(update_parallel_map(current_group_map, addr_set, group))) {
     LOG_WARN("increase current group map failed", K(ret));
@@ -616,7 +619,8 @@ int ObPxResourceAnalyzer::finish_dfo(
   const int64_t group = 1;
   groups -= group;
   ObHashSet &addr_set = dfo.location_addr_;
-  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, -dfo.get_dop()))) {
+  const int64_t dop_per_addr = 0 == addr_set.size() ? dfo.get_dop() : (dfo.get_dop() + addr_set.size() - 1) / addr_set.size();
+  if (OB_FAIL(update_parallel_map(current_thread_map, addr_set, -dop_per_addr))) {
     LOG_WARN("decrease current thread map failed", K(ret));
   } else if (OB_FAIL(update_parallel_map(current_group_map, addr_set, -group))) {
     LOG_WARN("decrease current group map failed", K(ret));
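
The first hunk turns the miss path of update_peer_target_used() into a check-lock-recheck sequence: after the initial lookup misses, the spin lock is taken, the entry is looked up again, and the insert happens only if it is still absent. Combined with the second hunk, which takes the same lock before global_target_usage_.clear(), this prevents a concurrent reset from being clobbered by a stale peer-usage insert. A minimal standalone sketch of that pattern follows; it uses std::shared_mutex and std::unordered_map rather than OceanBase's ObHashMap/ObLockGuard API, and TargetMap, TargetUsage, and update_peer_used are hypothetical names.

```cpp
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

// Hypothetical stand-in for ServerTargetUsage.
struct TargetUsage {
  long peer_used = 0;
};

class TargetMap {
public:
  void update_peer_used(int server, long peer_used) {
    {
      // Cheap first check without exclusive ownership.
      std::shared_lock<std::shared_mutex> read_guard(lock_);
      if (map_.count(server) != 0) {
        return;  // entry already present: nothing to insert
      }
    }
    // Re-check under the exclusive lock: another thread (e.g. a concurrent
    // clear-and-repopulate) may have inserted the entry in the meantime.
    std::unique_lock<std::shared_mutex> write_guard(lock_);
    if (map_.count(server) == 0) {
      TargetUsage usage;
      usage.peer_used = peer_used;
      map_.emplace(server, usage);  // insert only if still absent
    }
  }

private:
  std::shared_mutex lock_;
  std::unordered_map<int, TargetUsage> map_;
};

int main() {
  TargetMap peers;
  peers.update_peer_used(/*server=*/1, /*peer_used=*/8);
  peers.update_peer_used(1, 16);  // entry exists; the locked re-check leaves it alone
  return 0;
}
```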
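The two optimizer hunks stop charging the full DFO DOP to every SQC address and instead charge a ceiling-divided share per address, with schedule_dfo() adding it and finish_dfo() subtracting the same amount so the per-address counters return to zero when the DFO finishes. A small sketch of that arithmetic, assuming the dop_per_addr expression from the diff (the helper function and sample values below are illustrative):

```cpp
#include <cassert>
#include <cstdint>

// Ceiling division of a DFO's DOP across its SQC addresses, mirroring the
// dop_per_addr expression in the diff; addr_count == 0 falls back to the full DOP.
int64_t dop_per_addr(int64_t dop, int64_t addr_count) {
  return addr_count == 0 ? dop : (dop + addr_count - 1) / addr_count;
}

int main() {
  assert(dop_per_addr(10, 3) == 4);  // DOP 10 spread over 3 servers: 4 threads booked on each
  assert(dop_per_addr(8, 4) == 2);   // divides evenly: 2 per server
  assert(dop_per_addr(5, 0) == 5);   // no known address: count the whole DOP
  return 0;
}
```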