提交 04858fae 编写于 作者: wmmhello's avatar wmmhello

fix:tsdbreader is free by mistake

上级 f8de38e5
......@@ -90,7 +90,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
*/
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
......
......@@ -75,7 +75,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code));
dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(code));
vmSendRsp(pMsg, code);
}
......
......@@ -384,7 +384,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
bool exec = tqIsHandleExec(pHandle);
if(!exec) {
tqSetHandleExec(pHandle);
qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
break;
......@@ -586,9 +586,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo);
}
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
}
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
......
......@@ -84,8 +84,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
qStreamSetOpen(task);
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
code = qExecTask(task, &pDataBlock, &ts);
if (code != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
terrno = code;
return -1;
}
......@@ -128,8 +130,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("tmqsnap task start to execute");
if (qExecTask(task, &pDataBlock, &ts) < 0) {
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
int code = qExecTask(task, &pDataBlock, &ts);
if (code != 0) {
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
terrno = code;
return -1;
}
......
......@@ -180,10 +180,10 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot);
}
void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
SExecTaskInfo* pTaskInfo = tinfo;
pTaskInfo->code = code;
}
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
// SExecTaskInfo* pTaskInfo = tinfo;
// pTaskInfo->code = code;
//}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
......@@ -1080,7 +1080,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
const char* id = GET_TASKID(pTaskInfo);
// if pOffset equal to current offset, means continue consume
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset) && pOffset->type != TMQ_OFFSET__SNAPSHOT_DATA) {
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册