提交 ea374b6f 编写于 作者: S Shengliang Guan

TD-1915

上级 5b3d0de3
......@@ -92,33 +92,23 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
taos_queue queue = vnodeAcquireRqueue(pHead->vgId);
if (queue == NULL) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
continue;
void *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) {
int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
vnodeRelease(pVnode);
}
// put message into queue
SVReadMsg *pRead = taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg = *pMsg;
pRead->pCont = pCont;
pRead->contLen = pHead->contLen;
// next vnode
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
queuedMsgNum++;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}
rpcFreeCont(pMsg->pCont);
}
void *dnodeAllocVReadQueue(void *pVnode) {
......@@ -162,50 +152,48 @@ void dnodeFreeVReadQueue(void *rqueue) {
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.handle = pRead->rpcHandle,
.pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len,
.code = code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
}
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
}
static void *dnodeProcessReadQueue(void *param) {
SVReadMsg *pReadMsg;
SVReadMsg *pRead;
int32_t qtype;
void * pVnode;
while (1) {
if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pReadMsg, &pVnode) == 0) {
if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pRead, &pVnode) == 0) {
dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset);
break;
}
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], qtype, pReadMsg);
dDebug("%p, msg:%p:%s will be processed in vread queue, qtype:%d", pRead->rpcAhandle, pRead,
taosMsg[pRead->msgType], qtype);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
int32_t code = vnodeProcessRead(pVnode, pRead);
if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcVReadRsp(pVnode, pReadMsg, code);
dnodeSendRpcVReadRsp(pVnode, pRead, code);
} else {
if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcVReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
dnodeSendRpcVReadRsp(pVnode, pRead, pRead->code);
} else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
assert(pRead->rpcHandle == NULL || (pRead->rpcHandle != NULL && pRead->msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pRead, code);
}
}
taosFreeQitem(pReadMsg);
taosFreeQitem(pRead);
}
return NULL;
......
......@@ -37,10 +37,15 @@ typedef struct {
} SRspRet;
typedef struct {
SRspRet rspRet;
void * pCont;
int32_t code;
int32_t contLen;
SRpcMsg rpcMsg;
void * rpcHandle;
void * rpcAhandle;
void * qhandle;
int8_t qtype;
int8_t msgType;
SRspRet rspRet;
char pCont[];
} SVReadMsg;
typedef struct {
......@@ -62,13 +67,11 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount
void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
......@@ -77,8 +80,8 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources();
void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pReadMsg);
int32_t vnodeCheckRead(void *pVnode);
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus
}
......
......@@ -468,21 +468,6 @@ void *vnodeAcquire(int32_t vgId) {
return *ppVnode;
}
void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->rqueue;
}
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
}
......
......@@ -46,7 +46,7 @@ void vnodeInitReadFp(void) {
//
int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType;
int32_t msgType = pReadMsg->msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
......@@ -56,7 +56,7 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}
int32_t vnodeCheckRead(void *param) {
static int32_t vnodeCheckRead(void *param) {
SVnodeObj *pVnode = param;
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
......@@ -78,24 +78,58 @@ int32_t vnodeCheckRead(void *param) {
return TSDB_CODE_SUCCESS;
}
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
SVReadMsg *pRead = (SVReadMsg *)taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.ahandle = ahandle;
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
atomic_add_fetch_32(&pVnode->refCount, 1);
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) {
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
}
int32_t size = sizeof(SVReadMsg) + contLen;
SVReadMsg *pRead = taosAllocateQitem(size);
if (pRead == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
if (rparam != NULL) {
SRpcMsg *pRpcMsg = rparam;
pRead->rpcHandle = pRpcMsg->handle;
pRead->rpcAhandle = pRpcMsg->ahandle;
pRead->msgType = pRpcMsg->msgType;
pRead->code = pRpcMsg->code;
}
if (contLen != 0) {
pRead->contLen = contLen;
memcpy(pRead->pCont, pCont, contLen);
} else {
pRead->qhandle = pCont;
}
pRead->qtype = qtype;
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode rqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->rqueue, qtype, pRead);
return TSDB_CODE_SUCCESS;
}
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
rpcMsg.ahandle = ahandle;
int32_t code = vnodeWriteToRQueue(pVnode, qhandle, 0, TAOS_QTYPE_QUERY, &rpcMsg);
if (code == TSDB_CODE_SUCCESS) {
vDebug("QInfo:%p add to vread queue for exec query", *qhandle);
}
return code;
}
/**
*
* @param pRet response message object
......@@ -155,18 +189,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet));
// qHandle needs to be freed correctly
if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pReadMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont;
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcHandle);
assert(pReadMsg->contLen > 0 && killQueryMsg->free == 1);
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
pReadMsg->rpcMsg.handle);
pReadMsg->rpcHandle);
} else {
assert(*qhandle == (void *)killQueryMsg->qhandle);
......@@ -208,9 +242,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
}
if (handle != NULL &&
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
pReadMsg->rpcMsg.handle);
pReadMsg->rpcHandle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
return pRsp->code;
......@@ -221,7 +255,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcHandle);
if (code != TSDB_CODE_SUCCESS) {
pRsp->code = code;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -230,7 +264,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
}
} else {
assert(pCont != NULL);
void **qhandle = (void **)pCont;
void **qhandle = (void **)pReadMsg->qhandle;
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
......@@ -242,14 +276,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
// build query rsp, the retrieve request has reached here already
if (buildRes) {
// update the connection info according to the retrieve connection
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle);
assert(pReadMsg->rpcMsg.handle != NULL);
pReadMsg->rpcHandle = qGetResultRetrieveMsg(*qhandle);
assert(pReadMsg->rpcHandle != NULL);
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
pReadMsg->rpcMsg.handle);
pReadMsg->rpcHandle);
// set the real rsp error code
pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle);
pReadMsg->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcHandle);
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code = TSDB_CODE_QRY_HAS_RSP;
......@@ -283,7 +317,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
pRetrieve->free, pReadMsg->rpcMsg.handle);
pRetrieve->free, pReadMsg->rpcHandle);
memset(pRet, 0, sizeof(SRspRet));
......@@ -314,9 +348,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
}
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle,
pReadMsg->rpcMsg.handle);
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcHandle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -326,7 +359,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
bool freeHandle = true;
bool buildRes = false;
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcHandle);
if (code != TSDB_CODE_SUCCESS) {
// TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
......@@ -337,7 +370,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
assert(buildRes == true);
#if _NON_BLOCKING_RETRIEVE
if (!buildRes) {
assert(pReadMsg->rpcMsg.handle != NULL);
assert(pReadMsg->rpcHandle != NULL);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
return TSDB_CODE_QRY_NOT_READY;
......@@ -345,7 +378,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
#endif
// ahandle is the sqlObj pointer
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcHandle);
}
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
......
......@@ -97,7 +97,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
return syncCode;
}
int32_t vnodeCheckWrite(void *param) {
static int32_t vnodeCheckWrite(void *param) {
SVnodeObj *pVnode = param;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册