提交 b3165d41 编写于 作者: H Haojun Liao

[td-225]

上级 1e8461e6
...@@ -206,6 +206,7 @@ typedef struct SQInfo { ...@@ -206,6 +206,7 @@ typedef struct SQInfo {
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
tsem_t ready;
int32_t dataReady; // denote if query result is ready or not int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context void* rspContext; // response context
} SQInfo; } SQInfo;
......
...@@ -6699,6 +6699,8 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -6699,6 +6699,8 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
pQInfo->owner = 0; pQInfo->owner = 0;
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
tsem_post(&pQInfo->ready);
return buildRes; return buildRes;
} }
...@@ -6767,13 +6769,16 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -6767,13 +6769,16 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
} }
*buildRes = false; *buildRes = false;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
return pQInfo->code; return pQInfo->code;
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
#if 0
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pthread_mutex_lock(&pQInfo->lock); pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL); assert(pQInfo->rspContext == NULL);
...@@ -6790,6 +6795,12 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -6790,6 +6795,12 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
code = pQInfo->code; code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
#else
tsem_wait(&pQInfo->ready);
*buildRes = true;
code = pQInfo->code;
#endif
return code; return code;
} }
......
...@@ -14,7 +14,8 @@ ...@@ -14,7 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
//#include <dnode.h> #define _NON_BLOCKING_RETRIEVE 0
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -206,6 +207,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -206,6 +207,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
#if _NON_BLOCKING_RETRIEVE
bool freehandle = false; bool freehandle = false;
bool buildRes = qTableQuery(*qhandle); // do execute query bool buildRes = qTableQuery(*qhandle); // do execute query
...@@ -235,6 +238,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -235,6 +238,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (freehandle || (!buildRes)) { if (freehandle || (!buildRes)) {
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
} }
#else
qTableQuery(*qhandle); // do execute query
#endif
} }
return code; return code;
...@@ -294,12 +300,15 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -294,12 +300,15 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true; freeHandle = true;
} else { // result is not ready, return immediately } else { // result is not ready, return immediately
assert(buildRes == true);
#if _NON_BLOCKING_RETRIEVE
if (!buildRes) { if (!buildRes) {
assert(pReadMsg->rpcMsg.handle != NULL); assert(pReadMsg->rpcMsg.handle != NULL);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
return TSDB_CODE_QRY_NOT_READY; return TSDB_CODE_QRY_NOT_READY;
} }
#endif
// ahandle is the sqlObj pointer // ahandle is the sqlObj pointer
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle); code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册