提交 1a9fce81 编写于 作者: H Haojun Liao

[td-225] avoid the retrieved thread being blocked.

上级 c7c6b8c1
...@@ -208,9 +208,13 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -208,9 +208,13 @@ static void *dnodeProcessReadQueue(void *param) {
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else {
if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else { } else {
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
} }
}
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
} }
......
...@@ -28,7 +28,7 @@ typedef void* qinfo_t; ...@@ -28,7 +28,7 @@ typedef void* qinfo_t;
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, qinfo_t* qinfo); int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
/** /**
...@@ -38,7 +38,10 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs ...@@ -38,7 +38,10 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs
* @param qinfo * @param qinfo
* @return * @return
*/ */
void qTableQuery(qinfo_t qinfo); bool qTableQuery(qinfo_t qinfo);
void* pGetRspMsg(qinfo_t qinfo);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
...@@ -48,7 +51,7 @@ void qTableQuery(qinfo_t qinfo); ...@@ -48,7 +51,7 @@ void qTableQuery(qinfo_t qinfo);
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo); int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext);
/** /**
* *
...@@ -60,7 +63,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo); ...@@ -60,7 +63,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo);
* @param contLen payload length * @param contLen payload length
* @return * @return
*/ */
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen); int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/** /**
* Decide if more results will be produced or not, NOTE: this function will increase the ref count of QInfo, * Decide if more results will be produced or not, NOTE: this function will increase the ref count of QInfo,
......
...@@ -211,6 +211,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_APP_ERROR, 0, 0x0704, "query app ...@@ -211,6 +211,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_APP_ERROR, 0, 0x0704, "query app
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "query duplicated join key") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "query duplicated join key")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT, 0, 0x0706, "query tag conditon too many") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT, 0, 0x0706, "query tag conditon too many")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "query not ready") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "query not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "query should response")
// grant // grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "grant expired") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "grant expired")
......
...@@ -177,13 +177,18 @@ typedef struct SQueryRuntimeEnv { ...@@ -177,13 +177,18 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum {
QUERY_RESULT_NOT_READY = 1,
QUERY_RESULT_READY = 2,
};
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
sem_t dataReady; // sem_t dataReady;
void* tsdb; void* tsdb;
void* param;
int32_t vgId; int32_t vgId;
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list> STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
...@@ -202,6 +207,9 @@ typedef struct SQInfo { ...@@ -202,6 +207,9 @@ typedef struct SQInfo {
int32_t numOfGroupResultPages; int32_t numOfGroupResultPages;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads
int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context
} SQInfo; } SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -5894,16 +5894,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5894,16 +5894,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
} }
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
pthread_mutex_init(&pQInfo->lock, NULL);
pQuery->pos = -1; pQuery->pos = -1;
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
int32_t code = TAOS_SYSTEM_ERROR(errno);
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, tstrerror(code));
goto _cleanup;
}
colIdCheck(pQuery); colIdCheck(pQuery);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
...@@ -5943,7 +5938,7 @@ static bool isValidQInfo(void *param) { ...@@ -5943,7 +5938,7 @@ static bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo); return (sig == (uint64_t)pQInfo);
} }
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param) { static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
...@@ -5966,8 +5961,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ ...@@ -5966,8 +5961,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pQInfo->param = param;
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
...@@ -6012,7 +6005,6 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -6012,7 +6005,6 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->sdata[col]); tfree(pQuery->sdata[col]);
} }
sem_destroy(&(pQInfo->dataReady));
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
...@@ -6164,7 +6156,7 @@ typedef struct SQueryMgmt { ...@@ -6164,7 +6156,7 @@ typedef struct SQueryMgmt {
pthread_mutex_t lock; pthread_mutex_t lock;
} SQueryMgmt; } SQueryMgmt;
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, qinfo_t* pQInfo) { int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo) {
assert(pQueryMsg != NULL && tsdb != NULL); assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -6260,7 +6252,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo ...@@ -6260,7 +6252,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
goto _over; goto _over;
} }
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param); code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
_over: _over:
free(tagCond); free(tagCond);
...@@ -6300,26 +6292,32 @@ void qDestroyQueryInfo(qinfo_t qHandle) { ...@@ -6300,26 +6292,32 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
freeQInfo(pQInfo); freeQInfo(pQInfo);
} }
void qTableQuery(qinfo_t qinfo) { static void setQueryResultReady(SQInfo* pQInfo) {
pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY;
pthread_mutex_unlock(&pQInfo->lock);
}
bool qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) {
qDebug("QInfo:%p has been freed, no need to execute", pQInfo); qDebug("QInfo:%p has been freed, no need to execute", pQInfo);
return; return false;
} }
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
sem_post(&pQInfo->dataReady); setQueryResultReady(pQInfo);
return; return false;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED); setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
setQueryResultReady(pQInfo);
qDebug("QInfo:%p no table exists for query, abort", pQInfo); qDebug("QInfo:%p no table exists for query, abort", pQInfo);
sem_post(&pQInfo->dataReady); return false;
return;
} }
// error occurs, record the error code and return to client // error occurs, record the error code and return to client
...@@ -6327,8 +6325,9 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -6327,8 +6325,9 @@ void qTableQuery(qinfo_t qinfo) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret; pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
sem_post(&pQInfo->dataReady);
return; setQueryResultReady(pQInfo);
return false;
} }
qDebug("QInfo:%p query task is launched", pQInfo); qDebug("QInfo:%p query task is launched", pQInfo);
...@@ -6353,10 +6352,23 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -6353,10 +6352,23 @@ void qTableQuery(qinfo_t qinfo) {
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
} }
sem_post(&pQInfo->dataReady); taosMsleep(1000);
// pQInfo->dataReady = QUERY_RESULT_READY;
bool buildRes = false;
pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY;
if (pQInfo->rspContext != NULL) {
buildRes = true;
}
pthread_mutex_unlock(&pQInfo->lock);
return buildRes;
// sem_post(&pQInfo->dataReady);
} }
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) { if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
...@@ -6369,10 +6381,20 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { ...@@ -6369,10 +6381,20 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
return pQInfo->code; return pQInfo->code;
} }
sem_wait(&pQInfo->dataReady); *buildRes = false;
pthread_mutex_lock(&pQInfo->lock);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows,
pQInfo->code); pQInfo->code);
} else {
pQInfo->rspContext = pRspContext;
}
pthread_mutex_unlock(&pQInfo->lock);
// sem_wait(&pQInfo->dataReady);
return pQInfo->code; return pQInfo->code;
} }
...@@ -6385,6 +6407,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { ...@@ -6385,6 +6407,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
} }
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
bool ret = false; bool ret = false;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
ret = false; ret = false;
...@@ -6403,7 +6426,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { ...@@ -6403,7 +6426,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
return ret; return ret;
} }
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) { int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) { if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
...@@ -6413,8 +6436,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6413,8 +6436,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows); size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
size += sizeof(int32_t); size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo); size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo);
*contLen = size + sizeof(SRetrieveTableRsp); *contLen = size + sizeof(SRetrieveTableRsp);
// todo proper handle failed to allocate memory, // todo proper handle failed to allocate memory,
...@@ -6423,6 +6448,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6423,6 +6448,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
if (*pRsp == NULL) { if (*pRsp == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
(*pRsp)->numOfRows = htonl(pQuery->rec.rows); (*pRsp)->numOfRows = htonl(pQuery->rec.rows);
int32_t code = pQInfo->code; int32_t code = pQInfo->code;
...@@ -6430,8 +6456,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6430,8 +6456,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(*pRsp)->offset = htobe64(pQuery->limit.offset); (*pRsp)->offset = htobe64(pQuery->limit.offset);
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime); (*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
} else { } else {
(*pRsp)->offset = 0;
(*pRsp)->useconds = 0; (*pRsp)->useconds = 0;
(*pRsp)->offset = 0;
} }
(*pRsp)->precision = htons(pQuery->precision); (*pRsp)->precision = htons(pQuery->precision);
...@@ -6442,10 +6468,21 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6442,10 +6468,21 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
code = pQInfo->code; code = pQInfo->code;
} }
pQInfo->rspContext = NULL;
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
(*pRsp)->completed = 1; // notify no more result to client (*pRsp)->completed = 1; // notify no more result to client
} }
if (qHasMoreResultsToRetrieve(pQInfo)) {
*continueExec = true;
} else { // failed to dump result, free qhandle immediately
*continueExec = false;
qKillQuery(pQInfo);
qDestroyQueryInfo(pQInfo);
}
return code; return code;
} }
...@@ -6456,7 +6493,7 @@ int32_t qKillQuery(qinfo_t qinfo) { ...@@ -6456,7 +6493,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
sem_post(&pQInfo->dataReady); // sem_post(&pQInfo->dataReady);
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -66,11 +66,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -66,11 +66,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
} }
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle, void* handle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
pRead->rpcMsg.handle = handle;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
...@@ -110,7 +111,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -110,7 +111,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
...@@ -148,7 +149,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -148,7 +149,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) { if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
vnodePutItemIntoReadQueue(pVnode, *handle); vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle);
// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); // qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
...@@ -163,7 +164,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -163,7 +164,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} else { } else {
vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont); vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
code = TSDB_CODE_VND_ACTION_IN_PROGRESS; code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
qTableQuery(*handle); // do execute query bool buildRes = qTableQuery(*handle); // do execute query
if (buildRes) { // build result rsp
pRet = &pReadMsg->rspRet;
bool continueExec = false;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle);
pRet->qhandle = *handle;
}
} else { // todo handle error
}
code = TSDB_CODE_QRY_HAS_RSP;
}
} }
// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); // qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
...@@ -223,22 +240,29 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -223,22 +240,29 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} }
bool freeHandle = true; bool freeHandle = true;
code = qRetrieveQueryResultInfo(*handle); bool buildRes = false;
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO handle malloc failure //TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { // if failed to dump result, free qhandle immediately } else {
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { // result is not ready, return immediately
if (qHasMoreResultsToRetrieve(*handle)) { if (!buildRes) {
vnodePutItemIntoReadQueue(pVnode, *handle); return TSDB_CODE_QRY_NOT_READY;
}
bool continueExec = false;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
freeHandle = false; freeHandle = false;
} else {
qKillQuery(*handle);
qDestroyQueryInfo(*handle);
freeHandle = true;
} }
} else {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册