diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 56559e9312f7a44c99b7b416f38c8c3454adc3b2..0e20c66305675e1bef80b26a7bfba8c2322c488b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -284,7 +284,7 @@ typedef struct { char * pRsp; int32_t rspType; int32_t rspLen; - uint64_t qhandle; + uint64_t qId; int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable int32_t row; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 490f9e1291654da882efeb3d6614dd98e6478829..3249529b34580b519ef1c6dbe9bfe7e76e0cb7cc 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -160,8 +160,8 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) { - if (pRes->qhandle == 0 && numOfRows != 0) { + if ((pRes->qId == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) { + if (pRes->qId == 0 && numOfRows != 0) { tscError("qhandle is NULL"); } else { pRes->code = numOfRows; @@ -208,7 +208,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { pSql->fetchFp = fp; pSql->fp = tscAsyncFetchRowsProxy; - if (pRes->qhandle == 0) { + if (pRes->qId == 0) { tscError("qhandle is NULL"); pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; pSql->param = param; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 820572859e88540656b8ce42dc3a6e77467c1423..188ba29a97c244ce5c5027c459fabc303ed85c0f 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -309,7 +309,7 @@ TAOS_ROW tscFetchRow(void *param) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || + if (pRes->qId == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { return NULL; @@ -905,7 +905,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { * set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to * free allocated resources and remove the SqlObj from sql query linked list */ - pRes->qhandle = 0x1; + pRes->qId = 0x1; pRes->numOfRows = 0; } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE) { pRes->code = tscProcessShowCreateTable(pSql); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index a44b0c46ba7920c2483c4baa3b34bc37f06bdee7..8a301d1820d4fc88406c4fe5fa5fe708a3ee92d1 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1607,7 +1607,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) tscDestroyLocalMerger(pObj); } - pRes->qhandle = 1; // hack to pass the safety check in fetch_row function + pRes->qId = 1; // hack to pass the safety check in fetch_row function pRes->numOfRows = 0; pRes->row = 0; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 0669d6aeb095e2410e48b86b10c724a38a5728fa..4efaf7c2b516019052018c508306e4736322a284 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -903,7 +903,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pRes->qhandle = 0; + pRes->qId = 0; pRes->numOfRows = 1; strtolower(pSql->sqlstr, sql); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 9203dcfbbab8d4b512b490bf13ab91fe1b475c22..3b0e1b5775f13e619995ef8145ba4a09533f7a49 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -249,8 +249,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pQdesc->stime = htobe64(pSql->stime); pQdesc->queryId = htonl(pSql->queryId); //pQdesc->useconds = htobe64(pSql->res.useconds); - pQdesc->useconds = htobe64(now - pSql->stime); - pQdesc->qHandle = htobe64(pSql->res.qhandle); + pQdesc->useconds = htobe64(now - pSql->stime); // use local time instead of sever rsp elapsed time + pQdesc->qHandle = htobe64(pSql->res.qId); pHeartbeat->numOfQueries++; pQdesc++; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a8903d964b48988bd6a7536256b659b0f895d7d1..d6deb7d7e223eb7792642e6d1db6894af4fe4df0 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -508,7 +508,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); pRetrieveMsg->free = htons(pQueryInfo->type); - pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); + pRetrieveMsg->qId = htobe64(pSql->res.qId); // todo valid the vgroupId at the client side STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -520,7 +520,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qhandle); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qId); } else { int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(vgIndex >= 0 && vgIndex < numOfVgroups); @@ -528,12 +528,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qhandle); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qId); } } else { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); - tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qhandle:%" PRIX64, pSql, pTableMeta->vgId, pSql->res.qhandle); + tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qId:%" PRIu64, pSql, pTableMeta->vgId, pSql->res.qId); } pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); @@ -1583,7 +1583,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload; - pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); + pRetrieveMsg->qId = htobe64(pSql->res.qId); pRetrieveMsg->free = htons(pQueryInfo->type); return TSDB_CODE_SUCCESS; @@ -2134,7 +2134,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { pShow = (SShowRsp *)pRes->pRsp; pShow->qhandle = htobe64(pShow->qhandle); - pRes->qhandle = pShow->qhandle; + pRes->qId = pShow->qhandle; tscResetForNextRetrieve(pRes); pMetaMsg = &(pShow->tableMeta); @@ -2316,11 +2316,12 @@ int tscProcessQueryRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp; - pQuery->qhandle = htobe64(pQuery->qhandle); - pRes->qhandle = pQuery->qhandle; + pQuery->qId = htobe64(pQuery->qId); + pRes->qId = pQuery->qId; pRes->data = NULL; tscResetForNextRetrieve(pRes); + tscDebug("%p query rsp received, qId:%"PRIu64, pSql, pRes->qId); return 0; } @@ -2378,7 +2379,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { } pRes->row = 0; - tscDebug("%p numOfRows:%d, offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed); + tscDebug("%p numOfRows:%d, offset:%" PRId64 ", complete:%d, qId:%"PRIu64, pSql, pRes->numOfRows, pRes->offset, + pRes->completed, pRes->qId); return 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 13539a9b197875210d76c86bd93ec986190c0c37..93d0e9fd092b7e91fc8028e30f73a13bdb02627d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -476,7 +476,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || + if (pRes->qId == 0 || pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { @@ -508,7 +508,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || + if (pRes->qId == 0 || pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { @@ -554,7 +554,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - if (pRes == NULL || pRes->qhandle == 0) { + if (pRes == NULL || pRes->qId == 0) { return true; } @@ -1050,7 +1050,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { * If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql() * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. */ - pRes->qhandle = 0; + pRes->qId = 0; free(str); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 32257f5a7c723d04ec0f8200c889a68d78cf7824..1277a436a14c926b756ff99b0cd2e045f5dfed1f 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -149,7 +149,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* } strtolower(pSql->sqlstr, pSql->sqlstr); - pRes->qhandle = 0; + pRes->qId = 0; pRes->numOfRows = 1; code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); @@ -448,7 +448,7 @@ SSqlObj* recreateSqlObj(SSub* pSub) { return NULL; } - pRes->qhandle = 0; + pRes->qId = 0; pRes->numOfRows = 1; int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); @@ -546,7 +546,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { uint32_t type = pQueryInfo->type; tscFreeSqlResult(pSql); pRes->numOfRows = 1; - pRes->qhandle = 0; + pRes->qId = 0; pSql->cmd.command = TSDB_SQL_SELECT; pQueryInfo->type = type; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 76237ebb9518eb4dc831f417b5ee2adf16d2a3b9..299cf038057bf5ce80c449be41faa484e0a8ef93 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1840,7 +1840,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - pSql->res.qhandle = 0x1; + pSql->res.qId = 0x1; assert(pSql->res.numOfRows == 0); if (pSql->pSubs == NULL) { @@ -2440,7 +2440,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SColumnModel *pModel = NULL; SColumnModel *pFinalModel = NULL; - pRes->qhandle = 0x1; // hack the qhandle check + pRes->qId = 0x1; // hack the qhandle check const uint32_t nBufferSize = (1u << 16u); // 64KB @@ -2988,7 +2988,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); - if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode + if (pSql->res.qId == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode tscRetrieveFromDnodeCallBack(param, pSql, 0); } else { taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); diff --git a/src/inc/query.h b/src/inc/query.h index 77a12ebfc5c069bef42dcb6a339c650528df8987..c9dabcef5454d8322ab64eade53802a3e29a7b53 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs * @param qinfo * @return */ -bool qTableQuery(qinfo_t qinfo); +bool qTableQuery(qinfo_t qinfo, uint64_t *qId); /** * Retrieve the produced results information, if current query is not paused or completed, diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b9f45bd0d97c4366d231c271b905d8feb00ad829..7b6d5680be5fd48c59cc306504fdd01235aea80d 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -514,12 +514,13 @@ typedef struct { typedef struct { int32_t code; - uint64_t qhandle; // query handle + union{uint64_t qhandle; uint64_t qId;}; // query handle } SQueryTableRsp; +// todo: the show handle should be replaced with id typedef struct { SMsgHead header; - uint64_t qhandle; + union{uint64_t qhandle; uint64_t qId;}; // query handle uint16_t free; } SRetrieveTableMsg; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 493bdbe5ded9f6235687d376625b8f6fc9861c8f..495bfa2384ae0680577775c83f63d098ebf4585f 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -245,7 +245,7 @@ typedef struct { * @param qinfo query info handle from query processor * @return */ -TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo, +TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, SMemRef *pRef); /** @@ -258,7 +258,7 @@ TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable * @param tableInfo table list. * @return */ -TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo, +TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, SMemRef *pRef); /** @@ -277,7 +277,7 @@ SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); * @return */ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, - void *qinfo, SMemRef *pRef); + uint64_t qId, SMemRef *pRef); /** diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index e46af8c29891cf52fbbac97c6afb42e0d0571215..ef43a9621a8298fff857a252a1b67a86988d786e 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -72,7 +72,7 @@ typedef struct SDiskbasedResultBuf { bool comp; // compressed before flushed to disk int32_t nextPos; // next page flush position - const void* handle; // for debug purpose + uint64_t qId; // for debug purpose SResultBufStatis statis; } SDiskbasedResultBuf; @@ -88,7 +88,7 @@ typedef struct SDiskbasedResultBuf { * @param handle * @return */ -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle); +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId); /** * diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 31dfc350a39b642631c1e9c60f713e953d6898c3..cdd8b0707a86404597fc1379e0743173734a1db1 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -25,6 +25,7 @@ } while (0) #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) +#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId) #define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.arg->argValue.i64:1) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 59e04c9cac4d8e6459022f0df0a7c4ec404e0ee2..7ba629cfc281bfff9be946bdeb052e15e7d77759 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1332,7 +1332,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn int16_t type = pColInfoData->info.type; if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { - qError("QInfo:%p group by not supported on double/float columns, abort", pRuntimeEnv->qinfo); + qError("QInfo:%"PRIu64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv)); return; } @@ -1715,7 +1715,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { } static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) { - qDebug("QInfo:%p setup runtime env", pRuntimeEnv->qinfo); + qDebug("QInfo:%"PRIu64" setup runtime env", GET_QID(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; pRuntimeEnv->prevGroupId = INT32_MIN; @@ -1748,7 +1748,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; } - qDebug("QInfo:%p init runtime environment completed", pRuntimeEnv->qinfo); + qDebug("QInfo:%"PRIu64" init runtime environment completed", GET_QID(pRuntimeEnv)); // group by normal column, sliding window query, interval query are handled by interval query processor // interval (down sampling operation) @@ -1847,7 +1847,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo; - qDebug("QInfo:%p teardown runtime env", pQInfo); + qDebug("QInfo:%"PRIu64" teardown runtime env", pQInfo->qId); if (pRuntimeEnv->sasArray != NULL) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -1895,8 +1895,8 @@ bool isQueryKilled(SQInfo *pQInfo) { (!needBuildResAfterQueryComplete(pQInfo))) { assert(pQInfo->startExecTs != 0); - qDebug("QInfo:%p retrieve not arrive beyond %d sec, abort current query execution, start:%"PRId64", current:%d", pQInfo, 1, - pQInfo->startExecTs, taosGetTimestampSec()); + qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d sec, abort current query execution, start:%" PRId64 + ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); return true; } @@ -2066,11 +2066,11 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { /* * todo add more parameters to check soon.. */ -bool colIdCheck(SQuery *pQuery, void* qinfo) { +bool colIdCheck(SQuery *pQuery, uint64_t qId) { // load data column information is incorrect for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) { if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) { - qError("QInfo:%p invalid data load column for query", qinfo); + qError("QInfo:%"PRIu64" invalid data load column for query", qId); return false; } } @@ -2123,13 +2123,13 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo SQuery* pQuery = pQInfo->runtimeEnv.pQuery; // in case of point-interpolation query, use asc order scan - char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 + char msg[] = "QInfo:%"PRIu64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64; // todo handle the case the the order irrelevant query type mixed up with order critical query type // descending order query for last_row query if (isFirstLastRowQuery(pQuery)) { - qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", pQInfo, pQuery->order.order, TSDB_ORDER_ASC); + qDebug("QInfo:%"PRIu64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId, pQuery->order.order, TSDB_ORDER_ASC); pQuery->order.order = TSDB_ORDER_ASC; if (pQuery->window.skey > pQuery->window.ekey) { @@ -2151,7 +2151,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo if (isPointInterpoQuery(pQuery) && pQuery->interval.interval == 0) { if (!QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, pQInfo, "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); + qDebug(msg, pQInfo->qId, "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); } @@ -2162,7 +2162,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo if (pQuery->interval.interval == 0) { if (onlyFirstQuery(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, pQInfo, "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, + qDebug(msg, pQInfo->qId, "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2172,7 +2172,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo pQuery->order.order = TSDB_ORDER_ASC; } else if (onlyLastQuery(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, pQInfo, "only-last", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey, + qDebug(msg, pQInfo->qId, "only-last", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2186,7 +2186,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo if (stableQuery) { if (onlyFirstQuery(pQuery)) { if (!QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, pQInfo, "only-first stable", pQuery->order.order, TSDB_ORDER_ASC, + qDebug(msg, pQInfo->qId, "only-first stable", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2196,7 +2196,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo pQuery->order.order = TSDB_ORDER_ASC; } else if (onlyLastQuery(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) { - qDebug(msg, pQInfo, "only-last stable", pQuery->order.order, TSDB_ORDER_DESC, + qDebug(msg, pQInfo->qId, "only-last stable", pQuery->order.order, TSDB_ORDER_DESC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); @@ -2604,7 +2604,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa SDataBlockInfo* pBlockInfo = &pBlock->info; if ((*status) == BLK_DATA_NO_NEEDED) { - qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey, + qDebug("QInfo:%"PRIu64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->discardBlocks += 1; } else if ((*status) == BLK_DATA_STATIS_NEEDED) { @@ -2632,7 +2632,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa (char*)&(pBlock->pBlockStatis[i].max)); if (!load) { // current block has been discard due to filter applied pCost->discardBlocks += 1; - qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, + qDebug("QInfo:%"PRIu64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); (*status) = BLK_DATA_DISCARD; return TSDB_CODE_SUCCESS; @@ -2644,7 +2644,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa // current block has been discard due to filter applied if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->discardBlocks += 1; - qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey, + qDebug("QInfo:%"PRIu64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); (*status) = BLK_DATA_DISCARD; return TSDB_CODE_SUCCESS; @@ -3311,10 +3311,10 @@ void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExpr int16_t tagType = pCtx[0].tag.nType; if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo, + qDebug("QInfo:%"PRIu64" set tag value for join comparison, colId:%" PRId64 ", val:%s", GET_QID(pRuntimeEnv), pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz); } else { - qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo, + qDebug("QInfo:%"PRIu64" set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, GET_QID(pRuntimeEnv), pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64); } } @@ -3334,9 +3334,9 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, // failed to find data with the specified tag value and vnodeId if (!tsBufIsValidElem(&elem)) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qError("QInfo:%p failed to find tag:%s in ts_comp", pRuntimeEnv->qinfo, pTag->pz); + qError("QInfo:%"PRIu64" failed to find tag:%s in ts_comp", GET_QID(pRuntimeEnv), pTag->pz); } else { - qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pRuntimeEnv->qinfo, pTag->i64); + qError("QInfo:%"PRIu64" failed to find tag:%" PRId64 " in ts_comp", GET_QID(pRuntimeEnv), pTag->i64); } return -1; @@ -3345,17 +3345,17 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, // Keep the cursor info of current table pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%"PRIu64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } else { - qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%"PRIu64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } } else { tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur); if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%"PRIu64" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } else { - qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + qDebug("QInfo:%"PRIu64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_QID(pRuntimeEnv), pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } } @@ -3459,7 +3459,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* int32_t start = 0; int32_t step = -1; - qDebug("QInfo:%p start to copy data from windowResInfo to output buf", pRuntimeEnv->qinfo); + qDebug("QInfo:%"PRIu64" start to copy data from windowResInfo to output buf", GET_QID(pRuntimeEnv)); if (orderType == TSDB_ORDER_ASC) { start = pGroupResInfo->index; step = 1; @@ -3499,7 +3499,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* } } - qDebug("QInfo:%p copy data to query buf completed", pRuntimeEnv->qinfo); + qDebug("QInfo:%"PRIu64" copy data to query buf completed", GET_QID(pRuntimeEnv)); pBlock->info.rows = numOfResult; return 0; } @@ -3585,11 +3585,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data data += sizeof(STableIdInfo); total++; - qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key); + qDebug("QInfo:%"PRIu64" set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo->qId, item->tid, item->uid, item->key); item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, item); } - qDebug("QInfo:%p set %d subscribe info", pQInfo, total); + qDebug("QInfo:%"PRIu64" set %d subscribe info", pQInfo->qId, total); // Check if query is completed or not for stable query or normal table query respectively. if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) { setQueryStatus(pRuntimeEnv, QUERY_OVER); @@ -3628,12 +3628,12 @@ void queryCostStatis(SQInfo *pQInfo) { pSummary->numOfTimeWindows = 0; } - qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " + qDebug("QInfo:%"PRIu64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, - pQInfo, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, + pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); - qDebug("QInfo:%p :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo, pSummary->winInfoSize/1024.0, + qDebug("QInfo:%"PRIu64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); } @@ -3669,7 +3669,7 @@ void queryCostStatis(SQInfo *pQInfo) { // // int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); // -// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, pRuntimeEnv->qinfo, +// qDebug("QInfo:%"PRIu64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QID(pRuntimeEnv), // pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); //} @@ -3699,7 +3699,7 @@ void queryCostStatis(SQInfo *pQInfo) { // pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; // pTableQueryInfo->lastKey += step; // -// qDebug("QInfo:%p skip rows:%d, offset:%" PRId64, pRuntimeEnv->qinfo, blockInfo.rows, +// qDebug("QInfo:%"PRIu64" skip rows:%d, offset:%" PRId64, GET_QID(pRuntimeEnv), blockInfo.rows, // pQuery->limit.offset); // } else { // find the appropriated start position in current block // updateOffsetVal(pRuntimeEnv, &blockInfo); @@ -3747,8 +3747,8 @@ void queryCostStatis(SQInfo *pQInfo) { // int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); // pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index // -// qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, -// pRuntimeEnv->qinfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, +// qDebug("QInfo:%"PRIu64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, +// GET_QID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, // pQuery->current->lastKey); // // return key; @@ -3903,7 +3903,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) terrno = TSDB_CODE_SUCCESS; if (isFirstLastRowQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo->qId, &pQuery->memRef); // update the query time window pQuery->window = cond.twindow; @@ -3924,9 +3924,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) } } } else if (isPointInterpoQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo->qId, &pQuery->memRef); } else { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo->qId, &pQuery->memRef); } return terrno; @@ -3969,7 +3969,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->stabledev = isStabledev(pQuery); pRuntimeEnv->prevResult = prevResult; - pRuntimeEnv->qinfo = pQInfo; setScanLimitationByResultBuffer(pQuery); @@ -4013,7 +4012,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts getIntermediateBufInfo(pRuntimeEnv, &ps, &pQuery->intermediateResultRowSize); int32_t TENMB = 1024*1024*10; - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4181,8 +4180,8 @@ static SSDataBlock* doTableScan(void* param) { pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey; } - qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey); + qDebug("QInfo:%"PRIu64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); } if (pTableScanInfo->reverseTimes > 0) { @@ -4191,8 +4190,8 @@ static SSDataBlock* doTableScan(void* param) { STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); - qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey); + qDebug("QInfo:%"PRIu64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); pRuntimeEnv->scanFlag = REVERSE_SCAN; @@ -5261,14 +5260,14 @@ static SSDataBlock* doTagScan(void* param) { count += 1; } - qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pRuntimeEnv->qinfo, count); + qDebug("QInfo:%"PRIu64" create (tableId, tag) info completed, rows:%d", GET_QID(pRuntimeEnv), count); } else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0); *(int64_t*)pColInfo->pData = pInfo->totalTables; count = 1; pOperator->status = OP_EXEC_DONE; - qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count); + qDebug("QInfo:%"PRIu64" create count(tbname) query, res:%d rows:1", GET_QID(pRuntimeEnv), count); } else { // return only the tags|table name etc. SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo @@ -5307,7 +5306,7 @@ static SSDataBlock* doTagScan(void* param) { pOperator->status = OP_EXEC_DONE; } - qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count); + qDebug("QInfo:%"PRIu64" create tag values results completed, rows:%d", GET_QID(pRuntimeEnv), count); } pRes->info.rows = count; @@ -5994,7 +5993,7 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex * return pGroupbyExpr; } -static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { +static int32_t createFilterInfo(SQuery *pQuery, uint64_t qId) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) { if (pQuery->colList[i].numOfFilters > 0) { pQuery->numOfFilterCols++; @@ -6030,13 +6029,13 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr; int32_t upper = pSingleColFilter->filterInfo.upperRelOptr; if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) { - qError("QInfo:%p invalid filter info", pQInfo); + qError("QInfo:%"PRIu64" invalid filter info", qId); return TSDB_CODE_QRY_INVALID_MSG; } pSingleColFilter->fp = getFilterOperator(lower, upper); if (pSingleColFilter->fp == NULL) { - qError("QInfo:%p invalid filter info", pQInfo); + qError("QInfo:%"PRIu64" invalid filter info", qId); return TSDB_CODE_QRY_INVALID_MSG; } @@ -6181,7 +6180,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr } doUpdateExprColumnIndex(pQuery); - int32_t ret = createFilterInfo(pQInfo, pQuery); + int32_t ret = createFilterInfo(pQuery, pQInfo->qId); if (ret != TSDB_CODE_SUCCESS) { goto _cleanup; } @@ -6256,7 +6255,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr } } - colIdCheck(pQuery, pQInfo); + colIdCheck(pQuery, pQInfo->qId); // todo refactor pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); @@ -6308,7 +6307,9 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p int32_t code = TSDB_CODE_SUCCESS; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + pRuntimeEnv->qinfo = pQInfo; + + SQuery *pQuery = pRuntimeEnv->pQuery; STSBuf *pTsBuf = NULL; if (pQueryMsg->tsLen > 0) { // open new file to save the result @@ -6330,7 +6331,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { - qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, + qDebug("QInfo:%"PRIu64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; @@ -6339,7 +6340,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p } if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { - qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); + qDebug("QInfo:%"PRIu64" no table qualified for tag filter, abort query", pQInfo->qId); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -6416,7 +6417,7 @@ void freeQInfo(SQInfo *pQInfo) { return; } - qDebug("QInfo:%p start to free QInfo", pQInfo); + qDebug("QInfo:%"PRIu64" start to free QInfo", pQInfo->qId); SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables); @@ -6465,7 +6466,7 @@ void freeQInfo(SQInfo *pQInfo) { taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); pQInfo->signature = 0; - qDebug("QInfo:%p QInfo is freed", pQInfo); + qDebug("QInfo:%"PRIu64" QInfo is freed", pQInfo->qId); tfree(pQInfo); } @@ -6485,7 +6486,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { off_t s = lseek(fileno(f), 0, SEEK_END); assert(s == pRuntimeEnv->outputBuf->info.rows); - qDebug("QInfo:%p ts comp data return, file:%p, size:%"PRId64, pQInfo, f, (uint64_t)s); + qDebug("QInfo:%"PRIu64" ts comp data return, file:%p, size:%"PRId64, pQInfo->qId, f, (uint64_t)s); if (fseek(f, 0, SEEK_SET) >= 0) { size_t sz = fread(data, 1, s, f); if(sz < s) { // todo handle error @@ -6516,11 +6517,11 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { } pRuntimeEnv->resultInfo.total += pRuntimeEnv->outputBuf->info.rows; - qDebug("QInfo:%p current numOfRes rows:%d, total:%" PRId64, pQInfo, + qDebug("QInfo:%"PRIu64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total); if (pQuery->limit.limit > 0 && pQuery->limit.limit == pRuntimeEnv->resultInfo.total) { - qDebug("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit); + qDebug("QInfo:%"PRIu64" results limitation reached, limitation:%"PRId64, pQInfo->qId, pQuery->limit.limit); setQueryStatus(pRuntimeEnv, QUERY_OVER); } diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index d36664617244cb0059b4e291da88260fac9db225..e3326cc26bc4216d131211473e807a9845d49133 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, resetSlotInfo(pBucket); - int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL); + int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1); if (ret != TSDB_CODE_SUCCESS) { tMemBucketDestroy(pBucket); return NULL; diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index ed7f8c67194269b20e21809ff0948662f4f25147..c5dd6b3cac66cfa58f74e4ff0897761bd1704ce7 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -9,7 +9,7 @@ #define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle) { +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId) { *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf)); SDiskbasedResultBuf* pResBuf = *pResultBuf; @@ -24,7 +24,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa pResBuf->allocateId = -1; pResBuf->comp = true; pResBuf->file = NULL; - pResBuf->handle = handle; + pResBuf->qId = qId; pResBuf->fileSize = 0; // at least more than 2 pages must be in memory @@ -43,7 +43,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); - qDebug("QInfo:%p create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", handle, pResBuf->pageSize, + qDebug("QInfo:%"PRIu64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize, pResBuf->inMemPages, pResBuf->path); return TSDB_CODE_SUCCESS; @@ -406,13 +406,13 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { } if (pResultBuf->file != NULL) { - qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb", - pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0, + qDebug("QInfo:%"PRIu64" res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb", + pResultBuf->qId, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0, pResultBuf->fileSize/1024.0); fclose(pResultBuf->file); } else { - qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, no file created", pResultBuf->handle, + qDebug("QInfo:%"PRIu64" res output buffer closed, total:%.2f Kb, no file created", pResultBuf->qId, pResultBuf->totalBufSize/1024.0); } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 1e7d3c9b58d3594280edc294e2147e7c908b9d0f..da7dbcd501373f34f67e25595af2ea88c8d07f4a 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -468,7 +468,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes pTableQueryInfoList = malloc(POINTER_BYTES * size); if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) { - qError("QInfo:%p failed alloc memory", pRuntimeEnv->qinfo); + qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv)); code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _end; } @@ -540,7 +540,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes int64_t endt = taosGetTimestampMs(); - qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pRuntimeEnv->qinfo, + qDebug("QInfo:%"PRIu64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup, endt - startt); _end: @@ -567,13 +567,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu break; } - qDebug("QInfo:%p no result in group %d, continue", pRuntimeEnv->qinfo, pGroupResInfo->currentGroup); + qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup); cleanupGroupResInfo(pGroupResInfo); incNextGroup(pGroupResInfo); } int64_t elapsedTime = taosGetTimestampUs() - st; - qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo, + qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); // pQInfo->summary.firstStageMergeTime += elapsedTime; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index bba6a10df333d792fa0467548199a411ccd02e43..9d2fb70d9620f5f219b4b7206af18fe1ed9466dc 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -197,29 +197,30 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi return code; } -bool qTableQuery(qinfo_t qinfo) { +bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { SQInfo *pQInfo = (SQInfo *)qinfo; assert(pQInfo && pQInfo->signature == pQInfo); int64_t threadId = taosGetSelfPthreadId(); int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) { - qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner); + qError("QInfo:%"PRIu64"-%p qhandle is now executed by thread:%p", pQInfo->qId, pQInfo, (void*) curOwner); pQInfo->code = TSDB_CODE_QRY_IN_EXEC; return false; } + *qId = pQInfo->qId; pQInfo->startExecTs = taosGetTimestampSec(); if (isQueryKilled(pQInfo)) { - qDebug("QInfo:%p it is already killed, abort", pQInfo); + qDebug("QInfo:%"PRIu64" it is already killed, abort", pQInfo->qId); return doBuildResCheck(pQInfo); } SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { - qDebug("QInfo:%p no table exists for query, abort", pQInfo); + qDebug("QInfo:%"PRIu64" no table exists for query, abort", pQInfo->qId); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); return doBuildResCheck(pQInfo); } @@ -228,22 +229,22 @@ bool qTableQuery(qinfo_t qinfo) { int32_t ret = setjmp(pQInfo->runtimeEnv.env); if (ret != TSDB_CODE_SUCCESS) { pQInfo->code = ret; - qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code)); + qDebug("QInfo:%"PRIu64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code)); return doBuildResCheck(pQInfo); } - qDebug("QInfo:%p query task is launched", pQInfo); + qDebug("QInfo:%"PRIu64" query task is launched", pQInfo->qId); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); if (isQueryKilled(pQInfo)) { - qDebug("QInfo:%p query is killed", pQInfo); + qDebug("QInfo:%"PRIu64" query is killed", pQInfo->qId); } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { - qDebug("QInfo:%p over, %u tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, + qDebug("QInfo:%"PRIu64" over, %u tables queried, %"PRId64" rows are returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->resultInfo.total); } else { - qDebug("QInfo:%p query paused, %d rows returned, numOfTotal:%" PRId64 " rows", - pQInfo, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv)); + qDebug("QInfo:%"PRIu64" query paused, %d rows returned, numOfTotal:%" PRId64 " rows", + pQInfo->qId, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv)); } return doBuildResCheck(pQInfo); @@ -253,13 +254,13 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex SQInfo *pQInfo = (SQInfo *)qinfo; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { - qError("QInfo:%p invalid qhandle", pQInfo); + qError("QInfo:%"PRIu64" invalid qhandle", pQInfo->qId); return TSDB_CODE_QRY_INVALID_QHANDLE; } *buildRes = false; if (IS_QUERY_KILLED(pQInfo)) { - qDebug("QInfo:%p query is killed, code:0x%08x", pQInfo, pQInfo->code); + qDebug("QInfo:%"PRIu64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code); return pQInfo->code; } @@ -279,11 +280,11 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex assert(pQInfo->rspContext == NULL); if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; - qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize, + qDebug("QInfo:%"PRIu64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQuery->resultRowSize, GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code)); } else { *buildRes = false; - qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); + qDebug("QInfo:%"PRIu64" retrieve req set query return result after paused", pQInfo->qId); pQInfo->rspContext = pRspContext; assert(pQInfo->rspContext != NULL); } @@ -342,9 +343,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co // here current thread hold the refcount, so it is safe to free tsdbQueryHandle. *continueExec = false; (*pRsp)->completed = 1; // notify no more result to client + qDebug("QInfo:%"PRIu64" no more results to retrieve", pQInfo->qId); } else { *continueExec = true; - qDebug("QInfo:%p has more results to retrieve", pQInfo); + qDebug("QInfo:%"PRIu64" has more results to retrieve", pQInfo->qId); } // the memory should be freed if the code of pQInfo is not TSDB_CODE_SUCCESS @@ -397,7 +399,7 @@ void qDestroyQueryInfo(qinfo_t qHandle) { return; } - qDebug("QInfo:%p query completed", pQInfo); + qDebug("QInfo:%"PRIu64" query completed", pQInfo->qId); queryCostStatis(pQInfo); // print the query cost summary freeQInfo(pQInfo); } @@ -480,7 +482,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) { SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { - qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo); + qError("QInfo:%"PRIu64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo); terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } @@ -488,7 +490,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) { pthread_mutex_lock(&pQueryMgmt->lock); if (pQueryMgmt->closed) { pthread_mutex_unlock(&pQueryMgmt->lock); - qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo); + qError("QInfo:%"PRIu64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo); terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } else { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 648b6d3617a53b004d93add220d007fa228f7076..6c0137abf568c23363a88b679ff47180aeb6491f 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -111,7 +111,7 @@ typedef struct STsdbQueryHandle { bool loadExternalRow; // load time window external data rows bool currentLoadExternalRows; // current load external rows int32_t loadType; // block load type - void* qinfo; // query info handle, for debug purpose + uint64_t qId; // query info handle, for debug purpose int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows SDFileSet* pFileGroup; SFSIter fileIter; @@ -286,8 +286,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa } taosArrayPush(pTableCheckInfo, &info); - tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid, - info.tableId.tid, info.lastKey, pQueryHandle->qinfo); + tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %"PRIu64, pQueryHandle, info.tableId.uid, + info.tableId.tid, info.lastKey, pQueryHandle->qId); } } @@ -339,7 +339,7 @@ static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY s return pNew; } -static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, void* qinfo, SMemRef* pMemRef) { +static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) { STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); if (pQueryHandle == NULL) { goto out_of_memory; @@ -353,7 +353,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->checkFiles = true; pQueryHandle->activeIndex = 0; // current active table index - pQueryHandle->qinfo = qinfo; + pQueryHandle->qId = qId; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->allocSize = 0; pQueryHandle->locateStart = false; @@ -406,7 +406,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock); if (pQueryHandle->pDataCols == NULL) { - tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); + tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto out_of_memory; } @@ -422,8 +422,8 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC return NULL; } -TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo, SMemRef* pRef) { - STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo, pRef); +TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) { + STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef); STsdbMeta* pMeta = tsdbGetMeta(tsdb); assert(pMeta != NULL); @@ -440,7 +440,7 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable tsdbMayTakeMemSnapshot(pQueryHandle, psTable); - tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); + tsdbDebug("%p total numOfTable:%" PRIzu " in query, %"PRIu64, pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qId); return (TsdbQueryHandleT) pQueryHandle; } @@ -512,7 +512,7 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); } -TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) { +TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { pCond->twindow = updateLastrowForEachGroup(groupList); // no qualified table @@ -520,7 +520,7 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable return NULL; } - STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pMemRef); + STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); int32_t code = checkForCachedLastRow(pQueryHandle, groupList); if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 terrno = code; @@ -581,10 +581,10 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr return pNew; } -TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pRef) { +TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pRef) { STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); - STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, pNew, qinfo, pRef); + STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, pRef); pQueryHandle->loadExternalRow = true; pQueryHandle->currentLoadExternalRows = true; @@ -651,9 +651,9 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p", + "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %"PRIu64, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pMem->keyFirst, pMem->keyLast, - pCheckInfo->lastKey, pMem->numOfRows, pHandle->qinfo); + pCheckInfo->lastKey, pMem->numOfRows, pHandle->qId); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -662,8 +662,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh } } else { - tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, - pHandle->qinfo); + tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %"PRIu64, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, + pHandle->qId); } if (!imemEmpty) { @@ -673,9 +673,9 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p", + "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %"PRIu64, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pIMem->keyFirst, pIMem->keyLast, - pCheckInfo->lastKey, pIMem->numOfRows, pHandle->qinfo); + pCheckInfo->lastKey, pIMem->numOfRows, pHandle->qId); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -683,8 +683,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh assert(pCheckInfo->lastKey >= key); } } else { - tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, - pHandle->qinfo); + tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %"PRIu64, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, + pHandle->qId); } return true; @@ -811,8 +811,8 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { } pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer - tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle, - pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo); + tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %"PRIu64, pHandle, + pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qId); // all data in mem are checked already. if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) || @@ -984,21 +984,21 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); int32_t code = tdInitDataCols(pQueryHandle->pDataCols, pSchema); if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); + tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } code = tdInitDataCols(pQueryHandle->rhelper.pDCols[0], pSchema); if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo); + tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %"PRIu64, pQueryHandle, pQueryHandle->qId); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } code = tdInitDataCols(pQueryHandle->rhelper.pDCols[1], pSchema); if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo); + tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %"PRIu64, pQueryHandle, pQueryHandle->qId); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } @@ -1034,15 +1034,15 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc int64_t elapsedTime = (taosGetTimestampUs() - st); pQueryHandle->cost.blockLoadTime += elapsedTime; - tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %p", - pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo); + tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %"PRIu64, + pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qId); return TSDB_CODE_SUCCESS; _error: pBlock->numOfRows = 0; - tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %p", - pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pQueryHandle->qinfo); + tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %"PRIu64, + pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pQueryHandle->qId); return terrno; } @@ -1064,7 +1064,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p assert(cur->pos >= 0 && cur->pos <= binfo.rows); TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; - tsdbDebug("%p key in mem:%"PRId64", %p", pQueryHandle, key, pQueryHandle->qinfo); + tsdbDebug("%p key in mem:%"PRId64", %"PRIu64, pQueryHandle, key, pQueryHandle->qId); if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { @@ -1545,9 +1545,9 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pQueryHandle); - tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", + tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %"PRIu64, pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey, - cur->win.ekey, cur->rows, pQueryHandle->qinfo); + cur->win.ekey, cur->rows, pQueryHandle->qId); } int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) { @@ -1599,9 +1599,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo); tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d," - "end:%d, %p", + "end:%d, %"PRIu64, pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, blockInfo.window.skey, blockInfo.window.ekey, - blockInfo.rows, cur->pos, endPos, pQueryHandle->qinfo); + blockInfo.rows, cur->pos, endPos, pQueryHandle->qId); // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; @@ -1741,9 +1741,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pQueryHandle); - tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", + tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %"PRIu64, pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->mixBlock, cur->win.skey, - cur->win.ekey, cur->rows, pQueryHandle->qinfo); + cur->win.ekey, cur->rows, pQueryHandle->qId); } int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { @@ -1917,13 +1917,13 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks); cleanBlockOrderSupporter(&sup, numOfQualTables); - tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt, - pQueryHandle->qinfo); + tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %"PRIu64, pQueryHandle, cnt, + pQueryHandle->qId); return TSDB_CODE_SUCCESS; } - tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt, - numOfQualTables, pQueryHandle->qinfo); + tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %"PRIu64, pQueryHandle, cnt, + numOfQualTables, pQueryHandle->qId); assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 sup.numOfTables = numOfQualTables; @@ -1959,7 +1959,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO * } */ - tsdbDebug("%p %d data blocks sort completed, %p", pQueryHandle, cnt, pQueryHandle->qinfo); + tsdbDebug("%p %d data blocks sort completed, %"PRIu64, pQueryHandle, cnt, pQueryHandle->qId); cleanBlockOrderSupporter(&sup, numOfTables); free(pTree); @@ -2017,8 +2017,8 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); - tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle, - pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo); + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %"PRIu64, pQueryHandle, + pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId); pQueryHandle->pFileGroup = NULL; assert(pQueryHandle->numOfBlocks == 0); break; @@ -2041,8 +2041,8 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist break; } - tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables, - pQueryHandle->pFileGroup->fid, pQueryHandle->qinfo); + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %"PRIu64, pQueryHandle, numOfBlocks, numOfTables, + pQueryHandle->pFileGroup->fid, pQueryHandle->qId); assert(numOfBlocks >= 0); if (numOfBlocks == 0) { @@ -2133,8 +2133,8 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); - tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle, - pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo); + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %"PRIu64, pQueryHandle, + pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId); pQueryHandle->pFileGroup = NULL; break; } @@ -2157,8 +2157,8 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist break; } - tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables, - pQueryHandle->pFileGroup->fid, pQueryHandle->qinfo); + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %"PRIu64, pQueryHandle, numOfBlocks, numOfTables, + pQueryHandle->pFileGroup->fid, pQueryHandle->qId); if (numOfBlocks == 0) { continue; @@ -2205,8 +2205,8 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists if ((!cur->mixBlock) || cur->blockCompleted) { // all data blocks in current file has been checked already, try next file if exists } else { - tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, - pQueryHandle->qinfo); + tsdbDebug("%p continue in current data block, index:%d, pos:%d, %"PRIu64, pQueryHandle, cur->slot, cur->pos, + pQueryHandle->qId); int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); *exists = (pQueryHandle->realNumOfRows > 0); @@ -2334,8 +2334,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } int64_t elapsedTime = taosGetTimestampUs() - st; - tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %p", pQueryHandle, - elapsedTime, numOfRows, numOfCols, pQueryHandle->qinfo); + tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %"PRIu64, pQueryHandle, + elapsedTime, numOfRows, numOfCols, pQueryHandle->qId); return numOfRows; } @@ -2593,7 +2593,7 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo)); } - pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pMemRef); + pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qId, pMemRef); tfree(cond.colList); // current table, only one table @@ -3393,8 +3393,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); SIOCostSummary* pCost = &pQueryHandle->cost; - tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %p", - pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo); + tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %"PRIu64, + pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qId); tfree(pQueryHandle); } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index ef68499b889d33544e601eea9ddd8cd5efe3f534..8233c45632b1b08c6dfe24a1a692f21379dc5036 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -25,7 +25,7 @@ static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SV static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); -static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); +static int32_t vnodeNotifyCurrentQhandle(void* handle, uint64_t qId, void* qhandle, int32_t vgId); int32_t vnodeInitRead(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; @@ -167,7 +167,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void * @param ahandle sqlObj address at client side * @return */ -static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle, void *ahandle) { +static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, uint64_t qId, void **handle, bool *freeHandle, void *ahandle) { bool continueExec = false; int32_t code = TSDB_CODE_SUCCESS; @@ -183,7 +183,7 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, } } else { *freeHandle = true; - vTrace("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); + vTrace("QInfo:%"PRIu64"-%p exec completed, free handle:%d", qId, *handle, *freeHandle); } } else { SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); @@ -220,27 +220,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { vError("error rpc msg in query, %s", tstrerror(pRead->code)); } -// assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL); -// if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { -// SCancelQueryMsg *pMsg = (SCancelQueryMsg *)pRead->pCont; -//// pMsg->free = htons(killQueryMsg->free); -// pMsg->qhandle = htobe64(pMsg->qhandle); -// -// vWarn("QInfo:%p connection %p broken, kill query", (void *)pMsg->qhandle, pRead->rpcHandle); -//// assert(pRead->contLen > 0 && pMsg->free == 1); -// -// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pMsg->qhandle); -// if (qhandle == NULL || *qhandle == NULL) { -// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pMsg->qhandle, pRead->rpcHandle); -// } else { -// assert(*qhandle == (void *)pMsg->qhandle); -// -// qKillQuery(*qhandle); -// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); -// } -// -// return TSDB_CODE_TSC_QUERY_CANCELLED; -// } int32_t code = TSDB_CODE_SUCCESS; void ** handle = NULL; @@ -274,7 +253,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } if (handle != NULL && - vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vnodeNotifyCurrentQhandle(pRead->rpcHandle, qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%"PRIu64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle, pRead->rpcHandle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; @@ -297,16 +276,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } else { assert(pCont != NULL); void **qhandle = (void **)pRead->qhandle; + uint64_t qId = 0; vTrace("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); // In the retrieve blocking model, only 50% CPU will be used in query processing if (tsRetrieveBlockingModel) { - qTableQuery(*qhandle); // do execute query + qTableQuery(*qhandle, &qId); // do execute query qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false); } else { bool freehandle = false; - bool buildRes = qTableQuery(*qhandle); // do execute query + bool buildRes = qTableQuery(*qhandle, &qId); // do execute query // build query rsp, the retrieve request has reached here already if (buildRes) { @@ -318,7 +298,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { pRead->rpcHandle); // set the real rsp error code - pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pRead->rpcHandle); + pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qId, qhandle, &freehandle, pRead->rpcHandle); // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client code = TSDB_CODE_QRY_HAS_RSP; @@ -348,32 +328,32 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { SRetrieveTableMsg *pRetrieve = pCont; pRetrieve->free = htons(pRetrieve->free); - pRetrieve->qhandle = htobe64(pRetrieve->qhandle); + pRetrieve->qId = htobe64(pRetrieve->qId); - vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle, + vTrace("vgId:%d, qId:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qId, pRetrieve->free, pRead->rpcHandle); memset(pRet, 0, sizeof(SRspRet)); terrno = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS; - void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); + void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qId); if (handle == NULL) { code = terrno; terrno = TSDB_CODE_SUCCESS; - } else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) { + } else if (!checkQIdEqual(*handle, pRetrieve->qId)) { code = TSDB_CODE_QRY_INVALID_QHANDLE; } if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle); + vError("vgId:%d, invalid qId in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qId); vnodeBuildNoResultQueryRsp(pRet); return code; } // kill current query and free corresponding resources. if (pRetrieve->free == 1) { - vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle); + vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qId, *handle); qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -383,7 +363,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } // register the qhandle to connect to quit query immediate if connection is broken - if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qKillQuery(*handle); @@ -413,7 +393,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } // ahandle is the sqlObj pointer - code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pRead->rpcHandle); + code = vnodeDumpQueryResult(pRet, pVnode, pRetrieve->qId, handle, &freeHandle, pRead->rpcHandle); } // If qhandle is not added into vread queue, the query should be completed already or paused with error. @@ -427,13 +407,13 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // notify connection(handle) that current qhandle is created, if current connection from // client is broken, the query needs to be killed immediately. -int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { +int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) { SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); pMsg->qhandle = htobe64((uint64_t)qhandle); pMsg->header.vgId = htonl(vgId); pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); - vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle); + vTrace("QInfo:%"PRIu64"-%p register qhandle to connect:%p", qId, qhandle, handle); return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); }