未验证 提交 71081b5b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2564 from taosdata/feature/query

[td-225] refactor code.
...@@ -396,7 +396,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -396,7 +396,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
} }
void *pVnode = vnodeAccquireVnode(pCreate->cfg.vgId); void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
int32_t code = vnodeAlter(pVnode, pCreate); int32_t code = vnodeAlter(pVnode, pCreate);
vnodeRelease(pVnode); vnodeRelease(pVnode);
......
...@@ -98,7 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -98,7 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pVnode = vnodeAccquireVnode(pHead->vgId); pVnode = vnodeAcquireVnode(pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
...@@ -175,13 +175,15 @@ void dnodeFreeVnodeRqueue(void *rqueue) { ...@@ -175,13 +175,15 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads // dynamically adjust the number of threads
} }
void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle) { void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
taos_queue queue = vnodeAccquireRqueue(pVnode); assert(pVnode != NULL);
taos_queue queue = vnodeAcquireRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
} }
......
...@@ -53,7 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); ...@@ -53,7 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue); void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode); void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue); void dnodeFreeVnodeRqueue(void *rqueue);
void dnodePutQhandleIntoReadQueue(void *pVnode, void *qhandle); void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
......
...@@ -49,10 +49,10 @@ int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); ...@@ -49,10 +49,10 @@ int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode); void vnodeRelease(void *pVnode);
void* vnodeAccquireVnode(int32_t vgId); // add refcount void* vnodeAcquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void* vnodeAccquireRqueue(void *); void* vnodeAcquireRqueue(void *);
void* vnodeGetRqueue(void *); void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
......
...@@ -299,7 +299,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -299,7 +299,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
int32_t vnodeStartStream(int32_t vnode) { int32_t vnodeStartStream(int32_t vnode) {
SVnodeObj* pVnode = vnodeAccquireVnode(vnode); SVnodeObj* pVnode = vnodeAcquireVnode(vnode);
if (pVnode != NULL) { if (pVnode != NULL) {
tsdbStartStream(pVnode->tsdb); tsdbStartStream(pVnode->tsdb);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -383,7 +383,7 @@ void *vnodeGetVnode(int32_t vgId) { ...@@ -383,7 +383,7 @@ void *vnodeGetVnode(int32_t vgId) {
return *ppVnode; return *ppVnode;
} }
void *vnodeAccquireVnode(int32_t vgId) { void *vnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId); SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode; if (pVnode == NULL) return pVnode;
...@@ -393,7 +393,7 @@ void *vnodeAccquireVnode(int32_t vgId) { ...@@ -393,7 +393,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
return pVnode; return pVnode;
} }
void *vnodeAccquireRqueue(void *param) { void *vnodeAcquireRqueue(void *param) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = param;
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
...@@ -407,7 +407,7 @@ void *vnodeGetRqueue(void *pVnode) { ...@@ -407,7 +407,7 @@ void *vnodeGetRqueue(void *pVnode) {
} }
void *vnodeGetWqueue(int32_t vgId) { void *vnodeGetWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAccquireVnode(vgId); SVnodeObj *pVnode = vnodeAcquireVnode(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
return pVnode->wqueue; return pVnode->wqueue;
} }
...@@ -473,7 +473,7 @@ void vnodeBuildStatusMsg(void *param) { ...@@ -473,7 +473,7 @@ void vnodeBuildStatusMsg(void *param) {
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId); pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAccquireVnode(pAccess[i].vgId); SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState; pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
......
...@@ -203,7 +203,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -203,7 +203,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(*handle)) { if (qHasMoreResultsToRetrieve(*handle)) {
dnodePutQhandleIntoReadQueue(pVnode, handle); dnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = handle; pRet->qhandle = handle;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册