提交 fe059415 编写于 作者: H Haojun Liao

[td-255]

上级 38417520
...@@ -6372,16 +6372,14 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -6372,16 +6372,14 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
return pQInfo->code; return pQInfo->code;
} }
*buildRes = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pthread_mutex_lock(&pQInfo->lock); pthread_mutex_lock(&pQInfo->lock);
if (pQInfo->dataReady == QUERY_RESULT_READY) { if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true; *buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows,
pQInfo->code); pQInfo->code);
} else { } else {
*buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext; pQInfo->rspContext = pRspContext;
} }
...@@ -6473,7 +6471,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6473,7 +6471,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
} else { // failed to dump result, free qhandle immediately } else { // failed to dump result, free qhandle immediately
*continueExec = false; *continueExec = false;
qKillQuery(pQInfo); qKillQuery(pQInfo);
qDestroyQueryInfo(pQInfo);
} }
return code; return code;
......
...@@ -419,7 +419,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -419,7 +419,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
// note: extend lifespan before dec ref count // note: extend lifespan before dec ref count
bool inTrashCan = pNode->inTrashCan; bool inTrashCan = pNode->inTrashCan;
if (pCacheObj->extendLifespan && (!inTrashCan)) { if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime); uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime);
} }
...@@ -643,6 +643,7 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t ...@@ -643,6 +643,7 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
while (taosHashIterNext(pIter)) { while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) { if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
continue; continue;
......
...@@ -66,12 +66,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -66,12 +66,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
} }
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle, void* handle) { static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
pRead->rpcMsg.handle = handle; pRead->rpcMsg.handle = NULL;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
...@@ -99,6 +99,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -99,6 +99,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else { } else {
assert(*qhandle == (void*) killQueryMsg->qhandle); assert(*qhandle == (void*) killQueryMsg->qhandle);
qKillQuery(*qhandle); qKillQuery(*qhandle);
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
} }
...@@ -123,8 +124,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -123,8 +124,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken // current connect is broken
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
// handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
handle = &pQInfo;
if (handle == NULL) { // failed to register qhandle if (handle == NULL) { // failed to register qhandle
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
tstrerror(pRsp->code)); tstrerror(pRsp->code));
...@@ -135,11 +135,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -135,11 +135,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp->qhandle = htobe64((uint64_t) pQInfo); pRsp->qhandle = htobe64((uint64_t) pQInfo);
} }
// pQInfo = NULL;
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return pRsp->code; return pRsp->code;
} }
} else { } else {
...@@ -149,15 +148,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -149,15 +148,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) { if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); vnodePutItemIntoReadQueue(pVnode, *handle);
// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
void* p = (void*) pCont;
handle = &p; handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
// handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
if (handle == NULL) { if (handle == NULL) {
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_QRY_INVALID_QHANDLE; code = TSDB_CODE_QRY_INVALID_QHANDLE;
...@@ -166,23 +164,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -166,23 +164,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
bool buildRes = qTableQuery(*handle); // do execute query bool buildRes = qTableQuery(*handle); // do execute query
if (buildRes) { // build result rsp if (buildRes) { // build result rsp
// update the connection info according to the retrieve connection
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
assert(pReadMsg->rpcMsg.handle != NULL);
void* retrieveHandle = qGetResultRetrieveMsg(*handle); vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
assert(retrieveHandle != NULL);
vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, retrieveHandle);
pReadMsg->rpcMsg.handle = retrieveHandle; // update the connection info according to the retrieve connection
pRet = &pReadMsg->rspRet; pRet = &pReadMsg->rspRet;
code = TSDB_CODE_QRY_HAS_RSP; // code = TSDB_CODE_QRY_HAS_RSP;
bool continueExec = false; bool continueExec = false;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) { if (continueExec) {
vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); vTrace("QInfo:%p add to queue for further exec", *handle);
vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
code = TSDB_CODE_SUCCESS; // code = TSDB_CODE_SUCCESS;
} else {
vDebug("QInfo:%p query completed", *handle);
} }
} else { // todo handle error } else { // todo handle error
} }
...@@ -190,7 +190,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -190,7 +190,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = TSDB_CODE_QRY_HAS_RSP; code = TSDB_CODE_QRY_HAS_RSP;
} }
} }
// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
return code; return code;
...@@ -209,11 +210,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -209,11 +210,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void** handle = NULL; void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
void* p1 = (void*) pRetrieve->qhandle;
handle = &p1;
// void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE; code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
...@@ -233,7 +230,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -233,7 +230,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle); qKillQuery(*handle);
// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp); pRet->len = sizeof(SRetrieveTableRsp);
...@@ -255,16 +252,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -255,16 +252,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
//TODO handle malloc failure //TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { } else { // result is not ready, return immediately
// result is not ready, return immediately
if (!buildRes) { if (!buildRes) {
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
return TSDB_CODE_QRY_NOT_READY; return TSDB_CODE_QRY_NOT_READY;
} }
bool continueExec = false; bool continueExec = false;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) { if (continueExec) {
vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
freeHandle = false; freeHandle = false;
} }
...@@ -274,8 +271,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -274,8 +271,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} }
} }
UNUSED(freeHandle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
// qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册