diff --git a/src/inc/query.h b/src/inc/query.h index 88badc2d7b5a849114ee4437855d7cfe1c51c1d8..c648270b21cbf906b21e1f43343e4f9cf6864be3 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -87,8 +87,8 @@ int32_t qKillQuery(qinfo_t qinfo); void* qOpenQueryMgmt(int32_t vgId); void qSetQueryMgmtClosed(void* pExecutor); void qCleanupQueryMgmt(void* pExecutor); -void** qRegisterQInfo(void* pMgmt, void* qInfo); -void** qAcquireQInfo(void* pMgmt, void** key); +void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); +void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree); #ifdef __cplusplus diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 8c987b6ef18750c2f422bd53684cc6fcd89ef0c7..ac89d1dabb44705711bdc4e2468e4d0ec330aa54 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -473,7 +473,7 @@ typedef struct { typedef struct { int32_t code; - uint64_t qhandle; + uint64_t qhandle; // query handle } SQueryTableRsp; typedef struct { @@ -486,7 +486,7 @@ typedef struct SRetrieveTableRsp { int32_t numOfRows; int8_t completed; // all results are returned to client int16_t precision; - int64_t offset; // updated offset value for multi-vnode projection query + int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; char data[]; } SRetrieveTableRsp; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ae41ac3e9f8b5e4b849b3056f4227635eaa394cb..b550106c7047191c22287f6bbdbda875df75dc7d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6517,11 +6517,13 @@ void qCleanupQueryMgmt(void* pQMgmt) { qDebug("vgId:%d querymgmt cleanup completed", vgId); } -void** qRegisterQInfo(void* pMgmt, void* qInfo) { +void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { if (pMgmt == NULL) { return NULL; } + const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2; + SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { return NULL; @@ -6533,21 +6535,23 @@ void** qRegisterQInfo(void* pMgmt, void* qInfo) { return NULL; } else { - void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2); + uint64_t handleVal = (uint64_t) qInfo; + + void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(int64_t), &qInfo, POINTER_BYTES, DEFAULT_QHANDLE_LIFE_SPAN); pthread_mutex_unlock(&pQueryMgmt->lock); return handle; } } -void** qAcquireQInfo(void* pMgmt, void** key) { +void** qAcquireQInfo(void* pMgmt, uint64_t key) { SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) { return NULL; } - void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES); + void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(uint64_t)); if (handle == NULL || *handle == NULL) { return NULL; } else { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index fa65a56c81913c7445a06a8aed8dee2e3deef85f..f054ae390410dc0041825cc8c421b0664dba727d 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -61,7 +61,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { } static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { - void * pCont = pReadMsg->pCont; + void *pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; SRspRet *pRet = &pReadMsg->rspRet; @@ -74,19 +74,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - void* handle = NULL; - if ((void**) killQueryMsg->qhandle != NULL) { - handle = *(void**) killQueryMsg->qhandle; - } - - vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle); + vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); - void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle); + void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle); if (qhandle == NULL || *qhandle == NULL) { vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); } else { - assert(qhandle == (void**) killQueryMsg->qhandle); + assert(*qhandle == (void*) killQueryMsg->qhandle); qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); } @@ -110,7 +105,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { - handle = qRegisterQInfo(pVnode->qMgmt, pQInfo); + handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); if (handle == NULL) { // failed to register qhandle pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; @@ -118,11 +113,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { qKillQuery(pQInfo); } else { assert(*handle == pQInfo); - pRsp->qhandle = htobe64((uint64_t) (handle)); + pRsp->qhandle = htobe64((uint64_t) pQInfo); } pQInfo = NULL; - if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; @@ -136,18 +131,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { assert(pQInfo == NULL); } if (handle != NULL) { - dnodePutItemIntoReadQueue(pVnode, handle); + dnodePutItemIntoReadQueue(pVnode, *handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo); } else { assert(pCont != NULL); - handle = qAcquireQInfo(pVnode->qMgmt, (void**) pCont); + handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont); if (handle == NULL) { - vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", *(void**) pCont, pReadMsg->rpcMsg.handle); + vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); code = TSDB_CODE_QRY_INVALID_QHANDLE; } else { - vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, *(void**) pCont); + vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, (void*) pCont); code = TSDB_CODE_VND_ACTION_IN_PROGRESS; qTableQuery(*handle); // do execute query } @@ -169,10 +164,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet, 0, sizeof(SRspRet)); int32_t code = TSDB_CODE_SUCCESS; - void** handle = qAcquireQInfo(pVnode->qMgmt, (void**) pRetrieve->qhandle); - if (handle == NULL || handle != (void**) pRetrieve->qhandle) { + void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); + if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { code = TSDB_CODE_QRY_INVALID_QHANDLE; - vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, *(void**) pRetrieve->qhandle); + vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); @@ -180,8 +175,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); SRetrieveTableRsp* pRsp = pRet->rsp; pRsp->numOfRows = 0; - pRsp->completed = true; pRsp->useconds = 0; + pRsp->completed = true; return code; } @@ -211,8 +206,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } else { // if failed to dump result, free qhandle immediately if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { if (qHasMoreResultsToRetrieve(*handle)) { - dnodePutItemIntoReadQueue(pVnode, handle); - pRet->qhandle = handle; + dnodePutItemIntoReadQueue(pVnode, *handle); + pRet->qhandle = *handle; freeHandle = false; } }