diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index b159ffc5a16a408ccde5c9e3cf9a0eb00c1c1a05..52a06277e360a583f8e56f27fed38db741815db0 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->numOfBuffer = numOfFlush; pReducer->numOfVnode = numOfBuffer; - + pReducer->pDesc = pDesc; tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer); @@ -604,7 +604,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage tOrderDescriptor *pOrderDesc = pReducer->pDesc; SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo; - + // no group by columns, all data belongs to one group int32_t numOfCols = orderInfo->numOfCols; if (numOfCols <= 0) { @@ -627,7 +627,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage // only one row exists int32_t index = orderInfo->pData[0]; int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset; - + int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset); return ret == 0; } @@ -1040,7 +1040,7 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) { // the tag columns need to be set before all functions execution SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t j = 0; j < size; ++j) { SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j]; @@ -1182,7 +1182,7 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { */ bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { int32_t ret = 0; // merge all result by default - + int16_t functionId = pLocalReducer->pCtx[0].functionId; // todo opt performance diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8e87e8fd7a2b6cd513cd405a38adbeff534c1819..5913200ff6d3f87e4a8edb1628806f4b04781e5f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -209,6 +209,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { tscError("%p sql is already released", pSql->signature); return; } + if (pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); return; @@ -217,10 +218,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - // tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]); - if (pObj->signature != pObj) { - tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, + if (pObj->signature != pObj || pSql->freed == 1) { + tscTrace("%p sqlObj needs to be released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, pObj, pObj->signature); tscFreeSqlObj(pSql); rpcFreeCont(rpcMsg->pCont); @@ -375,7 +375,7 @@ int tscProcessSql(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = NULL; - uint16_t type = 0; + uint32_t type = 0; if (pQueryInfo != NULL) { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -424,7 +424,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { * sub-queries not correctly released and master sql object of super table query reaches an abnormal state. */ pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - //taosStopRpcConn(pSql->pSubs[i]->thandle); +// taosStopRpcConn(pSql->pSubs[i]->); } /* diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5f8b2f0dc13c4414bf29008d9bf7ca819f4392dc..6f043f186a9673840d6deffd57639bc76a74ff67 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -219,6 +219,11 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { sem_post(&pSql->rspSem); } +static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { + SSqlObj* pSql = (SSqlObj*) tres; + sem_post(&pSql->rspSem); +} + TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { @@ -369,11 +374,6 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { return (pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows; } -static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { - SSqlObj* pSql = (SSqlObj*) tres; - sem_post(&pSql->rspSem); -} - TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) { @@ -476,7 +476,7 @@ int taos_select_db(TAOS *taos, const char *db) { } // send free message to vnode to free qhandle and corresponding resources in vnode -static void tscFreeQhandleInVnode(SSqlObj* pSql) { +static bool tscFreeQhandleInVnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -496,10 +496,19 @@ static void tscFreeQhandleInVnode(SSqlObj* pSql) { tscProcessSql(pSql); // in case of sync model query, waits for response and then goes on - if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) { - sem_wait(&pSql->rspSem); - } +// if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) { +// sem_wait(&pSql->rspSem); + +// tscFreeSqlObj(pSql); +// tscTrace("%p sqlObj is freed by app", pSql); +// } else { + tscTrace("%p sqlObj will be freed while rsp received", pSql); +// } + + return true; } + + return false; } void taos_free_result(TAOS_RES *res) { @@ -527,10 +536,10 @@ void taos_free_result(TAOS_RES *res) { } pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; - tscFreeQhandleInVnode(pSql); - tscFreeSqlObj(pSql); - - tscTrace("%p sql result is freed by app", pSql); + if (!tscFreeQhandleInVnode(pSql)) { + tscFreeSqlObj(pSql); + tscTrace("%p sqlObj is freed by app", pSql); + } } // todo should not be used in async query diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2701d2b572d9f893b265c6cc1262ee08d4bbaf6f..57634e73fd2dc0ed4f97ad5d53f0778d52dde945 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -134,24 +134,6 @@ void tscGetDBInfoFromMeterId(char* tableId, char* db) { db[0] = 0; } -//STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { -// if (pSidList == NULL) { -// tscError("illegal sidlist"); -// return 0; -// } -// -// if (idx < 0 || idx >= pSidList->numOfSids) { -// int32_t sidRange = (pSidList->numOfSids > 0) ? (pSidList->numOfSids - 1) : 0; -// -// tscError("illegal sidIdx:%d, reset to 0, sidIdx range:%d-%d", idx, 0, sidRange); -// idx = 0; -// } -// -// assert(pSidList->pSidExtInfoList[idx] >= 0); -// -// return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList); -//} - bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { if (pQueryInfo == NULL) { return false; @@ -176,8 +158,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { return false; } - if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) && - pQueryInfo->command == TSDB_SQL_SELECT) { + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->command == TSDB_SQL_SELECT) { return UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); } diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index aa8e83da380a752ca58857129b5ca17a38cc016b..340f6bc4f326bc7105b12b285c3edf60b88063ec 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -112,7 +112,7 @@ enum { #define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0) #define QUERY_IS_JOIN_QUERY(type) (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_QUERY)) -#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) +#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) #define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0) typedef struct SArithmeticSupport { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 57ab593b17aaaca8619c0b740799ce3dc26e33e7..aa602ed6610e3dd3c485bc1e87850fbe7986ab2b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -484,7 +484,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes // set time window for current result pWindowRes->window = *win; - + setWindowResOutputBufInitCtx(pRuntimeEnv, pWindowRes); return TSDB_CODE_SUCCESS; } @@ -685,14 +685,14 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys, __block_search_fn_t searchFn) { SQuery *pQuery = pRuntimeEnv->pQuery; - + // tumbling time window query, a special case of sliding time window query if (pQuery->slidingTime == pQuery->intervalTime) { // todo opt } - + getNextTimeWindow(pQuery, pNextWin); - + // next time window is not in current block if ((pNextWin->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pNextWin->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { @@ -720,7 +720,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow */ if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNextWin->ekey) { TSKEY next = primaryKeys[startPos]; - + pNextWin->ekey += ((next - pNextWin->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; pNextWin->skey = pNextWin->ekey - pQuery->intervalTime + 1; } else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNextWin->skey) { @@ -729,7 +729,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow pNextWin->skey -= ((pNextWin->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; pNextWin->ekey = pNextWin->skey + pQuery->intervalTime - 1; } - + return startPos; } @@ -2072,7 +2072,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* pTableQueryInfo = pQuery->current; SQueryCostInfo* summary = &pRuntimeEnv->summary; - + qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey, pQuery->order.order); @@ -2113,7 +2113,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataStatis *pStatis = NULL; SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); - + // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); @@ -2502,7 +2502,7 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { SResultInfo *pResultInfo = &pWindowRes->resultInfo[j]; assert(pResultInfo != NULL); - + if (pResultInfo->numOfRes > 0) { return pResultInfo->numOfRes; } @@ -2551,7 +2551,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { assert(pQInfo->numOfGroupResultPages == 0); return 0; } else if (numOfTables == 1) { // no need to merge results since only one table in each group - + } SCompSupporter cs = {pTableList, posList, pQInfo}; @@ -2640,7 +2640,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { #endif qTrace("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pQInfo, pQInfo->groupIndex, endt - startt); - + tfree(pTableList); tfree(posList); tfree(pTree); @@ -2870,12 +2870,12 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].base.functionId; pRuntimeEnv->pCtx[j].currentStage = 0; - + SResultInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); if (pResInfo->initialized) { continue; } - + aAggs[functionId].init(&pRuntimeEnv->pCtx[j]); } } @@ -3248,7 +3248,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); - + int32_t functionId = pQuery->pSelectExpr[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; @@ -3268,7 +3268,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { SQuery *pQuery = pRuntimeEnv->pQuery; - + // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -3277,21 +3277,21 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * if (pCtx->resultInfo->complete) { continue; } - + pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); pCtx->currentStage = 0; - + int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - + /* * set the output buffer information and intermediate buffer * not all queries require the interResultBuf, such as COUNT */ pCtx->resultInfo->superTableQ = pRuntimeEnv->stableQuery; // set super table query flag - + if (!pCtx->resultInfo->initialized) { aAggs[functionId].init(pCtx); } @@ -4470,7 +4470,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { // query error occurred or query is killed, abort current execution if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { - qTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code); + qTrace("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); return; } @@ -4491,7 +4491,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_COMPLETED); if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { - qTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code); + qTrace("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); return; } @@ -4867,7 +4867,7 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx (pFuncMsg->functionId == TSDB_FUNC_COUNT && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX)) { continue; } - + return false; } } @@ -5851,6 +5851,8 @@ void qDestroyQueryInfo(qinfo_t qHandle) { } int16_t ref = T_REF_DEC(pQInfo); + qTrace("QInfo:%p dec refCount, value:%d", pQInfo, ref); + if (ref == 0) { doDestoryQueryInfo(pQInfo); } @@ -5994,25 +5996,25 @@ static void buildTagQueryResult(SQInfo* pQInfo) { size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); assert(numOfGroup == 0 || numOfGroup == 1); - + if (numOfGroup == 0) { return; } SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - + size_t num = taosArrayGetSize(pa); assert(num == pQInfo->groupInfo.numOfTables); - + int32_t count = 0; int32_t functionId = pQuery->pSelectExpr[0].base.functionId; if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id assert(pQuery->numOfOutput == 1); - + SExprInfo* pExprInfo = &pQuery->pSelectExpr[0]; int32_t rsize = pExprInfo->bytes; count = 0; - + while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) { int32_t i = pQInfo->tableIndex++; SGroupItem *item = taosArrayGet(pa, i); @@ -6054,12 +6056,12 @@ static void buildTagQueryResult(SQInfo* pQInfo) { } } } - + count += 1; } - + qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, count); - + } else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query *(int64_t*) pQuery->sdata[0]->data = num; @@ -6071,7 +6073,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { SSchema tbnameSchema = tGetTableNameColumnSchema(); while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) { int32_t i = pQInfo->tableIndex++; - + SExprInfo* pExprInfo = pQuery->pSelectExpr; SGroupItem* item = taosArrayGet(pa, i); @@ -6086,7 +6088,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes); char* dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes; - + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (data == NULL) { setVardataNull(dst, type); @@ -6104,7 +6106,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { } count += 1; } - + qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, count); } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 70424c3fd442abd49c492aceb0e29f988959bf53..7627a06c994d155309be6dfd7d2e6e919acc7664 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -393,7 +393,6 @@ void rpcSendResponse(const SRpcMsg *pRsp) { if ( pConn->inType == 0 || pConn->user[0] == 0 ) { tTrace("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); - rpcDecRef(pRpc); return; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index e74ec3b06ad418f454b920702fc42cf5c5d1b685..a62ad5bbd3bc112673a487ffd2451a152b55c47f 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -72,7 +72,7 @@ typedef struct STableCheckInfo { int32_t compSize; int32_t numOfBlocks; // number of qualified data blocks not the original blocks SDataCols* pDataCols; - + int32_t chosen; // indicate which iterator should move forward bool initBuf; // whether to initialize the in-memory skip list iterator or not SSkipListIterator* iter; // mem buffer skip list iterator @@ -311,14 +311,14 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { rmem = SL_GET_NODE_DATA(node); } } - + if (pCheckInfo->iiter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { rimem = SL_GET_NODE_DATA(node); } } - + if (rmem != NULL && rimem != NULL) { if (dataRowKey(rmem) < dataRowKey(rimem)) { pCheckInfo->chosen = 0; @@ -333,17 +333,17 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { return rimem; } } - + if (rmem != NULL) { pCheckInfo->chosen = 0; return rmem; } - + if (rimem != NULL) { pCheckInfo->chosen = 1; return rimem; } - + return NULL; } @@ -353,11 +353,11 @@ bool moveToNextRow(STableCheckInfo* pCheckInfo) { if (pCheckInfo->iter != NULL) { hasNext = tSkipListIterNext(pCheckInfo->iter); } - + if (hasNext) { return hasNext; } - + if (pCheckInfo->iiter != NULL) { return tSkipListIterGet(pCheckInfo->iiter) != NULL; } @@ -366,17 +366,17 @@ bool moveToNextRow(STableCheckInfo* pCheckInfo) { if (pCheckInfo->iiter != NULL) { hasNext = tSkipListIterNext(pCheckInfo->iiter); } - + if (hasNext) { return hasNext; } - + if (pCheckInfo->iter != NULL) { return tSkipListIterGet(pCheckInfo->iter) != NULL; } } } - + return hasNext; } @@ -395,7 +395,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { if (row == NULL) { return false; } - + pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer tsdbTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo); @@ -581,9 +581,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo bool blockLoaded = false; SArray* sa = getDefaultLoadColumns(pQueryHandle, true); - + int64_t st = taosGetTimestampUs(); - + if (pCheckInfo->pDataCols == NULL) { STsdbMeta* pMeta = tsdbGetMeta(pRepo); pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); @@ -603,13 +603,13 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; assert(pCols->numOfRows != 0); - + taosArrayDestroy(sa); tfree(data); - + int64_t et = taosGetTimestampUs() - st; tsdbTrace("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et); - + return blockLoaded; } @@ -681,7 +681,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { return false; } - + SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows); @@ -1212,7 +1212,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO *numOfAllocBlocks = numOfBlocks; int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - + SBlockOrderSupporter sup = {0}; sup.numOfTables = numOfTables; sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables); @@ -1256,17 +1256,17 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO } assert(numOfBlocks == cnt); - + // since there is only one table qualified, blocks are not sorted if (numOfQualTables == 1) { memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks); cleanBlockOrderSupporter(&sup, numOfQualTables); - + tsdbTrace("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt, pQueryHandle->qinfo); return TSDB_CODE_SUCCESS; } - + tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt, numOfQualTables, pQueryHandle->qinfo); @@ -1683,7 +1683,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int int64_t elapsedTime = taosGetTimestampUs() - st; tsdbTrace("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle, elapsedTime, numOfRows, numOfCols); - + return numOfRows; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 29f8d887d1183814c0b1a714e1ae22104b65cf16..d6227f4270be2e528baf429613e0e2e938a45cb9 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -68,6 +68,7 @@ static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) killQueryMsg->header.vgId = htonl(vgId); killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); + vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle); rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); } @@ -85,10 +86,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); + vWarn("QInfo:%p connection %p broken, kill query", killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); - qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle); - return TSDB_CODE_SUCCESS; + qKillQuery((qinfo_t) killQueryMsg->qhandle); + return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code } int32_t code = TSDB_CODE_SUCCESS;