提交 055b9b1b 编写于 作者: S slguan

add some code in dnode module

上级 a305bb42
......@@ -20,12 +20,11 @@
extern "C" {
#endif
int dnodeInitRead();
void dnodeCleanupRead();
void dnodeRead(SRpcMsg *);
void *dnodeAllocateReadWorker();
void dnodeFreeReadWorker(void *rqueue);
int32_t dnodeInitRead();
void dnodeCleanupRead();
void dnodeRead(void *pMsg);
void * dnodeAllocateReadWorker();
void dnodeFreeReadWorker(void *rqueue);
#ifdef __cplusplus
}
......
......@@ -20,12 +20,11 @@
extern "C" {
#endif
int dnodeInitWrite();
void dnodeCleanupWrite();
void dnodeWrite(SRpcMsg *pMsg);
void *dnodeAllocateWriteWorker();
void dnodeFreeWriteWorker(void *worker);
int32_t dnodeInitWrite();
void dnodeCleanupWrite();
void dnodeWrite(void *pMsg);
void * dnodeAllocateWriteWorker();
void dnodeFreeWriteWorker(void *worker);
#ifdef __cplusplus
}
......
......@@ -56,7 +56,7 @@ static void * tsDnodeVnodesHash = NULL;
int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
......@@ -176,26 +176,24 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) {
// remove read queue
dnodeFreeReadWorker(pVnode->rworker);
pVnode->rworker = NULL;
// remove write queue
dnodeFreeWriteWorker(pVnode->wworker);
pVnode->wworker = NULL;
// remove wal
// remove tsdb
if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb);
pVnode->tsdb = NULL;
}
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
}
static int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnodeCfg) {
pVnodeCfg->vnode = htonl(pVnodeCfg->vnode);
pVnodeCfg->cfg.vgId = htonl(pVnodeCfg->cfg.vgId);
pVnodeCfg->cfg.maxSessions = htonl(pVnodeCfg->cfg.maxSessions);
pVnodeCfg->cfg.daysPerFile = htonl(pVnodeCfg->cfg.daysPerFile);
STsdbCfg tsdbCfg;
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->vnode;
......@@ -248,47 +246,60 @@ static void dnodeDropVnode(SVnodeObj *pVnode) {
dnodeCleanupVnode(pVnode);
}
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) {
// SVnodeObj *pVnode;
// int32_t vgId;
// SVPeersMsg *pCfg;
// check everything, if not ok, set terrno;
static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
// everything is ok
SCreateVnodeMsg *pCreate = (SCreateVnodeMsg *) rpcMsg->pCont;
pCreate->vnode = htonl(pCreate->vnode);
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
// dnodeCreateVnode(vgId, pCfg);
SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) {
rpcRsp.code = TSDB_CODE_SUCCESS;
} else {
rpcRsp.code = dnodeCreateVnode(pCreate);
}
//if (pVnode == NULL) terrno = TSDB_CODE
rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont);
}
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) {
SVnodeObj *pVnode;
int32_t vgId;
// check everything, if not ok, set terrno;
static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDropVnodeMsg *pDrop = (SCreateVnodeMsg *) rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
// everything is ok
dnodeDropVnode(pVnode);
SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId);
if (pVnodeObj != NULL) {
dnodeDropVnode(pVnodeObj);
rpcRsp.code = TSDB_CODE_SUCCESS;
} else {
rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID;
}
//if (pVnode == NULL) terrno = TSDB_CODE
rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont);
}
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) {
SVnodeObj *pVnode;
int32_t vgId;
// check everything, if not ok, set terrno;
static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCreateVnodeMsg *pCreate = (SCreateVnodeMsg *) rpcMsg->pCont;
pCreate->vnode = htonl(pCreate->vnode);
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
// everything is ok
// dnodeAlterVnode(pVnode);
SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) {
rpcRsp.code = TSDB_CODE_SUCCESS;
} else {
rpcRsp.code = dnodeCreateVnode(pCreate);;
}
//if (pVnode == NULL) terrno = TSDB_CODE
rpcSendResponse(&rpcRsp);
rpcFreeCont(rpcMsg->pCont);
}
......@@ -16,10 +16,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "trpc.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "dnodeRead.h"
#include "dnodeMgmt.h"
......@@ -31,7 +31,7 @@ typedef struct {
typedef struct {
void *pCont;
int contLen;
int32_t contLen;
SRpcMsg rpcMsg;
void *pVnode;
SRpcContext *pRpcContext; // RPC message context
......@@ -42,16 +42,16 @@ static void dnodeProcessReadResult(SReadMsg *pRead);
static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg);
static void (*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode);
// module global variable
static taos_qset readQset;
static int threads; // number of query threads
static int maxThreads;
static int minThreads;
static int32_t threads; // number of query threads
static int32_t maxThreads;
static int32_t minThreads;
int dnodeInitRead() {
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg;
int32_t dnodeInitRead() {
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg;
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg;
readQset = taosOpenQset();
......@@ -67,12 +67,14 @@ void dnodeCleanupRead() {
taosCloseQset(readQset);
}
void dnodeRead(SRpcMsg *pMsg) {
int leftLen = pMsg->contLen;
char *pCont = (char *)pMsg->pCont;
int contLen = 0;
int numOfVnodes = 0;
int32_t vgId = 0;
void dnodeRead(void *rpcMsg) {
SRpcMsg *pMsg = rpcMsg;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
int32_t numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL;
// parse head, get number of vnodes;
......@@ -87,31 +89,31 @@ void dnodeRead(SRpcMsg *pMsg) {
// get pVnode from vgId
void *pVnode = dnodeGetVnode(vgId);
if (pVnode == NULL) {
continue;
}
// put message into queue
SReadMsg readMsg;
readMsg.rpcMsg = *pMsg;
readMsg.pCont = pCont;
readMsg.contLen = contLen;
readMsg.pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
readMsg.rpcMsg = *pMsg;
readMsg.pCont = pCont;
readMsg.contLen = contLen;
readMsg.pRpcContext = pRpcContext;
readMsg.pVnode = pVnode;
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, &readMsg);
// next vnode
// next vnode
leftLen -= contLen;
pCont -= contLen;
pCont -= contLen;
dnodeReleaseVnode(pVnode);
}
}
void *dnodeAllocateReadWorker() {
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
if ( queue == NULL ) return NULL;
if (queue == NULL) return NULL;
taosAddIntoQset(readQset, queue);
......@@ -131,7 +133,6 @@ void *dnodeAllocateReadWorker() {
}
void dnodeFreeReadWorker(void *rqueue) {
taosCloseQueue(rqueue);
// dynamically adjust the number of threads
......@@ -144,16 +145,16 @@ static void *dnodeProcessReadQueue(void *param) {
while (1) {
if (taosReadQitemFromQset(qset, &readMsg) <= 0) {
dnodeHandleIdleReadWorker();
continue;
continue;
}
terrno = 0;
if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
dnodeProcessReadResult(&readMsg);
}
......@@ -161,7 +162,7 @@ static void *dnodeProcessReadQueue(void *param) {
}
static void dnodeHandleIdleReadWorker() {
int num = taosGetQueueNumber(readQset);
int32_t num = taosGetQueueNumber(readQset);
if (num == 0 || (num <= minThreads && threads > minThreads)) {
threads--;
......@@ -180,10 +181,10 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
if (pRpcContext) {
if (terrno) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int count = atomic_add_fetch_32(&pRpcContext->count, 1);
int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
......@@ -197,8 +198,8 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
}
......
......@@ -15,11 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tlog.h"
#include "trpc.h"
#include "tqueue.h"
#include "taosmsg.h"
#include "trpc.h"
#include "dnodeWrite.h"
#include "dnodeMgmt.h"
......@@ -31,7 +31,7 @@ typedef struct {
typedef struct _write {
void *pCont;
int contLen;
int32_t contLen;
SRpcMsg rpcMsg;
void *pVnode; // pointer to vnode
SRpcContext *pRpcContext; // RPC message context
......@@ -40,12 +40,12 @@ typedef struct _write {
typedef struct {
taos_qset qset; // queue set
pthread_t thread; // thread
int workerId; // worker ID
int32_t workerId; // worker ID
} SWriteWorker;
typedef struct _thread_obj {
int max; // max number of workers
int nextId; // from 0 to max-1, cyclic
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker;
} SWriteWorkerPool;
......@@ -59,8 +59,8 @@ static void dnodeProcessDropTableMsg(SWriteMsg *pMsg);
SWriteWorkerPool wWorkerPool;
int dnodeInitWrite() {
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
int32_t dnodeInitWrite() {
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableMsg;
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessDropTableMsg;
......@@ -68,7 +68,7 @@ int dnodeInitWrite() {
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1;
for (int i=0; i<wWorkerPool.max; ++i) {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
wWorkerPool.writeWorker[i].workerId = i;
}
......@@ -76,17 +76,17 @@ int dnodeInitWrite() {
}
void dnodeCleanupWrite() {
free(wWorkerPool.writeWorker);
}
void dnodeWrite(SRpcMsg *pMsg) {
int leftLen = pMsg->contLen;
char *pCont = (char *)pMsg->pCont;
int contLen = 0;
int numOfVnodes = 0;
int32_t vgId = 0;
void dnodeWrite(void *rpcMsg) {
SRpcMsg *pMsg = rpcMsg;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
int32_t numOfVnodes = 0;
int32_t vgId = 0;
SRpcContext *pRpcContext = NULL;
// parse head, get number of vnodes;
......@@ -108,11 +108,11 @@ void dnodeWrite(SRpcMsg *pMsg) {
// put message into queue
SWriteMsg writeMsg;
writeMsg.rpcMsg = *pMsg;
writeMsg.pCont = pCont;
writeMsg.contLen = contLen;
writeMsg.pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
writeMsg.rpcMsg = *pMsg;
writeMsg.pCont = pCont;
writeMsg.contLen = contLen;
writeMsg.pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg);
......@@ -150,7 +150,6 @@ void *dnodeAllocateWriteWorker() {
}
void dnodeFreeWriteWorker(void *wqueue) {
taosCloseQueue(wqueue);
// dynamically adjust the number of threads
......@@ -160,7 +159,7 @@ static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param;
taos_qall qall;
SWriteMsg writeMsg;
int numOfMsgs;
int32_t numOfMsgs;
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall);
......@@ -169,7 +168,7 @@ static void *dnodeProcessWriteQueue(void *param) {
continue;
}
for (int i=0; i<numOfMsgs; ++i) {
for (int32_t i=0; i<numOfMsgs; ++i) {
// retrieve all items, and write them into WAL
taosGetQitem(qall, &writeMsg);
......@@ -181,7 +180,7 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, &writeMsg);
terrno = 0;
......@@ -212,7 +211,7 @@ static void dnodeProcessWriteResult(SWriteMsg *pWrite) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int count = atomic_add_fetch_32(&pRpcContext->count, 1);
int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
......@@ -226,15 +225,14 @@ static void dnodeProcessWriteResult(SWriteMsg *pWrite) {
SRpcMsg rsp;
rsp.handle = pWrite->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message
}
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int num = taosGetQueueNumber(pWorker->qset);
int32_t num = taosGetQueueNumber(pWorker->qset);
if (num > 0) {
usleep(100);
......@@ -248,15 +246,12 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
}
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
}
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
}
......@@ -349,8 +349,8 @@ typedef struct {
} SDRemoveSuperTableMsg;
typedef struct {
int32_t vnode;
} SFreeVnodeMsg;
int32_t vgId;
} SDropVnodeMsg;
typedef struct SColIndexEx {
int16_t colId;
......
......@@ -291,10 +291,10 @@ void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle)
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle);
SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg));
SDropVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SDropVnodeMsg));
if (pFreeVnode != NULL) {
pFreeVnode->vnode = htonl(vnode);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DROP_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DROP_VNODE, pFreeVnode, sizeof(SDropVnodeMsg), ahandle);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册