提交 ef58297a 编写于 作者: H Haojun Liao

[td-225] refactor qhandle mgmt

上级 aaaf7c21
...@@ -94,10 +94,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -94,10 +94,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
qinfo_t pQInfo = NULL;
void** handle = NULL; void** handle = NULL;
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
...@@ -116,19 +116,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -116,19 +116,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
qKillQuery(pQInfo); qKillQuery(pQInfo);
qKillQuery(pQInfo); qKillQuery(pQInfo);
pQInfo = NULL;
} else { } else {
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle)); pRsp->qhandle = htobe64((uint64_t) (handle));
} }
pQInfo = NULL;
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle); vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// NOTE: there two refcount, needs to kill twice // NOTE: there two refcount, needs to kill twice
// query has not been put into qhandle pool, kill it directly. // query has not been put into qhandle pool, kill it directly.
qKillQuery(pQInfo); qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return pRsp->code; return pRsp->code;
} }
...@@ -139,16 +139,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -139,16 +139,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
pQInfo = *(void**)(pCont); handle = qAcquireQInfo(pVnode->qMgmt, (void**) pCont);
handle = pCont; if (handle == NULL) {
code = TSDB_CODE_VND_ACTION_IN_PROGRESS; vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", *(void**) pCont, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo); } else {
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, *(void**) pCont);
code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
}
} }
if (pQInfo != NULL) { if (handle != NULL) {
qTableQuery(pQInfo); // do execute query qTableQuery(*handle); // do execute query
assert(handle != NULL);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
...@@ -160,57 +162,64 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -160,57 +162,64 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
SRetrieveTableMsg *pRetrieve = pCont; SRetrieveTableMsg *pRetrieve = pCont;
void **pQInfo = (void*) htobe64(pRetrieve->qhandle); pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free); pRetrieve->free = htons(pRetrieve->free);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *pQInfo); vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *(void**) pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t ret = 0;
void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo); int32_t code = TSDB_CODE_SUCCESS;
if (handle == NULL || handle != pQInfo) { void** handle = qAcquireQInfo(pVnode->qMgmt, (void**) pRetrieve->qhandle);
ret = TSDB_CODE_QRY_INVALID_QHANDLE; if (handle == NULL || handle != (void**) pRetrieve->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, *(void**) pRetrieve->qhandle);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->completed = true;
pRsp->useconds = 0;
return code;
} }
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
if (ret == TSDB_CODE_SUCCESS) { vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->completed = true;
pRsp->useconds = 0;
} else { // todo handle error
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
return ret;
}
int32_t code = qRetrieveQueryResultInfo(*pQInfo);
if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) {
//TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->completed = true;
pRsp->useconds = 0;
} else { return code;
// todo check code and handle error in build result set }
code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
bool freeHandle = true;
if (qHasMoreResultsToRetrieve(*handle)) { code = qRetrieveQueryResultInfo(*handle);
dnodePutItemIntoReadQueue(pVnode, handle); if (code != TSDB_CODE_SUCCESS) {
pRet->qhandle = handle; //TODO handle malloc failure
code = TSDB_CODE_SUCCESS; pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
} else { // no further execution invoked, release the ref to vnode memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); } else { // if failed to dump result, free qhandle immediately
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
if (qHasMoreResultsToRetrieve(*handle)) {
dnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = handle;
freeHandle = false;
}
} }
} }
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册