Commit 97b3bfab authored by raywill, committed by LINGuanRen

result not in order when range partition is truncated or split

Parent e4bfc6dc
......@@ -42,30 +42,6 @@ ObPhyOperatorType ObGIInput::get_phy_op_type() const
return PHY_GRANULE_ITERATOR;
}
int ObGIInput::assign_ranges(const common::ObIArray<common::ObNewRange>& ranges)
{
int ret = OB_SUCCESS;
FOREACH_CNT_X(it, ranges, OB_SUCC(ret))
{
if (OB_FAIL(ranges_.push_back(*it))) {
LOG_WARN("failed to push range", K(ret));
}
}
return ret;
}
int ObGIInput::assign_pkeys(const common::ObIArray<common::ObPartitionKey>& pkeys)
{
int ret = OB_SUCCESS;
FOREACH_CNT_X(it, pkeys, OB_SUCC(ret))
{
if (OB_FAIL(pkeys_.push_back(*it))) {
LOG_WARN("failed to push range", K(ret));
}
}
return ret;
}
int ObGIInput::deep_copy_range(ObIAllocator* allocator, const ObNewRange& src, ObNewRange& dst)
{
int ret = OB_SUCCESS;
......@@ -324,8 +300,6 @@ int ObGranuleIterator::rescan(ObExecContext& ctx) const
gi_ctx->state_ = GI_GET_NEXT_GRANULE_TASK;
}
} else {
// In the partition_wise_join case, execute the tasks the worker grabbed during its first complete run.
// The worker caches its own task queue during execution.
if (GI_UNINITIALIZED == gi_ctx->state_ || GI_PREPARED == gi_ctx->state_) {
/*do nothing*/
} else {
......
......@@ -53,8 +53,6 @@ public:
{
return parallelism_;
}
int assign_ranges(const common::ObIArray<common::ObNewRange>& ranges);
int assign_pkeys(const common::ObIArray<common::ObPartitionKey>& pkeys);
void set_granule_pump(ObGranulePump* pump)
{
pump_ = pump;
......@@ -137,9 +135,9 @@ public:
int64_t tablet_size_;
int64_t worker_id_;
uint64_t tsc_op_id_;
common::ObSEArray<common::ObNewRange, 16> ranges_;
common::ObSEArray<common::ObPartitionKey, 16> pkeys_;
ObGranulePump* pump_;
common::ObSEArray<common::ObNewRange, 1> ranges_;
common::ObSEArray<common::ObPartitionKey, 1> pkeys_;
ObGranulePump *pump_;
ObGranuleIteratorState state_;
bool all_task_fetched_;
......
......@@ -48,31 +48,7 @@ void ObGIOpInput::set_deserialize_allocator(common::ObIAllocator* allocator)
deserialize_allocator_ = allocator;
}
int ObGIOpInput::assign_ranges(const common::ObIArray<common::ObNewRange>& ranges)
{
int ret = OB_SUCCESS;
FOREACH_CNT_X(it, ranges, OB_SUCC(ret))
{
if (OB_FAIL(ranges_.push_back(*it))) {
LOG_WARN("failed to push range", K(ret));
}
}
return ret;
}
int ObGIOpInput::assign_pkeys(const common::ObIArray<common::ObPartitionKey>& pkeys)
{
int ret = OB_SUCCESS;
FOREACH_CNT_X(it, pkeys, OB_SUCC(ret))
{
if (OB_FAIL(pkeys_.push_back(*it))) {
LOG_WARN("failed to push range", K(ret));
}
}
return ret;
}
int ObGIOpInput::deep_copy_range(ObIAllocator* allocator, const ObNewRange& src, ObNewRange& dst)
int ObGIOpInput::deep_copy_range(ObIAllocator *allocator, const ObNewRange &src, ObNewRange &dst)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(allocator)) {
......@@ -362,8 +338,6 @@ int ObGranuleIteratorOp::rescan()
state_ = GI_GET_NEXT_GRANULE_TASK;
}
} else {
// In the partition_wise_join case, execute the tasks the worker grabbed during its first complete run.
// The worker caches its own task queue during execution.
if (GI_UNINITIALIZED == state_ || GI_PREPARED == state_) {
/*do nothing*/
} else {
......
......@@ -59,7 +59,6 @@ public:
{
return worker_id_;
}
private:
int deep_copy_range(ObIAllocator* allocator, const ObNewRange& src, ObNewRange& dst);
......@@ -71,9 +70,8 @@ public:
int64_t worker_id_;
// Need serialize
common::ObSEArray<common::ObNewRange, 16> ranges_;
// use partition key/partition idx to tag partition
common::ObSEArray<common::ObPartitionKey, 16> pkeys_;
common::ObSEArray<common::ObNewRange, 1> ranges_;
common::ObSEArray<common::ObPartitionKey, 1> pkeys_;
ObGranulePump* pump_;
private:
......@@ -232,8 +230,6 @@ private:
const ObGITaskSet* rescan_taskset_ = NULL;
common::ObSEArray<ObGITaskSet::Pos, OB_MIN_PARALLEL_TASK_COUNT * 2> rescan_tasks_;
int64_t rescan_task_idx_;
// In the full partition-wise-join case, the worker's own task queue is cached during execution.
// Used by GI rescan.
common::ObSEArray<ObGranuleTaskInfo, 2> pwj_rescan_task_infos_;
};
......
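Note: the pwj_rescan_task_infos_ member above is the task queue a worker caches during its first full execution so that a GI rescan in the full partition-wise-join case can replay exactly the same granule tasks in the same order. Below is a rough, hypothetical C++ sketch of that cache-and-replay pattern; GranuleTask, TaskPump and GranuleIterator are illustrative stand-ins, not the real operator API.

#include <cstdint>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical task descriptor (stands in for ObGranuleTaskInfo).
struct GranuleTask { int64_t partition_id; };

// Hypothetical shared task source (stands in for ObGranulePump).
struct TaskPump {
  std::vector<GranuleTask> tasks;
  std::size_t next = 0;
  std::optional<GranuleTask> fetch() {
    if (next >= tasks.size()) return std::nullopt;
    return tasks[next++];
  }
};

// First full execution: every task fetched from the pump is also cached.
// Rescan: the cached queue is replayed so the worker re-runs exactly the
// partitions it owned, preserving partition-wise-join pairing.
class GranuleIterator {
 public:
  explicit GranuleIterator(TaskPump* pump) : pump_(pump) {}
  std::optional<GranuleTask> next_task() {
    if (rescanning_) {
      if (rescan_idx_ >= cached_.size()) return std::nullopt;
      return cached_[rescan_idx_++];
    }
    std::optional<GranuleTask> t = pump_->fetch();
    if (t) cached_.push_back(*t);  // analogous to pwj_rescan_task_infos_
    return t;
  }
  void rescan() { rescanning_ = true; rescan_idx_ = 0; }
 private:
  TaskPump* pump_;
  std::vector<GranuleTask> cached_;
  std::size_t rescan_idx_ = 0;
  bool rescanning_ = false;
};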
......@@ -583,7 +583,8 @@ int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext& ctx, ObDfo& df
} else if (OB_FAIL(get_access_partition_order<NEW_ENG>(dfo, phy_op, asc_order))) {
LOG_WARN("fail to get table scan partition order", K(ret));
} else if (OB_FAIL(ObPXServerAddrUtil::reorder_all_partitions(
table_location_key, locations, temp_locations, asc_order, ctx, base_order))) {
table_location_key, table_loc->get_ref_table_id(),
locations, temp_locations, asc_order, ctx, base_order))) {
LOG_WARN("fail to reorder all partitions", K(ret));
} else {
LOG_TRACE("sqc partition order is", K(asc_order));
......@@ -619,24 +620,106 @@ int ObPXServerAddrUtil::set_sqcs_accessed_location(ObExecContext& ctx, ObDfo& df
return ret;
}
int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key,
// Used for fast lookup from a physical partition id to the partition order (index).
// For a range partition, the greater the range, the greater the partition_index.
// For a hash partition, the index carries no meaning.
int ObPXServerAddrUtil::build_partition_index_lookup_map(ObTaskExecutorCtx &task_exec_ctx,
uint64_t ref_table_id,
ObPartitionIndexMap &idx_map)
{
int ret = OB_SUCCESS;
schema::ObSchemaGetterGuard schema_guard;
const schema::ObSimpleTableSchemaV2 *table_schema = NULL;
if (OB_ISNULL(task_exec_ctx.schema_service_)) {
} else if (OB_FAIL(task_exec_ctx.schema_service_->get_schema_guard(schema_guard))) {
LOG_WARN("fail get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(ref_table_id, table_schema))) {
LOG_WARN("fail get table schema", K(ref_table_id), K(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("fail get schema", K(ref_table_id), K(ret));
} else if (OB_FAIL(idx_map.create(table_schema->get_all_part_num(), "PartOrderIdx"))) {
LOG_WARN("fail create index map", K(ret), "cnt", table_schema->get_all_part_num());
} else {
schema::ObTablePartItemIterator iter(*table_schema);
schema::ObPartitionItem item;
do {
if (OB_FAIL(iter.next(item))) {
if (OB_ITER_END != ret) {
LOG_WARN("fail get next partition item from iterator", K(ref_table_id), K(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else if (OB_FAIL(idx_map.set_refactored(item.partition_id_, item.partition_idx_))) {
LOG_WARN("fail set value to hashmap", K(item), K(ret));
}
} while (OB_SUCC(ret));
}
return ret;
}
class ObPXPartitionOrderIndexCmp
{
public:
ObPXPartitionOrderIndexCmp(bool asc, ObPartitionIndexMap *map)
: asc_(asc), map_(map)
{}
bool operator() (const ObPartitionReplicaLocation &left, const ObPartitionReplicaLocation &right)
{
int ret = OB_SUCCESS;
bool bret = false;
int64_t lv, rv;
if (OB_FAIL(map_->get_refactored(left.get_partition_id(), lv))) {
LOG_WARN("fail get partition index", K(asc_), K(left), K(right), K(ret));
throw OB_EXCEPTION<OB_HASH_NOT_EXIST>();
} else if (OB_FAIL(map_->get_refactored(right.get_partition_id(), rv))) {
LOG_WARN("fail get partition index", K(asc_), K(left), K(right), K(ret));
throw OB_EXCEPTION<OB_HASH_NOT_EXIST>();
} else {
bret = asc_ ? (lv < rv) : (lv > rv);
}
return bret;
}
private:
bool asc_;
ObPartitionIndexMap *map_;
};
int ObPXServerAddrUtil::reorder_all_partitions(int64_t table_location_key, int64_t ref_table_id,
const ObPartitionReplicaLocationIArray& src_locations, ObPartitionReplicaLocationIArray& dst_locations, bool asc,
ObExecContext& exec_ctx, ObIArray<int64_t>& base_order)
{
int ret = OB_SUCCESS;
dst_locations.reset();
if (src_locations.count() > 1) {
ObPartitionIndexMap part_order_map;
if (OB_FAIL(build_partition_index_lookup_map(exec_ctx.get_task_exec_ctx(),
ref_table_id, part_order_map))) {
LOG_WARN("fail build index lookup map", K(ret));
}
for (int i = 0; i < src_locations.count() && OB_SUCC(ret); ++i) {
if (OB_FAIL(dst_locations.push_back(src_locations.at(i)))) {
LOG_WARN("fail to push dst locations", K(ret));
}
}
if (OB_SUCC(ret)) {
std::sort(&dst_locations.at(0),
&dst_locations.at(0) + dst_locations.count(),
asc ? ObPartitionReplicaLocation::compare_part_loc_asc : ObPartitionReplicaLocation::compare_part_loc_desc);
PWJPartitionIdMap* pwj_map = NULL;
if (OB_NOT_NULL(pwj_map = exec_ctx.get_pwj_map())) {
try {
std::sort(&dst_locations.at(0),
&dst_locations.at(0) + dst_locations.count(),
ObPXPartitionOrderIndexCmp(asc, &part_order_map));
} catch (OB_BASE_EXCEPTION &except) {
if (OB_HASH_NOT_EXIST == (ret = except.get_errno())) {
// schema changed during execution, notify to retry
ret = OB_SCHEMA_ERROR;
}
}
PWJPartitionIdMap *pwj_map = NULL;
if (OB_FAIL(ret)) {
LOG_WARN("fail to sort locations", K(ret));
} else if (OB_NOT_NULL(pwj_map = exec_ctx.get_pwj_map())) {
PartitionIdArray partition_id_array;
if (OB_FAIL(pwj_map->get_refactored(table_location_key, partition_id_array))) {
if (OB_HASH_NOT_EXIST == ret) {
......
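For context, the hunk above switches the sort key for partition replica locations from the raw partition id to a schema-derived partition index, so ranges stay ordered even after a range partition has been truncated or split; a missing map entry is surfaced via an exception and converted to a schema error so the query can be retried. Below is a minimal, self-contained sketch of that technique under hypothetical names (PartLocation, PartitionIndexMap, reorder_partitions), not the real OceanBase types.

#include <algorithm>
#include <cstdint>
#include <stdexcept>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for ObPartitionReplicaLocation.
struct PartLocation {
  int64_t partition_id;  // physical id; no longer ordered after split/truncate
};

// Hypothetical stand-in for ObPartitionIndexMap: partition_id -> partition_index.
using PartitionIndexMap = std::unordered_map<int64_t, int64_t>;

// Comparator analogous to ObPXPartitionOrderIndexCmp: orders by the looked-up
// partition index and throws if the schema no longer knows the partition.
struct OrderByPartitionIndex {
  bool asc;
  const PartitionIndexMap* map;
  bool operator()(const PartLocation& l, const PartLocation& r) const {
    auto lit = map->find(l.partition_id);
    auto rit = map->find(r.partition_id);
    if (lit == map->end() || rit == map->end()) {
      // mirrors throw OB_EXCEPTION<OB_HASH_NOT_EXIST>() in the patch
      throw std::out_of_range("partition id not found in index map");
    }
    return asc ? lit->second < rit->second : lit->second > rit->second;
  }
};

// Sketch of the reorder step: sort locations by partition index.
// Returns false when a lookup fails (schema changed), mimicking OB_SCHEMA_ERROR.
bool reorder_partitions(std::vector<PartLocation>& locations,
                        const PartitionIndexMap& idx_map, bool asc) {
  try {
    std::sort(locations.begin(), locations.end(),
              OrderByPartitionIndex{asc, &idx_map});
    return true;
  } catch (const std::out_of_range&) {
    return false;  // caller should refresh the schema and retry
  }
}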
......@@ -96,6 +96,8 @@ public:
#define ENG_OP typename ObEngineOpTraits<NEW_ENG>
typedef common::hash::ObHashMap<uint64_t, int64_t, common::hash::NoPthreadDefendMode> ObPartitionIndexMap;
class ObPXServerAddrUtil {
class ObPxSqcTaskCountMeta {
public:
......@@ -132,8 +134,12 @@ private:
template <bool NEW_ENG>
static int find_dml_ops_inner(common::ObIArray<const ENG_OP::TableModify*>& insert_ops, const ENG_OP::Root& op);
static int check_partition_wise_location_valid(ObPartitionReplicaLocationIArray& tsc_locations);
static int reorder_all_partitions(int64_t location_key, const ObPartitionReplicaLocationIArray& src_locations,
static int build_partition_index_lookup_map(
ObTaskExecutorCtx &task_exec_ctx,
uint64_t ref_table_id,
ObPartitionIndexMap &idx_map);
static int reorder_all_partitions(int64_t location_key, int64_t ref_table_id,
const ObPartitionReplicaLocationIArray& src_locations,
ObPartitionReplicaLocationIArray& tsc_locations, bool asc, ObExecContext& exec_ctx,
ObIArray<int64_t>& base_order);
......
......@@ -335,7 +335,6 @@ int ObTaskExecutorCtxUtil::get_part_runner_server(
return ret;
}
// Each call allocates a new table_location.
int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext &ctx, uint64_t table_location_key,
uint64_t ref_table_id, bool is_weak, ObPhyTableLocationGuard &table_location)
{
......@@ -385,6 +384,8 @@ int ObTaskExecutorCtxUtil::get_full_table_phy_table_location(ObExecContext &ctx,
LOG_WARN("fail alloc new location", K(ret));
} else if (OB_FAIL(table_location.get_loc()->add_partition_locations(phy_location_info))) {
LOG_WARN("add partition locations failed", K(ret), K(phy_location_info));
} else {
table_location.get_loc()->set_table_location_key(table_location_key, ref_table_id);
}
return ret;
}
......