提交 5d6079e3 编写于 作者: H Haojun Liao

[td-255] refactor codes.

上级 228ea8ae
......@@ -6356,7 +6356,6 @@ bool qTableQuery(qinfo_t qinfo) {
pthread_mutex_unlock(&pQInfo->lock);
return buildRes;
// sem_post(&pQInfo->dataReady);
}
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) {
......
......@@ -77,6 +77,39 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
bool continueExec = false;
int32_t code = TSDB_CODE_SUCCESS;
if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
vDebug("QInfo:%p add to query task queue for exec", handle);
vnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = handle;
*freeHandle = false;
} else {
vDebug("QInfo:%p exec completed", handle);
*freeHandle = true;
}
} else {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
*freeHandle = true;
}
return code;
}
static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->completed = true;
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void *pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen;
......@@ -161,37 +194,27 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
} else {
vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
bool freehandle = false;
bool buildRes = qTableQuery(*handle); // do execute query
if (buildRes) { // build result rsp
// build query rsp
if (buildRes) {
// update the connection info according to the retrieve connection
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
assert(pReadMsg->rpcMsg.handle != NULL);
vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
pRet = &pReadMsg->rspRet;
// code = TSDB_CODE_QRY_HAS_RSP;
bool continueExec = false;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
vTrace("QInfo:%p add to queue for further exec", *handle);
vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle;
// code = TSDB_CODE_SUCCESS;
} else {
vDebug("QInfo:%p query completed", *handle);
}
} else { // todo handle error
// todo test the error code case
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_HAS_RSP;
}
code = TSDB_CODE_QRY_HAS_RSP;
}
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freehandle);
}
}
return code;
......@@ -214,16 +237,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->useconds = 0;
pRsp->completed = true;
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
......@@ -232,15 +247,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->completed = true;
pRsp->useconds = 0;
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
......@@ -258,17 +265,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
return TSDB_CODE_QRY_NOT_READY;
}
bool continueExec = false;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle;
freeHandle = false;
}
} else {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
}
code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册