diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 51d166ccb438f72a276e89c544a5db64afc33f06..63be743ef48d95f7a929cc55de9d8d0b281cc033 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -117,7 +117,7 @@ static int32_t flushFromResultBuf(SQInfo *pQInfo); bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; - + char *pElem = pFilterInfo->pData + pFilterInfo->info.bytes * elemPos; if (isNull(pElem, pFilterInfo->info.type)) { return false; @@ -126,7 +126,7 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) { bool qualified = false; for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) { SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j]; - + if (pFilterElem->fp(pFilterElem, pElem, pElem)) { qualified = true; break; @@ -765,7 +765,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas SArray *pDataBlock) { char *dataBlock = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; - + SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; int32_t functionId = pQuery->pSelectExpr[col].base.functionId; @@ -778,18 +778,18 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas } else { pCtx->startOffset = pQuery->pos - (size - 1); } - + sas->offset = 0; sas->colList = pQuery->colList; sas->numOfCols = pQuery->numOfCols; sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); - + // here the pQuery->colList and sas->colList are identical for (int32_t i = 0; i < pQuery->numOfCols; ++i) { SColumnInfo *pColMsg = &pQuery->colList[i]; - + int32_t numOfCols = taosArrayGetSize(pDataBlock); - + dataBlock = NULL; for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor SColumnInfoData *p = taosArrayGet(pDataBlock, k); @@ -798,7 +798,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas break; } } - + assert(dataBlock != NULL); sas->data[i] = dataBlock/* + pQuery->colList[i].bytes*/; // start from the offset } @@ -838,7 +838,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); - + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); @@ -901,10 +901,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } - + tfree(sasArray[i].data); } - + tfree(sasArray); } @@ -964,7 +964,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, * column in cache with the corresponding meter schema is reinforced. */ int32_t numOfCols = taosArrayGetSize(pDataBlock); - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData *p = taosArrayGet(pDataBlock, i); if (pColIndex->colId == p->info.colId) { @@ -972,7 +972,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, } } } - + return NULL; } @@ -1037,7 +1037,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - + SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; @@ -1170,10 +1170,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } - + tfree(sasArray[i].data); } - + free(sasArray); } @@ -1362,7 +1362,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SColIndex* pIndex = &pSqlFuncMsg->colInfo; - + int32_t index = pSqlFuncMsg->colInfo.colIndex; if (TSDB_COL_IS_TAG(pIndex->flag)) { if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor @@ -1443,7 +1443,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); - + qTrace("QInfo:%p teardown runtime env", pQInfo); cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput); @@ -1485,7 +1485,7 @@ static bool isQueryKilled(SQInfo *pQInfo) { pQInfo->killed = 1; return true; } - + return (pQInfo->killed == 1); #endif } @@ -1591,7 +1591,7 @@ static bool onlyQueryTags(SQuery* pQuery) { return false; } } - + return true; } @@ -1893,14 +1893,14 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); - + if (pMeter->numOfQueries > 0) { qTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid, pMeter->meterId, pMeter->numOfQueries); num++; } } - + /* * in order to reduce log output, for all meters of which numOfQueries count are 0, * we do not output corresponding information @@ -1922,26 +1922,26 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; int32_t colIndex = pFilterInfo->info.colIndex; - + // this column not valid in current data block if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) { continue; } - + // not support pre-filter operation on binary/nchar data type if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) { continue; } - + // all points in current column are NULL, no need to check its boundary value if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) { continue; } - + if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) { float minval = *(double *)(&pDataStatis[colIndex].min); float maxval = *(double *)(&pDataStatis[colIndex].max); - + for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) { if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) { return true; @@ -1956,7 +1956,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun } } } - + // todo disable this opt code block temporarily // for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { // int32_t functId = pQuery->pSelectExpr[i].base.functionId; @@ -2392,7 +2392,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim SQuery* pQuery = pRuntimeEnv->pQuery; int32_t numOfCols = pQuery->numOfOutput; printf("super table query intermediate result, total:%d\n", numOfRows); - + for (int32_t j = 0; j < numOfRows; ++j) { for (int32_t i = 0; i < numOfCols; ++i) { @@ -3534,7 +3534,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo SQuery * pQuery = pRuntimeEnv->pQuery; SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; - + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { @@ -4110,11 +4110,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { if (pInfo->id.tid == blockInfo.tid) { assert(pInfo->id.uid == blockInfo.uid); pTableQueryInfo = item->info; - + break; } } - + if (pTableQueryInfo != NULL) { break; } @@ -4182,11 +4182,11 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); pRuntimeEnv->pQueryHandle = NULL; } - + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp); taosArrayDestroy(tx); taosArrayDestroy(g1); - + if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vgroupIndex == -1) { int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key; @@ -4859,22 +4859,22 @@ static void stableQueryImpl(SQInfo *pQInfo) { static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { int32_t j = 0; - + if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { while(j < pQueryMsg->numOfTags) { if (pExprMsg->colInfo.colId == pTagCols[j].colId) { return j; } - + j += 1; } - + } else { while (j < pQueryMsg->numOfCols) { if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) { return j; } - + j += 1; } } @@ -4923,7 +4923,7 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx } } } - + return true; } @@ -5065,10 +5065,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pExprMsg = (SSqlFuncMsg *)pMsg; } - + if (!validateQuerySourceCols(pQueryMsg, *pExpr)) { tfree(*pExpr); - + return TSDB_CODE_INVALID_QUERY_MSG; } @@ -5111,12 +5111,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, (*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags); for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) { SColumnInfo* pTagCol = (SColumnInfo*) pMsg; - + pTagCol->colId = htons(pTagCol->colId); pTagCol->bytes = htons(pTagCol->bytes); pTagCol->type = htons(pTagCol->type); pTagCol->numOfFilters = 0; - + (*tagCols)[i] = *pTagCol; pMsg += sizeof(SColumnInfo); } @@ -5128,14 +5128,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen); pMsg += pQueryMsg->tagCondLen; } - + if (*pMsg != 0) { size_t len = strlen(pMsg) + 1; *tbnameCond = malloc(len); strcpy(*tbnameCond, pMsg); pMsg += len; } - + qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, @@ -5160,7 +5160,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz); return TSDB_CODE_APP_ERROR; } - + pArithExprInfo->pExpr = pExprNode; return TSDB_CODE_SUCCESS; } @@ -5225,7 +5225,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; int16_t functId = pExprs[i].base.functionId; - + if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); assert(j < pQueryMsg->numOfCols); @@ -5265,7 +5265,7 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]); } - + return pGroupbyExpr; } @@ -5288,7 +5288,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData)); pFilterInfo->info = pQuery->colList[i]; - + pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters; pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem)); @@ -5435,9 +5435,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->colList[i] = pQueryMsg->colList[i]; pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters); } - + pQuery->tagColList = pTagCols; - + // calculate the result row size for (int16_t col = 0; col < numOfOutput; ++col) { assert(pExprs[col].bytes > 0); @@ -5484,22 +5484,23 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - + pQInfo->tableIdGroupInfo = *groupInfo; size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList); - + pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->groupInfo.numOfTables = groupInfo->numOfTables; int tableIndex = 0; STimeWindow window = pQueryMsg->window; taosArraySort(pTableIdList, compareTableIdInfo); + for(int32_t i = 0; i < numOfGroups; ++i) { SArray* pa = taosArrayGetP(groupInfo->pGroupList, i); size_t s = taosArrayGetSize(pa); - + SArray* p1 = taosArrayInit(s, sizeof(SGroupItem)); - + for(int32_t j = 0; j < s; ++j) { STableId id = *(STableId*) taosArrayGet(pa, j); SGroupItem item = { .id = id }; @@ -5515,6 +5516,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, item.info->tableIndex = tableIndex++; taosArrayPush(p1, &item); } + taosArrayPush(pQInfo->groupInfo.pGroupList, &p1); } @@ -5658,7 +5660,7 @@ static void freeQInfo(SQInfo *pQInfo) { int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); for (int32_t i = 0; i < numOfGroups; ++i) { SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); - + size_t num = taosArrayGetSize(p); for(int32_t j = 0; j < num; ++j) { SGroupItem* item = taosArrayGet(p, j); @@ -5666,17 +5668,17 @@ static void freeQInfo(SQInfo *pQInfo) { destroyTableQueryInfo(item->info, pQuery->numOfOutput); } } - + taosArrayDestroy(p); } - + taosArrayDestroy(pQInfo->groupInfo.pGroupList); - + for(int32_t i = 0; i < numOfGroups; ++i) { SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i); taosArrayDestroy(p); } - + taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList); taosArrayDestroy(pQInfo->arrTableIdInfo); @@ -5684,14 +5686,14 @@ static void freeQInfo(SQInfo *pQInfo) { taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); tfree(pQuery->pGroupbyExpr); } - + tfree(pQuery->tagColList); tfree(pQuery->pFilterInfo); tfree(pQuery->colList); tfree(pQuery->sdata); - + tfree(pQuery); - + qTrace("QInfo:%p QInfo is freed", pQInfo); // destroy signature, in order to avoid the query process pass the object safety check @@ -5744,7 +5746,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); } - + // all data returned, set query over if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { setQueryStatus(pQuery, QUERY_OVER); @@ -5861,7 +5863,12 @@ _over: tfree(tagCond); tfree(tbnameCond); taosArrayDestroy(pTableIdList); - + + if (code != TSDB_CODE_SUCCESS) { + tfree(*pQInfo); + *pQInfo = NULL; + } + // if failed to add ref for all meters in this query, abort current query return code; } @@ -5885,7 +5892,7 @@ void qTableQuery(qinfo_t qinfo) { } qTrace("QInfo:%p query task is launched", pQInfo); - + if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { buildTagQueryResult(pQInfo); // todo support the limit/offset } else if (pQInfo->runtimeEnv.stableQuery) { @@ -5893,7 +5900,7 @@ void qTableQuery(qinfo_t qinfo) { } else { tableQueryImpl(pQInfo); } - + sem_post(&pQInfo->dataReady); // vnodeDecRefCount(pQInfo); } @@ -5987,7 +5994,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co static void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - + size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList); assert(num == 0 || num == 1); if (num == 0) { @@ -5996,44 +6003,44 @@ static void buildTagQueryResult(SQInfo* pQInfo) { SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); num = taosArrayGetSize(pa); - + assert(num == pQInfo->groupInfo.numOfTables); int16_t type, bytes; - + int32_t functionId = pQuery->pSelectExpr[0].base.functionId; if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id assert(pQuery->numOfOutput == 1); SExprInfo* pExprInfo = &pQuery->pSelectExpr[0]; - + int32_t rsize = pExprInfo->bytes; char* data = NULL; - + for(int32_t i = 0; i < num; ++i) { SGroupItem* item = taosArrayGet(pa, i); - + char* output = pQuery->sdata[0]->data + i * rsize; varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); output = varDataVal(output); *(int64_t*) output = item->id.uid; // memory align problem, todo serialize output += sizeof(item->id.uid); - + *(int32_t*) output = item->id.tid; output += sizeof(item->id.tid); - + *(int32_t*) output = pQInfo->vgId; output += sizeof(pQInfo->vgId); - + tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data); memcpy(output, data, bytes); } - + qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num); } else { // return only the tags|table name etc. for(int32_t i = 0; i < num; ++i) { SExprInfo* pExprInfo = pQuery->pSelectExpr; SGroupItem* item = taosArrayGet(pa, i); - + char* data = NULL; for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo check the return value, refactor codes @@ -6059,7 +6066,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, num); } - + pQuery->rec.rows = num; setQueryStatus(pQuery, QUERY_COMPLETED); } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index c41171a36e1fe4292ba059495dc01a39c1bc5782..ef2cb20171603dac96d97fbad1c9fa6a30054cfd 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -39,8 +39,8 @@ void vnodeInitReadFp(void) { int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { SVnodeObj *pVnode = (SVnodeObj *)param; - if (vnodeProcessReadMsgFp[msgType] == NULL) - return TSDB_CODE_MSG_NOT_PROCESSED; + if (vnodeProcessReadMsgFp[msgType] == NULL) + return TSDB_CODE_MSG_NOT_PROCESSED; if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_INVALID_VGROUP_ID; @@ -53,26 +53,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont memset(pRet, 0, sizeof(SRspRet)); int32_t code = TSDB_CODE_SUCCESS; - + qinfo_t pQInfo = NULL; if (contLen != 0) { pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); - + SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->code = pRet->code; - + pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); } else { + assert(pCont != NULL); pQInfo = pCont; code = TSDB_CODE_ACTION_IN_PROGRESS; } - qTableQuery(pQInfo); // do execute query - + if (pQInfo != NULL) { + qTableQuery(pQInfo); // do execute query + } + return code; }