提交 9195433f 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy

......@@ -70,10 +70,7 @@ int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId);
void taosResetQsetThread(STaosQset *qset, void *pItem);
int32_t taosGetQueueItemsNumber(STaosQueue *queue);
int32_t taosGetQsetItemsNumber(STaosQset *qset);
......
......@@ -27,33 +27,33 @@ typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker {
int32_t id; // worker ID
TdThread thread; // thread
TdThread thread; // thread
SQWorkerPool *pool;
} SQWorker, SFWorker;
} SQWorker;
typedef struct SQWorkerPool {
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
STaosQset *qset;
const char *name;
SQWorker *workers;
int32_t max; // max number of workers
int32_t min; // min number of workers
int32_t num; // current number of workers
STaosQset *qset;
const char *name;
SQWorker *workers;
TdThreadMutex mutex;
} SQWorkerPool, SFWorkerPool;
} SQWorkerPool;
typedef struct SWWorker {
int32_t id; // worker id
TdThread thread; // thread
TdThread thread; // thread
STaosQall *qall;
STaosQset *qset; // queue set
SWWorkerPool *pool;
} SWWorker;
typedef struct SWWorkerPool {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
const char *name;
SWWorker *workers;
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
const char *name;
SWWorker *workers;
TdThreadMutex mutex;
} SWWorkerPool;
......@@ -62,16 +62,43 @@ void tQWorkerCleanup(SQWorkerPool *pool);
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
int32_t tFWorkerInit(SFWorkerPool *pool);
void tFWorkerCleanup(SFWorkerPool *pool);
STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp);
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue);
int32_t tWWorkerInit(SWWorkerPool *pool);
void tWWorkerCleanup(SWWorkerPool *pool);
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue);
typedef struct {
const char *name;
int32_t minNum;
int32_t maxNum;
FItem fp;
void *param;
} SQWorkerAllCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SQWorkerPool pool;
} SQWorkerAll;
typedef struct {
const char *name;
int32_t maxNum;
FItems fp;
void *param;
} SWWorkerAllCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SWWorkerPool pool;
} SWWorkerAll;
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg);
void tQWorkerAllCleanup(SQWorkerAll *pWorker);
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg);
void tWWorkerAllCleanup(SWWorkerAll *pWorker);
#ifdef __cplusplus
}
#endif
......
......@@ -89,6 +89,9 @@ typedef struct {
} SDnodeWorker;
typedef struct SMsgHandle {
int32_t vgId;
NodeMsgFp vgIdMsgFp;
SMgmtWrapper *pVgIdWrapper; // Handle the case where the same message type is distributed to qnode or vnode
NodeMsgFp msgFp;
SMgmtWrapper *pWrapper;
} SMsgHandle;
......@@ -114,6 +117,7 @@ typedef struct SMgmtWrapper {
void *pMgmt;
SDnode *pDnode;
NodeMsgFp msgFps[TDMT_MAX];
int32_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
SMgmtFp fp;
} SMgmtWrapper;
......@@ -149,7 +153,7 @@ typedef struct SDnode {
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndSendMonitorReport(SDnode *pDnode);
......
......@@ -65,8 +65,9 @@ void dndCleanup() {
dInfo("dnode env is cleaned up");
}
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
}
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
......
......@@ -20,13 +20,26 @@
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
static inline void dndProcessQVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMsgHead *pHead = pMsg->pCont;
int32_t vgId = htonl(pHead->vgId);
SMgmtWrapper *pWrapper = pHandle->pWrapper;
if (vgId == pHandle->vgId && pHandle->pVgIdWrapper != NULL) {
pWrapper = pHandle->pVgIdWrapper;
}
dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name,
pMsg->handle, pMsg->ahandle, vgId);
dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pRsp->msgType;
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
// if (pRsp == NULL || pRsp->pCont == NULL) return;
dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
rpcFreeCont(pRsp->pCont);
return;
......@@ -34,9 +47,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->msgFp != NULL) {
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
if (pHandle->vgId == 0) {
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
} else {
dndProcessQVnodeRpcMsg(pHandle, pRsp, pEpSet);
}
} else {
dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
rpcFreeCont(pRsp->pCont);
......@@ -110,9 +127,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->msgFp != NULL) {
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
pReq->handle, pReq->ahandle);
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
if (pHandle->vgId == 0) {
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
pReq->handle, pReq->ahandle);
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
} else {
dndProcessQVnodeRpcMsg(pHandle, pReq, pEpSet);
}
} else {
dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
......@@ -245,17 +266,24 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
int32_t vgId = pWrapper->msgVgIds[msgIndex];
if (msgFp == NULL) continue;
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (pHandle->msgFp != NULL) {
dError("msg:%s has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex],
pHandle->pWrapper->name, pWrapper->name);
if (pHandle->msgFp != NULL && pHandle->vgId == vgId) {
dError("msg:%s has multiple process nodes, prev node:%s:%d, curr node:%s:%d", tMsgInfo[msgIndex],
pHandle->pWrapper->name, pHandle->pWrapper->msgVgIds[msgIndex], pWrapper->name, vgId);
return -1;
} else {
dTrace("msg:%s will be processed by %s", tMsgInfo[msgIndex], pWrapper->name);
pHandle->msgFp = msgFp;
pHandle->pWrapper = pWrapper;
dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
if (vgId == 0) {
pHandle->msgFp = msgFp;
pHandle->pWrapper = pWrapper;
} else {
pHandle->vgId = vgId;
pHandle->vgIdMsgFp = msgFp;
pHandle->pVgIdWrapper = pWrapper;
}
}
}
}
......
......@@ -114,19 +114,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, 0);
// Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
}
......@@ -28,9 +28,9 @@ typedef struct SMnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
SQWorkerAll readWorker;
SQWorkerAll writeWorker;
SQWorkerAll syncWorker;
SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica;
int8_t selfIndex;
......
......@@ -75,78 +75,78 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
// Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0);
// Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg, 0);
}
......@@ -42,9 +42,9 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......@@ -59,7 +59,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
return -1;
......@@ -68,7 +68,7 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
int32_t code = taosWriteQitem(pWorker->queue, pMsg);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
......@@ -89,18 +89,20 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr());
SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode write worker since %s", terrstr());
if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start mnode-write worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode sync worker since %s", terrstr());
if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) {
dError("failed to start mnode sync-worker since %s", terrstr());
return -1;
}
......@@ -108,7 +110,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
}
void mmStopWorker(SMnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->readWorker);
dndCleanupWorker(&pMgmt->writeWorker);
dndCleanupWorker(&pMgmt->syncWorker);
tQWorkerAllCleanup(&pMgmt->readWorker);
tQWorkerAllCleanup(&pMgmt->writeWorker);
tQWorkerAllCleanup(&pMgmt->syncWorker);
}
......@@ -54,4 +54,16 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
}
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {}
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, 1);
}
......@@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by SNODE
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg);
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, 0);
}
......@@ -30,7 +30,7 @@ typedef struct SVnodesMgmt {
SVnodesStat state;
STfs *pTfs;
SQWorkerPool queryPool;
SFWorkerPool fetchPool;
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
const char *path;
......
......@@ -243,42 +243,42 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
}
......@@ -62,11 +62,10 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
......@@ -74,12 +73,10 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
// TODO: handle invalid write
/*rpcFreeCont(pMsg->rpcMsg.pCont);*/
/*taosFreeQitem(pMsg);*/
}
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
......@@ -163,7 +160,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr());
dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
return -1;
}
......@@ -265,7 +262,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
......@@ -280,7 +277,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
......@@ -306,11 +303,11 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pQPool->max = maxQueryThreads;
if (tQWorkerInit(pQPool) != 0) return -1;
SFWorkerPool *pFPool = &pMgmt->fetchPool;
SQWorkerPool *pFPool = &pMgmt->fetchPool;
pFPool->name = "vnode-fetch";
pFPool->min = minFetchThreads;
pFPool->max = maxFetchThreads;
if (tFWorkerInit(pFPool) != 0) return -1;
if (tQWorkerInit(pFPool) != 0) return -1;
SWWorkerPool *pWPool = &pMgmt->writePool;
pWPool->name = "vnode-write";
......@@ -333,7 +330,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
void vmStopWorker(SVnodesMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->mgmtWorker);
tFWorkerCleanup(&pMgmt->fetchPool);
tQWorkerCleanup(&pMgmt->fetchPool);
tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->syncPool);
......
......@@ -31,10 +31,10 @@ extern "C" {
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 1500
#define ELECT_TIMER_MS_MAX 3000
#define ELECT_TIMER_MS_MIN 150
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 300
#define HEARTBEAT_TIMER_MS 30
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
......
......@@ -160,6 +160,11 @@ typedef struct SSyncNode {
SSyncLogStore* pLogStore;
SyncIndex commitIndex;
// timer ms init
int32_t pingBaseLine;
int32_t electBaseLine;
int32_t hbBaseLine;
// ping timer
tmr_h pPingTimer;
int32_t pingTimerMS;
......
......@@ -44,7 +44,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
// ---- misc ----
int32_t syncUtilRand(int32_t max);
int32_t syncUtilElectRandomMS();
int32_t syncUtilElectRandomMS(int32_t min, int32_t max);
int32_t syncUtilQuorum(int32_t replicaNum);
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
cJSON* syncUtilRaftId2Json(const SRaftId* p);
......
......@@ -242,9 +242,14 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
assert(pSyncNode->pLogStore != NULL);
pSyncNode->commitIndex = SYNC_INDEX_INVALID;
// timer ms init
pSyncNode->pingBaseLine = PING_TIMER_MS;
pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;
// init ping timer
pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = PING_TIMER_MS;
pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
......@@ -252,7 +257,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init elect timer
pSyncNode->pElectTimer = NULL;
pSyncNode->electTimerMS = syncUtilElectRandomMS();
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
......@@ -260,7 +265,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init heartbeat timer
pSyncNode->pHeartbeatTimer = NULL;
pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
......@@ -394,7 +399,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
int32_t electMS = syncUtilElectRandomMS();
int32_t electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
return ret;
}
......@@ -763,7 +768,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
syncTimeoutDestroy(pSyncMsg);
// reset timer ms
pSyncNode->electTimerMS = syncUtilElectRandomMS();
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
......
......@@ -96,7 +96,10 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
int32_t syncUtilRand(int32_t max) { return taosRand() % max; }
int32_t syncUtilElectRandomMS() { return ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
assert(min > 0 && max > 0 && max >= min);
return min + syncUtilRand(max - min);
}
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
......
......@@ -16,7 +16,7 @@ void logTest() {
void electRandomMSTest() {
for (int i = 0; i < 10; ++i) {
int32_t ms = syncUtilElectRandomMS();
int32_t ms = syncUtilElectRandomMS(150, 300);
printf("syncUtilElectRandomMS: %d \n", ms);
}
}
......
......@@ -68,7 +68,6 @@ STaosQueue *taosOpenQueue() {
return NULL;
}
queue->threadId = -1;
uDebug("queue:%p is opened", queue);
return queue;
}
......@@ -437,59 +436,11 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
return code;
}
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) {
STaosQnode *pNode = NULL;
int32_t code = -1;
tsem_wait(&qset->sem);
taosThreadMutexLock(&qset->mutex);
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) qset->current = qset->head;
STaosQueue *queue = qset->current;
if (queue) qset->current = queue->next;
if (queue == NULL) break;
if (queue->head == NULL) continue;
if (queue->threadId != -1 && queue->threadId != threadId) {
code = 0;
continue;
}
taosThreadMutexLock(&queue->mutex);
if (queue->head) {
pNode = queue->head;
pNode->queue = queue;
queue->threadId = threadId;
*ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp;
queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1;
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
}
taosThreadMutexUnlock(&queue->mutex);
if (pNode) break;
}
taosThreadMutexUnlock(&qset->mutex);
return code;
}
void taosResetQsetThread(STaosQset *qset, void *pItem) {
if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
taosThreadMutexLock(&qset->mutex);
pNode->queue->threadId = -1;
for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
tsem_post(&qset->sem);
}
......
......@@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) {
pool->qset = taosOpenQset();
pool->workers = calloc(sizeof(SQWorker), pool->max);
pool->workers = calloc(pool->max, sizeof(SQWorker));
if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
return NULL;
}
STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) {
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
taosThreadMutexLock(&pool->mutex);
STaosQueue *queue = taosOpenQueue();
if (queue == NULL) {
......@@ -114,7 +114,7 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&worker->thread, &thAttr, threadFp, worker) != 0) {
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -134,58 +134,11 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa
return queue;
}
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp);
}
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}
int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); }
void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); }
static void *tFWorkerThreadFp(SQWorker *worker) {
SQWorkerPool *pool = worker->pool;
FItem fp = NULL;
void *msg = NULL;
void *ahandle = NULL;
int32_t code = 0;
taosBlockSIGPIPE();
setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) {
code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id);
if (code < 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break;
} else if (code == 0) {
// uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset);
continue;
}
if (fp != NULL) {
(*fp)(ahandle, msg);
}
taosResetQsetThread(pool->qset, msg);
}
return NULL;
}
STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp);
}
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); }
int32_t tWWorkerInit(SWWorkerPool *pool) {
pool->nextId = 0;
pool->workers = calloc(pool->max, sizeof(SWWorker));
......@@ -326,3 +279,60 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
SQWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->min = pCfg->minNum;
pPool->max = pCfg->maxNum;
if (tQWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->name = pCfg->name;
return 0;
}
void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10);
}
tQWorkerCleanup(&pWorker->pool);
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
SWWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->max = pCfg->maxNum;
if (tWWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->name = pCfg->name;
return 0;
}
void tWWorkerAllCleanup(SWWorkerAll *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10);
}
tWWorkerCleanup(&pWorker->pool);
tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}
......@@ -21,4 +21,7 @@
# ---- query
./test.sh -f tsim/query/interval.sim
# ---- tmq
./test.sh -f tsim/tmq/basic.sim
#======================b1-end===============
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wal -v 1
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 100
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
#root@trd02 /data2/dnode $ tmq_demo --help
#Used to tmq_demo
# -c Configuration directory, default is
# -d The name of the database to be created, default is tmqdb
# -s The name of the super table to be created, default is stb
# -f The file of result, default is ./tmqResult.txt
# -w The path of vnode of wal, default is /data2/dnode/data/vnodes/vnode2/wal
# -t numOfThreads, default is 1
# -n numOfTables, default is 1
# -v numOfVgroups, default is 1
# -a runMode, default is 0
# -l numOfColumn, default is 1
# -q ratio, default is 1.000000
# -b batchNumOfRow, default is 1
# -r totalRowsOfPerTbl, default is 10000
# -m startTimestamp, default is 1640966400000 [2022-01-01 00:00:00]
# -g showMsgFlag, default is 0
#
#system_content ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg
system ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg
print result-> $system_content
sql show databases
print ===> $rows $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != tmqdb then
return -1
endi
sql use tmqdb
sql show tables
print ===> $rows $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != stb0 then
return -1
endi
sql select count(*) from stb0
print ===> $rows $data00 $data01 $data02 $data03
if $rows != 1 then
return -1
endi
if $data00 != 10000 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册