提交 7502039d 编写于 作者: D dapan1121

fix bug

上级 6722b336
......@@ -28,7 +28,7 @@ typedef void* qinfo_t;
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t *qId);
/**
......@@ -88,9 +88,10 @@ void* qOpenQueryMgmt(int32_t vgId);
void qQueryMgmtNotifyClosed(void* pExecutor);
void qQueryMgmtReOpen(void *pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId);
#ifdef __cplusplus
}
......
......@@ -262,6 +262,7 @@ enum {
typedef struct SQInfo {
void* signature;
uint64_t qId;
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
void* tsdb;
......@@ -311,7 +312,7 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu
SColumnInfo* pTagCols);
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId);
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
......
......@@ -112,6 +112,9 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
uint64_t queryHandleId = 0;
int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2;
}
......@@ -6570,8 +6573,12 @@ static void calResultBufSize(SQuery* pQuery) {
}
}
FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql) {
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId) {
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
......@@ -6749,8 +6756,10 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
// todo refactor
pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
*qId = pQInfo->qId;
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
return pQInfo;
_cleanup_qinfo:
......
......@@ -68,7 +68,7 @@ void freeParam(SQueryParam *param) {
tfree(param->prevResult);
}
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo) {
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t *qId) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -158,7 +158,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql);
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId);
param.sql = NULL;
param.pExprs = NULL;
......@@ -479,7 +479,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) {
if (pMgmt == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
......@@ -499,8 +499,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
} else {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000));
pthread_mutex_unlock(&pQueryMgmt->lock);
......
......@@ -247,7 +247,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) {
qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
uint64_t qId = 0;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
......@@ -259,22 +260,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
handle = qRegisterQInfo(pVnode->qMgmt, qId, (uint64_t)pQInfo);
if (handle == NULL) { // failed to register qhandle
pRsp->code = terrno;
terrno = 0;
vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
vError("vgId:%d, QInfo:%"PRIu64 "-%p register qhandle failed, return to app, code:%s", pVnode->vgId, qId, (void *)pQInfo,
tstrerror(pRsp->code));
qDestroyQueryInfo(pQInfo); // destroy it directly
return pRsp->code;
} else {
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t)pQInfo);
pRsp->qhandle = htobe64(qId);
}
if (handle != NULL &&
vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
vError("vgId:%d, QInfo:%"PRIu64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle,
pRead->rpcHandle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -285,7 +286,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
if (handle != NULL) {
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
vTrace("vgId:%d, QInfo:%"PRIu64 "-%p, dnode query msg disposed, create qhandle and returns to app", vgId, qId, *handle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle);
if (code != TSDB_CODE_SUCCESS) {
pRsp->code = code;
......@@ -349,7 +350,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
pRetrieve->free = htons(pRetrieve->free);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle,
pRetrieve->free, pRead->rpcHandle);
memset(pRet, 0, sizeof(SRspRet));
......@@ -360,19 +361,19 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (handle == NULL) {
code = terrno;
terrno = TSDB_CODE_SUCCESS;
} else if ((*handle) != (void *)pRetrieve->qhandle) {
} else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
}
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle);
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -383,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pRead->rpcHandle);
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册