提交 c2fea4f4 编写于 作者: Q qianchanger 提交者: ob-robot

Fix dh window function report -4016

上级 20e5641c
......@@ -47,29 +47,44 @@ int ObBarrierPieceMsgListener::on_message(
// 已经收到所有 piece,发送 sqc 个 whole
// 各个 sqc 广播给各自 task
if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
ObBarrierWholeMsg whole;
whole.op_id_ = ctx.op_id_;
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole, ctx.timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched barrier whole msg",
K(idx), K(cnt), K(whole), K(*ch));
}
if (OB_FAIL(ctx.send_whole_msg(sqcs))) {
LOG_WARN("fail to send whole msg", K(ret));
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
IGNORE_RETURN ctx.reset_resource();
}
return ret;
}
int ObBarrierPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
{
int ret = OB_SUCCESS;
ObBarrierWholeMsg whole;
whole.op_id_ = op_id_;
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole, timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched barrier whole msg",
K(idx), K(cnt), K(whole), K(*ch));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
return ret;
}
void ObBarrierPieceMsgCtx::reset_resource()
{
received_ = 0;
}
int ObBarrierPieceMsgCtx::alloc_piece_msg_ctx(const ObBarrierPieceMsg &pkt,
ObPxCoordInfo &,
ObExecContext &ctx,
......
......@@ -83,6 +83,8 @@ public:
ObBarrierPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts)
: ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0) {}
~ObBarrierPieceMsgCtx() = default;
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) override;
virtual void reset_resource() override;
static int alloc_piece_msg_ctx(const ObBarrierPieceMsg &pkt,
ObPxCoordInfo &coord_info,
ObExecContext &ctx,
......
......@@ -33,28 +33,10 @@ int ObInitChannelPieceMsgListener::on_message(
// have received all piece from px receive
// send whole msg to px transmit
if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
// get child transmit op's id.
// in current impl. the paired transmit op id = receive op id + 1
//TODO : https://aone.alibaba-inc.com/task/43312101
ctx.whole_msg_.op_id_ = ctx.op_id_ + 1;
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg",
K(idx), K(cnt), K(ctx.whole_msg_), K(*ch));
}
if (OB_FAIL(ctx.send_whole_msg(sqcs))) {
LOG_WARN("fail to send whole msg", K(ret));
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
ctx.whole_msg_.reset();
IGNORE_RETURN ctx.reset_resource();
}
return ret;
}
......@@ -91,4 +73,37 @@ int ObInitChannelWholeMsg::assign(const ObInitChannelWholeMsg &other, common::Ob
return ret;
}
bool ObInitChannelPieceMsgCtx::enable_dh_channel_sync(const bool channel_sync_enabled) { return channel_sync_enabled; }
\ No newline at end of file
bool ObInitChannelPieceMsgCtx::enable_dh_channel_sync(const bool channel_sync_enabled) { return channel_sync_enabled; }
int ObInitChannelPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
{
int ret = OB_SUCCESS;
// get child transmit op's id.
// in current impl. the paired transmit op id = receive op id + 1
//TODO : https://aone.alibaba-inc.com/task/43312101
whole_msg_.op_id_ = op_id_ + 1;
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg",
K(idx), K(cnt), K(whole_msg_), K(*ch));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
return ret;
}
void ObInitChannelPieceMsgCtx::reset_resource()
{
whole_msg_.reset();
received_ = 0;
}
......@@ -71,6 +71,8 @@ public:
tenant_id_(tenant_id)/*, whole_msg_()*/ {}
~ObInitChannelPieceMsgCtx() = default;
INHERIT_TO_STRING_KV("meta", ObPieceMsgCtx, K_(received));
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) override;
virtual void reset_resource() override;
static int alloc_piece_msg_ctx(const ObInitChannelPieceMsg &pkt,
ObPxCoordInfo &coord_info,
ObExecContext &ctx,
......@@ -100,4 +102,4 @@ private:
}
}
#endif /* __OB_SQL_ENG_PX_DH_INIT_CHANNEL_H__ */
\ No newline at end of file
#endif /* __OB_SQL_ENG_PX_DH_INIT_CHANNEL_H__ */
......@@ -107,6 +107,7 @@ public:
ObOptStatsGatherPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts)
: ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0), osg_info_() {}
~ObOptStatsGatherPieceMsgCtx() = default;
virtual void reset_resource() {};
static int alloc_piece_msg_ctx(const ObOptStatsGatherPieceMsg &pkt,
ObPxCoordInfo &coord_info,
ObExecContext &ctx,
......
......@@ -234,6 +234,72 @@ int ObRDWFPieceMsgCtx::formalize_store_row()
return ret;
}
int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
{
int ret = OB_SUCCESS;
ObOperatorKit *op_kit = exec_ctx_.get_operator_kit(op_id_);
if (NULL == op_kit || NULL == op_kit->spec_ || PHY_WINDOW_FUNCTION != op_kit->spec_->type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("no window function operator", K(ret), KP(op_kit), K(op_id_));
} else {
auto wf = static_cast<const ObWindowFunctionSpec *>(op_kit->spec_);
if (OB_FAIL(wf->rd_generate_patch(*this))) {
LOG_WARN("calculate range distribution window function final res failed", K(ret));
} else if (formalize_store_row()) {
LOG_WARN("formalize store row failed", K(ret));
} else {
LOG_DEBUG("after formalize", K(infos_));
}
}
ObRDWFWholeMsg *responses = NULL;
if (OB_SUCC(ret)) {
responses = static_cast<ObRDWFWholeMsg *>(
arena_alloc_.alloc(sizeof(ObRDWFWholeMsg) * sqcs.count()));
OV(NULL != responses, OB_ALLOCATE_MEMORY_FAILED);
for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) {
new (&responses[i])ObRDWFWholeMsg();
}
}
if (OB_SUCC(ret)) {
// order by sqc_id_, thread_id_
std::sort(infos_.begin(), infos_.end(), [](ObRDWFPartialInfo *l,
ObRDWFPartialInfo *r) {
return std::tie(l->sqc_id_, l->thread_id_) < std::tie(r->sqc_id_, r->thread_id_);
});
for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) {
auto &sqc = *sqcs.at(i);
auto &msg = responses[i];
msg.op_id_ = op_id_;
auto it = std::lower_bound(infos_.begin(), infos_.end(), sqc.get_sqc_id(),
[&](ObRDWFPartialInfo *info, int64_t id)
{ return info->sqc_id_ < id; });
if (it == infos_.end() || (*it)->sqc_id_ != sqc.get_sqc_id()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sqc not found", K(ret), K(sqc));
} else {
while (OB_SUCC(ret) && it != infos_.end() && (*it)->sqc_id_ == sqc.get_sqc_id()) {
OZ(msg.infos_.push_back(*it));
it++;
}
}
auto ch = sqc.get_qc_channel();
CK(NULL != ch);
OZ(ch->send(msg, timeout_ts_));
OZ(ch->flush(true /* wait */, false /* wait response */));
}
OZ(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs));
}
for (int64_t i = 0; NULL != responses && i < sqcs.count(); i++) {
responses[i].~ObRDWFWholeMsg();
}
return ret;
}
void ObRDWFPieceMsgCtx::reset_resource()
{
received_ = 0;
}
int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg)
{
int ret = OB_SUCCESS;
......@@ -263,64 +329,10 @@ int ObRDWFPieceMsgListener::on_message(ObRDWFPieceMsgCtx &ctx,
}
if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
ObOperatorKit *op_kit = ctx.exec_ctx_.get_operator_kit(ctx.op_id_);
if (NULL == op_kit || NULL == op_kit->spec_ || PHY_WINDOW_FUNCTION != op_kit->spec_->type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("no window function operator", K(ret), KP(op_kit), K(ctx.op_id_));
} else {
auto wf = static_cast<const ObWindowFunctionSpec *>(op_kit->spec_);
if (OB_FAIL(wf->rd_generate_patch(ctx))) {
LOG_WARN("calculate range distribution window function final res failed", K(ret));
} else if (ctx.formalize_store_row()) {
LOG_WARN("formalize store row failed", K(ret));
} else {
LOG_DEBUG("after formalize", K(ctx.infos_));
}
}
ObRDWFWholeMsg *responses = NULL;
if (OB_SUCC(ret)) {
responses = static_cast<ObRDWFWholeMsg *>(
ctx.arena_alloc_.alloc(sizeof(ObRDWFWholeMsg) * sqcs.count()));
OV(NULL != responses, OB_ALLOCATE_MEMORY_FAILED);
for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) {
new (&responses[i])ObRDWFWholeMsg();
}
}
if (OB_SUCC(ret)) {
// order by sqc_id_, thread_id_
std::sort(ctx.infos_.begin(), ctx.infos_.end(), [](ObRDWFPartialInfo *l,
ObRDWFPartialInfo *r) {
return std::tie(l->sqc_id_, l->thread_id_) < std::tie(r->sqc_id_, r->thread_id_);
});
for (int64_t i = 0; OB_SUCC(ret) && i < sqcs.count(); i++) {
auto &sqc = *sqcs.at(i);
auto &msg = responses[i];
msg.op_id_ = ctx.op_id_;
auto it = std::lower_bound(ctx.infos_.begin(), ctx.infos_.end(), sqc.get_sqc_id(),
[&](ObRDWFPartialInfo *info, int64_t id)
{ return info->sqc_id_ < id; });
if (it == ctx.infos_.end() || (*it)->sqc_id_ != sqc.get_sqc_id()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sqc not found", K(ret), K(sqc));
} else {
while (OB_SUCC(ret) && it != ctx.infos_.end() && (*it)->sqc_id_ == sqc.get_sqc_id()) {
OZ(msg.infos_.push_back(*it));
it++;
}
}
auto ch = sqc.get_qc_channel();
CK(NULL != ch);
OZ(ch->send(msg, ctx.timeout_ts_));
OZ(ch->flush(true /* wait */, false /* wait response */));
}
OZ(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs));
}
for (int64_t i = 0; NULL != responses && i < sqcs.count(); i++) {
responses[i].~ObRDWFWholeMsg();
if (OB_FAIL(ctx.send_whole_msg(sqcs))) {
LOG_WARN("fail to send whole msg", K(ret));
}
IGNORE_RETURN ctx.reset_resource();
}
return ret;
}
......
......@@ -122,7 +122,8 @@ public:
exec_ctx_(exec_ctx), eval_ctx_(exec_ctx)
{
}
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) override;
virtual void reset_resource() override;
static int alloc_piece_msg_ctx(const ObRDWFPieceMsg &pkt,
ObPxCoordInfo &coord_info,
ObExecContext &ctx,
......
......@@ -46,28 +46,12 @@ int ObRollupKeyPieceMsgListener::on_message(
LOG_TRACE("got a win buf picece msg", "all_got", ctx.received_, "expected", ctx.task_cnt_);
}
if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
// all piece msg has been received
ctx.whole_msg_.op_id_ = ctx.op_id_;
if (OB_FAIL(ctx.process_ndv())) {
LOG_WARN("failed to process ndv", K(ret));
} else if (OB_FAIL(ctx.send_whole_msg(sqcs))) {
LOG_WARN("fail to send whole msg", K(ret));
}
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg",
K(idx), K(cnt), K(ctx.whole_msg_), K(*ch));
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
}
IGNORE_RETURN ctx.reset_resource();
}
return ret;
}
......@@ -159,6 +143,36 @@ int ObRollupKeyPieceMsgCtx::alloc_piece_msg_ctx(const ObRollupKeyPieceMsg &pkt,
return ret;
}
int ObRollupKeyPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
{
int ret = OB_SUCCESS;
// all piece msg has been received
whole_msg_.op_id_ = op_id_;
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg",
K(idx), K(cnt), K(whole_msg_), K(*ch));
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
}
return ret;
}
void ObRollupKeyPieceMsgCtx::reset_resource()
{
received_ = 0;
}
int ObRollupKeyWholeMsg::assign(const ObRollupKeyWholeMsg &other)
{
int ret = OB_SUCCESS;
......
......@@ -99,7 +99,8 @@ public:
received_msgs_.reset();
}
INHERIT_TO_STRING_KV("meta", ObPieceMsgCtx, K_(received));
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) override;
virtual void reset_resource() override;
static int alloc_piece_msg_ctx(const ObRollupKeyPieceMsg &pkt,
ObPxCoordInfo &coord_info,
ObExecContext &ctx,
......
......@@ -574,33 +574,48 @@ int ObDynamicSamplePieceMsgCtx::on_message(
// send whole message when all piece received
if (OB_SUCC(ret) && received_ == task_cnt_) {
SMART_VAR(ObDynamicSampleWholeMsg, whole) {
whole.op_id_ = op_id_;
if (OB_FAIL(build_whole_msg(whole))) {
LOG_WARN("build sample whole message failed", K(ret), K(*this));
}
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole, timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_TRACE("dispatched sample whole msg",
K(idx), K(cnt), K(whole), K(*ch));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
if (OB_FAIL(send_whole_msg(sqcs))) {
LOG_WARN("fail to send whole msg", K(ret));
}
IGNORE_RETURN reset_resource();
}
return ret;
}
int ObDynamicSamplePieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
{
int ret = OB_SUCCESS;
SMART_VAR(ObDynamicSampleWholeMsg, whole) {
whole.op_id_ = op_id_;
if (OB_FAIL(build_whole_msg(whole))) {
LOG_WARN("build sample whole message failed", K(ret), K(*this));
}
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole, timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_TRACE("dispatched sample whole msg",
K(idx), K(cnt), K(whole), K(*ch));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
}
return ret;
}
void ObDynamicSamplePieceMsgCtx::reset_resource()
{
received_ = 0;
}
int ObDynamicSamplePieceMsgListener::on_message(
ObDynamicSamplePieceMsgCtx &ctx,
common::ObIArray<ObPxSqcMeta *> &sqcs,
......
......@@ -111,6 +111,8 @@ public:
const SortDef &sort_def);
virtual ~ObDynamicSamplePieceMsgCtx() = default;
int init(const ObIArray<uint64_t> &tablet_ids);
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) override;
virtual void reset_resource() override;
virtual void destroy();
int process_piece(const ObDynamicSamplePieceMsg &piece);
int split_range(
......
......@@ -121,6 +121,37 @@ int ObReportingWFPieceMsgCtx::alloc_piece_msg_ctx(const ObReportingWFPieceMsg &p
return ret;
}
int ObReportingWFPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
{
int ret = OB_SUCCESS;
// datahub already received all pieces, will send whole msg to sqc
whole_msg_.op_id_ = op_id_;
// no need to sort here, will use pby_hash_value_array_ to build hash map later
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true /* wait */, false /* wait response */))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg", K(idx), K(cnt), K(whole_msg_), K(*ch));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
return ret;
}
void ObReportingWFPieceMsgCtx::reset_resource()
{
whole_msg_.reset();
received_ = 0;
}
int ObReportingWFPieceMsgListener::on_message(
ObReportingWFPieceMsgCtx &ctx,
common::ObIArray<ObPxSqcMeta *> &sqcs,
......@@ -148,27 +179,10 @@ int ObReportingWFPieceMsgListener::on_message(
LOG_TRACE("got a win buf picece msg", "all_got", ctx.received_, "expected", ctx.task_cnt_);
}
if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
// datahub already received all pieces, will send whole msg to sqc
ctx.whole_msg_.op_id_ = ctx.op_id_;
// no need to sort here, will use pby_hash_value_array_ to build hash map later
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true /* wait */, false /* wait response */))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg", K(idx), K(cnt), K(ctx.whole_msg_), K(*ch));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
if (OB_FAIL(ctx.send_whole_msg(sqcs))) {
LOG_WARN("fail to send whole msg", K(ret));
}
ctx.received_ = 0;
ctx.whole_msg_.reset();
IGNORE_RETURN ctx.reset_resource();
}
return ret;
}
......
......@@ -82,6 +82,8 @@ public:
: ObPieceMsgCtx(op_id, task_cnt, timeout_ts), received_(0), tenant_id_(tenant_id),
whole_msg_() {}
virtual ~ObReportingWFPieceMsgCtx() = default;
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) override;
virtual void reset_resource() override;
INHERIT_TO_STRING_KV("meta", ObPieceMsgCtx, K_(received));
static int alloc_piece_msg_ctx(const ObReportingWFPieceMsg &pkt,
ObPxCoordInfo &coord_info,
......
......@@ -61,29 +61,10 @@ int ObWinbufPieceMsgListener::on_message(
// 已经收到所有 piece,发送 sqc 个 whole
// 各个 sqc 广播给各自 task
if (OB_SUCC(ret) && ctx.received_ == ctx.task_cnt_) {
ctx.whole_msg_.is_datum_ = pkt.is_datum_;
ctx.whole_msg_.op_id_ = ctx.op_id_;
ctx.whole_msg_.is_empty_ = (!ctx.whole_msg_.row_store_.is_inited()) &&
(!ctx.whole_msg_.datum_store_.is_inited());
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(ctx.whole_msg_, ctx.timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg",
K(idx), K(cnt), K(ctx.whole_msg_), K(*ch));
}
if (OB_FAIL(ctx.send_whole_msg(sqcs))) {
LOG_WARN("fail to send whole msg", K(ret));
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
ctx.whole_msg_.reset();
ctx.received_ = 0;
IGNORE_RETURN ctx.reset_resource();
}
return ret;
}
......@@ -112,6 +93,38 @@ int ObWinbufPieceMsgCtx::alloc_piece_msg_ctx(const ObWinbufPieceMsg &pkt,
return ret;
}
int ObWinbufPieceMsgCtx::send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs)
{
int ret = OB_SUCCESS;
whole_msg_.is_datum_ = true;
whole_msg_.op_id_ = op_id_;
whole_msg_.is_empty_ = (!whole_msg_.datum_store_.is_inited());
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
dtl::ObDtlChannel *ch = sqcs.at(idx)->get_qc_channel();
if (OB_ISNULL(ch)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null expected", K(ret));
} else if (OB_FAIL(ch->send(whole_msg_, timeout_ts_))) {
LOG_WARN("fail push data to channel", K(ret));
} else if (OB_FAIL(ch->flush(true, false))) {
LOG_WARN("fail flush dtl data", K(ret));
} else {
LOG_DEBUG("dispatched winbuf whole msg",
K(idx), K(cnt), K(whole_msg_), K(*ch));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObPxChannelUtil::sqcs_channles_asyn_wait(sqcs))) {
LOG_WARN("failed to wait response", K(ret));
}
return ret;
}
void ObWinbufPieceMsgCtx::reset_resource()
{
whole_msg_.reset();
received_ = 0;
}
namespace ob_dh_winbuf {
template <typename T, typename B>
......
......@@ -112,6 +112,8 @@ public:
ObExecContext &ctx,
int64_t task_cnt,
ObPieceMsgCtx *&msg_ctx);
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) override;
virtual void reset_resource() override;
int received_; // 已经收到的 piece 数量
int64_t tenant_id_;
ObWinbufWholeMsg whole_msg_;
......
......@@ -26,6 +26,8 @@ public:
ObPieceMsgCtx(uint64_t op_id, int64_t task_cnt, int64_t timeout_ts)
: op_id_(op_id), task_cnt_(task_cnt), timeout_ts_(timeout_ts) {}
virtual ~ObPieceMsgCtx() {}
virtual int send_whole_msg(common::ObIArray<ObPxSqcMeta *> &sqcs) { return OB_SUCCESS; };
virtual void reset_resource() = 0;
VIRTUAL_TO_STRING_KV(K_(op_id), K_(task_cnt));
virtual void destroy() {}
uint64_t op_id_; // 哪个算子使用 datahub 服务
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册