提交 d651ba02 编写于 作者: wmmhello's avatar wmmhello

fix:lose consume data because of exec close if consume while insert data

上级 1114d358
......@@ -198,6 +198,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
//
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
void qStreamSetOpen(qTaskInfo_t tinfo);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
......
......@@ -253,7 +253,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6
if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return;
}
qStreamSetOpen(pTaskInfo);
// here start to scan submit block to extract the subscribed data
int32_t totalRows = 0;
......
......@@ -81,7 +81,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
qStreamSetOpen(task);
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
......
......@@ -192,7 +192,7 @@ enum {
OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
OP_EXEC_RECV = 0x11,
// OP_EXEC_RECV = 0x11,
};
typedef struct SOperatorFpSet {
......
......@@ -1074,6 +1074,12 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
return 0;
}
void qStreamSetOpen(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
pOperator->status = OP_NOT_OPENED;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
......@@ -1086,8 +1092,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
if (subType == TOPIC_SUB_TYPE__COLUMN) {
pOperator->status = OP_OPENED;
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream != 1) {
qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id);
......
......@@ -227,17 +227,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
blockDataCleanup(pFinalRes);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pTaskInfo->streamInfo.submit.msgStr) {
pOperator->status = OP_OPENED;
}
if (pOperator->status == OP_EXEC_DONE) {
// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
// pOperator->status = OP_OPENED;
// qDebug("projection in queue model, set status open and return null");
// return NULL;
// }
return NULL;
}
......@@ -262,11 +253,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL && pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) {
// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) {
// pOperator->status = OP_OPENED;
// return NULL;
// }
if (pBlock == NULL) {
qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
setOperatorCompleted(pOperator);
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册