diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5d570821cb6608fb382398e146766eb1f8aff284..fbf699297ad3e440fe1bba04ce17cc4c14ad6be2 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -206,6 +206,7 @@ typedef struct SQInfo { void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; pthread_mutex_t lock; // used to synchronize the rsp/query threads + tsem_t ready; int32_t dataReady; // denote if query result is ready or not void* rspContext; // response context } SQInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 1070f65284305210a9ae81b568e0efe8717d84a2..b9a5295ed25116f23ff2b7e8bf13e68f42646744 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6699,6 +6699,8 @@ static bool doBuildResCheck(SQInfo* pQInfo) { pQInfo->owner = 0; pthread_mutex_unlock(&pQInfo->lock); + + tsem_post(&pQInfo->ready); return buildRes; } @@ -6767,13 +6769,16 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex } *buildRes = false; - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); return pQInfo->code; } int32_t code = TSDB_CODE_SUCCESS; + +#if 0 + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + pthread_mutex_lock(&pQInfo->lock); assert(pQInfo->rspContext == NULL); @@ -6790,6 +6795,12 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex code = pQInfo->code; pthread_mutex_unlock(&pQInfo->lock); +#else + tsem_wait(&pQInfo->ready); + *buildRes = true; + code = pQInfo->code; +#endif + return code; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index a610a8b0df2e43b11356cb4af9ebba5809ba9c62..ede5921dc1558338a555eafebab85093b3825819 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -14,7 +14,8 @@ */ #define _DEFAULT_SOURCE -//#include +#define _NON_BLOCKING_RETRIEVE 0 + #include "os.h" #include "tglobal.h" @@ -206,6 +207,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); + +#if _NON_BLOCKING_RETRIEVE bool freehandle = false; bool buildRes = qTableQuery(*qhandle); // do execute query @@ -235,6 +238,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (freehandle || (!buildRes)) { qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); } +#else + qTableQuery(*qhandle); // do execute query +#endif } return code; @@ -294,12 +300,15 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); freeHandle = true; } else { // result is not ready, return immediately + assert(buildRes == true); +#if _NON_BLOCKING_RETRIEVE if (!buildRes) { assert(pReadMsg->rpcMsg.handle != NULL); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); return TSDB_CODE_QRY_NOT_READY; } +#endif // ahandle is the sqlObj pointer code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);