未验证 提交 508ce47d 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1551 from taosdata/feature/vpeer

[TD-116] split dnodeRead to dnodeVnode
......@@ -15,38 +15,25 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "twal.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "queryExecutor.h"
#include "vnode.h"
typedef struct {
int32_t code;
int32_t count;
int32_t numOfVnodes;
} SRpcContext;
typedef struct {
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
SRpcContext *pRpcContext; // RPC message context
SRspRet rspRet;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SReadMsg;
static void *dnodeProcessReadQueue(void *param);
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead);
static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode);
// module global variable
static taos_qset readQset;
......@@ -55,14 +42,11 @@ static int32_t maxThreads;
static int32_t minThreads;
int32_t dnodeInitRead() {
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg;
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg;
readQset = taosOpenQset();
minThreads = 3;
maxThreads = tsNumOfCores*tsNumOfThreadsPerCore;
if (maxThreads <= minThreads*2) maxThreads = 2*minThreads;
maxThreads = tsNumOfCores * tsNumOfThreadsPerCore;
if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads;
dPrint("dnode read is opened");
return 0;
......@@ -77,7 +61,6 @@ void dnodeRead(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL;
void *pVnode;
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
......@@ -105,7 +88,6 @@ void dnodeRead(SRpcMsg *pMsg) {
pRead->rpcMsg = *pMsg;
pRead->pCont = pCont;
pRead->contLen = pHead->contLen;
pRead->pRpcContext = pRpcContext;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
......@@ -156,6 +138,34 @@ void dnodeFreeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
}
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len,
.code = pRead->rspRet.code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
}
static void *dnodeProcessReadQueue(void *param) {
taos_qset qset = (taos_qset)param;
SReadMsg *pReadMsg;
......@@ -168,13 +178,8 @@ static void *dnodeProcessReadQueue(void *param) {
continue;
}
terrno = 0;
if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg);
}
......@@ -193,118 +198,3 @@ static void dnodeHandleIdleReadWorker() {
}
}
UNUSED_FUNC
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
SRpcContext *pRpcContext = pRead->pRpcContext;
int32_t code = 0;
if (pRpcContext) {
if (terrno) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
}
// over, result can be merged now
code = pRpcContext->code;
} else {
code = terrno;
}
//TODO: query handle is returned by dnodeProcessQueryMsg
if (0) {
SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
}
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->pRpcContext = pMsg->pRpcContext;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL;
if (pMsg->contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle);
} else {
pQInfo = pMsg->pCont;
}
qTableQuery(pQInfo); // do execute query
}
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
int32_t contLen = 0;
SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp);
pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
} else {
// todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
if (qHasMoreResultsToRetrieve(pQInfo)) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo);
// dnodeProcessReadResult(pVnode, pMsg);
vnodeRelease(pVnode);
}
}
SRpcMsg rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
}
......@@ -47,6 +47,7 @@ static STaosError errors[] = {
// rpc
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NEED_REPROCESSED, 0, 3, "action need to be reprocessed")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_ALREADY_PROCESSED, 0, 5, "message already processed")
TAOS_DEFINE_ERROR(TSDB_CODE_REDIRECT, 0, 6, "redirect")
......
......@@ -22,7 +22,9 @@ extern "C" {
typedef struct {
int len;
int code;
void *rsp;
void *qhandle; //used by query and retrieve msg
} SRspRet;
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
......@@ -42,6 +44,8 @@ void* vnodeGetTsdb(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item);
void vnodeBuildStatusMsg(void * param);
int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret);
#ifdef __cplusplus
}
#endif
......
......@@ -47,6 +47,7 @@ typedef struct {
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
void vnodeInitWriteFp(void);
void vnodeInitReadFp(void);
#ifdef __cplusplus
}
......
......@@ -41,6 +41,7 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() {
vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "dataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "queryExecutor.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg;
}
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
SVnodeObj *pVnode = (SVnodeObj *)param;
if (vnodeProcessReadMsgFp[msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING)
return TSDB_CODE_NOT_ACTIVE_VNODE;
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
SQInfo* pQInfo = NULL;
if (contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode);
pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
pRsp->code = pRet->code;
pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp;
dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo);
} else {
pQInfo = pCont;
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
qTableQuery(pQInfo); // do execute query
return code;
}
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo);
pRet->code = qRetrieveQueryResultInfo(pQInfo);
if (pRet->code != TSDB_CODE_SUCCESS) {
//TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else {
// todo check code and handle error in build result set
pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo;
code = TSDB_CODE_ACTION_NEED_REPROCESSED;
} else {
// no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo);
vnodeRelease(pVnode);
}
}
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo);
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册