diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp index d78ba075c6dad9fd975e72f72352de7a485bb266..8d05c60cb6132a22382f674cc3f3c764617b0d6b 100644 --- a/src/sql/engine/px/ob_px_util.cpp +++ b/src/sql/engine/px/ob_px_util.cpp @@ -91,6 +91,7 @@ int ObPXServerAddrUtil::alloc_by_data_distribution_inner(ObExecContext& ctx, ObD } } else { const ObPhyTableLocation* table_loc = NULL; + ObPhyTableLocationGuard full_table_loc; uint64_t table_location_key = OB_INVALID_INDEX; uint64_t ref_table_id = OB_INVALID_ID; if (scan_ops.count() > 0) { @@ -106,8 +107,10 @@ int ObPXServerAddrUtil::alloc_by_data_distribution_inner(ObExecContext& ctx, ObD if (dml_op && dml_op->is_table_location_uncertain()) { bool is_weak = false; if (OB_FAIL(ObTaskExecutorCtxUtil::get_full_table_phy_table_location( - ctx, table_location_key, ref_table_id, is_weak, table_loc))) { + ctx, table_location_key, ref_table_id, is_weak, full_table_loc))) { LOG_WARN("fail to get phy table location", K(ret)); + } else { + table_loc = full_table_loc.get_loc(); } } else { if (OB_FAIL(ObTaskExecutorCtxUtil::get_phy_table_location(ctx, table_location_key, ref_table_id, table_loc))) { @@ -504,6 +507,7 @@ int ObPXServerAddrUtil::set_dfo_accessed_location( // pass } else { const ObPhyTableLocation* table_loc = nullptr; + ObPhyTableLocationGuard full_table_loc; uint64_t table_location_key = common::OB_INVALID_ID; uint64_t ref_table_id = common::OB_INVALID_ID; if (FALSE_IT(table_location_key = dml_op->get_table_id())) { @@ -512,8 +516,10 @@ int ObPXServerAddrUtil::set_dfo_accessed_location( if (dml_op->is_table_location_uncertain()) { bool is_weak = false; if (OB_FAIL(ObTaskExecutorCtxUtil::get_full_table_phy_table_location( - ctx, table_location_key, ref_table_id, is_weak, table_loc))) { + ctx, table_location_key, ref_table_id, is_weak, full_table_loc))) { LOG_WARN("fail to get phy table location", K(ret)); + } else { + table_loc = full_table_loc.get_loc(); } } else { if (OB_FAIL(ObTaskExecutorCtxUtil::get_phy_table_location(ctx, table_location_key, ref_table_id, table_loc))) { diff --git a/src/sql/executor/ob_task_executor_ctx.cpp b/src/sql/executor/ob_task_executor_ctx.cpp index ad07c58dba9f655ea1e30f3426d2fbe03deae102..dec171dad50c5487d527d7bc658cad4148902e8b 100644 --- a/src/sql/executor/ob_task_executor_ctx.cpp +++ b/src/sql/executor/ob_task_executor_ctx.cpp @@ -335,8 +335,9 @@ int ObTaskExecutorCtxUtil::get_part_runner_server( return ret; } -int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext& ctx, uint64_t table_location_key, - uint64_t ref_table_id, bool is_weak, const ObPhyTableLocation*& table_location) +// 每次调用都会 allocate 一个 table_location +int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext &ctx, uint64_t table_location_key, + uint64_t ref_table_id, bool is_weak, ObPhyTableLocationGuard &table_location) { int ret = OB_SUCCESS; ObPhyTableLocationInfo phy_location_info; @@ -348,9 +349,6 @@ int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext& ctx, const ObTableSchema* table_schema = NULL; const uint64_t tenant_id = extract_tenant_id(ref_table_id); // ObPhysicalPlanCtx *plan_ctx = ctx.get_physical_plan_ctx(); - ObPhyTableLocation* loc = nullptr; - table_location = NULL; - if (OB_ISNULL(location_cache) || OB_ISNULL(schema_service)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("location cache or schema_service is null", KP(location_cache), KP(schema_service), K(ret)); @@ -380,18 +378,13 @@ int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext& ctx, if (OB_FAIL(ret)) { // bypass - } else if (NULL == (loc = static_cast(ctx.get_allocator().alloc(sizeof(ObPhyTableLocation))))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - } else if (NULL == (loc = new (loc) ObPhyTableLocation())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail new object", K(ret)); } else if (OB_FAIL(ObTableLocation::get_phy_table_location_info( ctx, table_location_key, ref_table_id, is_weak, part_ids, *location_cache, phy_location_info))) { LOG_WARN("get phy table location info failed", K(ret)); - } else if (OB_FAIL(loc->add_partition_locations(phy_location_info))) { + } else if (OB_FAIL(table_location.new_location(ctx.get_allocator()))) { + LOG_WARN("fail alloc new location", K(ret)); + } else if (OB_FAIL(table_location.get_loc()->add_partition_locations(phy_location_info))) { LOG_WARN("add partition locations failed", K(ret), K(phy_location_info)); - } else { - table_location = loc; } return ret; } diff --git a/src/sql/executor/ob_task_executor_ctx.h b/src/sql/executor/ob_task_executor_ctx.h index c76eb3430f69eba7e1a0451db3dda8d6e34a1b62..04b8310db86302a6b3cf731d5323a3f0b352872c 100644 --- a/src/sql/executor/ob_task_executor_ctx.h +++ b/src/sql/executor/ob_task_executor_ctx.h @@ -310,8 +310,8 @@ public: ObTaskExecutorCtx& ctx, uint64_t table_location_key, uint64_t ref_table_id, ObPhyTableLocation*& table_location); static ObPhyTableLocation* get_phy_table_location_for_update( ObTaskExecutorCtx& ctx, uint64_t table_location_key, uint64_t ref_table_id); - static int get_full_table_phy_table_location(ObExecContext& ctx, uint64_t table_location_key, uint64_t ref_table_id, - bool is_weak, const ObPhyTableLocation*& table_location); + static int get_full_table_phy_table_location(ObExecContext &ctx, uint64_t table_location_key, uint64_t ref_table_id, + bool is_weak, ObPhyTableLocationGuard &table_location); static int extract_server_participants( ObExecContext& ctx, const common::ObAddr& svr, common::ObPartitionIArray& participants); diff --git a/src/sql/ob_phy_table_location.h b/src/sql/ob_phy_table_location.h index a907b0681931a8a3df2dcab72590990747b2baeb..e3baf15c2848182ffdefee5cb4d1f1b545cfde46 100644 --- a/src/sql/ob_phy_table_location.h +++ b/src/sql/ob_phy_table_location.h @@ -185,6 +185,41 @@ int ObPhyTableLocation::find_not_include_part_ids(const SrcArray& all_part_ids, } return ret; } + +class ObPhyTableLocationGuard { +public: + ObPhyTableLocationGuard() : loc_(nullptr){}; + ~ObPhyTableLocationGuard() + { + if (loc_) { + loc_->~ObPhyTableLocation(); + loc_ = nullptr; + } + } + int new_location(common::ObIAllocator &allocator) + { + int ret = common::OB_SUCCESS; + void *buf = nullptr; + if (OB_NOT_NULL(loc_)) { + // init twice + ret = common::OB_ERR_UNEXPECTED; + } else if (nullptr == (buf = allocator.alloc(sizeof(ObPhyTableLocation)))) { + ret = common::OB_ALLOCATE_MEMORY_FAILED; + } else if (NULL == (loc_ = new (buf) ObPhyTableLocation())) { + ret = common::OB_ERR_UNEXPECTED; + } + return ret; + } + // caller must ensure that the loc_ is not NULL before call get_loc() + ObPhyTableLocation *get_loc() + { + return loc_; + } + +private: + ObPhyTableLocation *loc_; +}; + } // namespace sql } // namespace oceanbase #endif /* OCEANBASE_SQL_OB_PHY_TABLE_LOCATION_ */