提交 7d1b99f9 编写于 作者: D dapan1121

fix bug

上级 c7c0fd1d
......@@ -392,7 +392,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->tmrId = 0;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, (void *)pObj->rid, NULL);
// TODO the pObj->pStream may be released if error happens
if (pObj->pStream) {
......@@ -406,18 +406,28 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj *pObj = (SCqObj *)param;
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
if (tres == NULL && row == NULL) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
taosReleaseRef(cqObjRef, (int64_t)param);
return;
}
SCqContext *pContext = pObj->pContext;
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) return;
if (pObj->pStream == NULL) {
taosReleaseRef(cqObjRef, (int64_t)param);
return;
}
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
......@@ -468,5 +478,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
// write into vnode write queue
pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer);
taosReleaseRef(cqObjRef, (int64_t)param);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册