提交 43d1e3f7 编写于 作者: H Haojun Liao

[td-11818] fix bug.

...@@ -320,7 +320,7 @@ typedef struct SEpSet { ...@@ -320,7 +320,7 @@ typedef struct SEpSet {
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
uint32_t clusterId; int64_t clusterId;
int32_t connId; int32_t connId;
int8_t superUser; int8_t superUser;
int8_t reserved[5]; int8_t reserved[5];
...@@ -644,20 +644,23 @@ typedef struct { ...@@ -644,20 +644,23 @@ typedef struct {
typedef struct { typedef struct {
int32_t sver; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int64_t clusterId;
int64_t rebootTime; // time stamp for last reboot int64_t rebootTime;
int64_t updateTime;
int16_t numOfCores; int16_t numOfCores;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes; int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes;
char dnodeEp[TSDB_EP_LEN]; char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads; SVnodeLoads vnodeLoads;
} SStatusMsg; } SStatusMsg;
typedef struct {
int32_t reserved;
} STransMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int64_t clusterId;
int8_t dropped; int8_t dropped;
char reserved[7]; char reserved[7];
} SDnodeCfg; } SDnodeCfg;
......
...@@ -67,23 +67,21 @@ enum { ...@@ -67,23 +67,21 @@ enum {
#endif #endif
// Requests handled by DNODE // Requests handled by DNODE
TD_NEW_MSG_SEG(TDMT_DND_MSG) TD_NEW_MSG_SEG(TDMT_DND_MSG)
TD_DEF_MSG_TYPE(TDMT_DND_NETWORK_TEST, "dnode-nettest", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE, "dnode-alter-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_VNODE, "dnode-create-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_VNODE, "dnode-create-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE, "dnode-alter-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE, "dnode-alter-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_DROP_VNODE, "dnode-drop-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_DROP_VNODE, "dnode-drop-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_AUTH_VNODE, "dnode-auth-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_AUTH_VNODE, "dnode-auth-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYNC_VNODE, "dnode-sync-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYNC_VNODE, "dnode-sync-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_COMPACT_VNODE, "dnode-compact-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_COMPACT_VNODE, "dnode-compact-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE, "dnode-alter-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "dnode-config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "dnode-config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_NETWORK_TEST, "dnode-nettest", NULL, NULL)
// Requests handled by MNODE // Requests handled by MNODE
TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "mnode-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "mnode-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TABLE, "mnode-create-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TABLE, "mnode-drop-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "mnode-create-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "mnode-create-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "mnode-alter-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "mnode-alter-acct", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "mnode-drop-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "mnode-drop-acct", NULL, NULL)
...@@ -107,6 +105,7 @@ enum { ...@@ -107,6 +105,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STB_META, "mnode-stb-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
...@@ -114,6 +113,7 @@ enum { ...@@ -114,6 +113,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", NULL, NULL)
...@@ -125,10 +125,15 @@ enum { ...@@ -125,10 +125,15 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "vnode-submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "vnode-submit", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "vnode-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "vnode-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "vnode-fetch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "vnode-fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "vnode-create-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "vnode-alter-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "vnode-alter-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TABLE, "vnode-drop-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateTbReq, SVCreateTbRsp)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
...@@ -138,9 +143,6 @@ enum { ...@@ -138,9 +143,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-super-table", SVCreateTbReq, SVCreateTbRsp)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
......
...@@ -28,9 +28,7 @@ typedef struct SDnode SDnode; ...@@ -28,9 +28,7 @@ typedef struct SDnode SDnode;
typedef struct { typedef struct {
int32_t sver; int32_t sver;
int16_t numOfCores; int16_t numOfCores;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes; int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes;
int8_t enableTelem; int8_t enableTelem;
int32_t statusInterval; int32_t statusInterval;
float numOfThreadsPerCore; float numOfThreadsPerCore;
......
...@@ -56,7 +56,7 @@ typedef struct SMnodeCfg { ...@@ -56,7 +56,7 @@ typedef struct SMnodeCfg {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int64_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
......
...@@ -60,7 +60,7 @@ typedef struct SAppInstInfo { ...@@ -60,7 +60,7 @@ typedef struct SAppInstInfo {
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
SInstanceSummary summary; SInstanceSummary summary;
SList *pConnList; // STscObj linked list SList *pConnList; // STscObj linked list
uint32_t clusterId; int64_t clusterId;
void *pTransporter; void *pTransporter;
} SAppInstInfo; } SAppInstInfo;
......
...@@ -167,11 +167,11 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -167,11 +167,11 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
if (pDcl->msgType == TDMT_MND_CREATE_TABLE) { if (pDcl->msgType == TDMT_VND_CREATE_TABLE) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
char buf[12] = {0}; char buf[18] = {0};
sprintf(buf, "%d", pRequest->pTscObj->pAppInfo->clusterId); sprintf(buf, "%" PRId64, pRequest->pTscObj->pAppInfo->clusterId);
int32_t code = catalogGetHandle(buf, &pCatalog); int32_t code = catalogGetHandle(buf, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -230,11 +230,13 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -230,11 +230,13 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
if (qIsDdlQuery(pQuery)) { if (qIsDdlQuery(pQuery)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else {
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
} }
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
_return: _return:
qDestoryQuery(pQuery); qDestoryQuery(pQuery);
......
...@@ -34,6 +34,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -34,6 +34,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code; pRequest->code = code;
terrno = code; terrno = code;
sem_post(&pRequest->body.rspSem);
return code; return code;
} }
...@@ -42,7 +44,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -42,7 +44,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData;
pConnect->acctId = htonl(pConnect->acctId); pConnect->acctId = htonl(pConnect->acctId);
pConnect->connId = htonl(pConnect->connId); pConnect->connId = htonl(pConnect->connId);
pConnect->clusterId = htonl(pConnect->clusterId); pConnect->clusterId = htobe64(pConnect->clusterId);
assert(pConnect->epSet.numOfEps > 0); assert(pConnect->epSet.numOfEps > 0);
for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
...@@ -64,8 +66,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -64,8 +66,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId; pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
pTscObj->pAppInfo->numOfConns);
sem_post(&pRequest->body.rspSem); sem_post(&pRequest->body.rspSem);
return 0; return 0;
...@@ -289,6 +292,6 @@ void initMsgHandleFp() { ...@@ -289,6 +292,6 @@ void initMsgHandleFp() {
handleRequestRspFp[TDMT_MND_SHOW_RETRIEVE] = processRetrieveMnodeRsp; handleRequestRspFp[TDMT_MND_SHOW_RETRIEVE] = processRetrieveMnodeRsp;
handleRequestRspFp[TDMT_MND_CREATE_DB] = processCreateDbRsp; handleRequestRspFp[TDMT_MND_CREATE_DB] = processCreateDbRsp;
handleRequestRspFp[TDMT_MND_USE_DB] = processUseDbRsp; handleRequestRspFp[TDMT_MND_USE_DB] = processUseDbRsp;
handleRequestRspFp[TDMT_MND_CREATE_TABLE] = processCreateTableRsp; handleRequestRspFp[TDMT_MND_CREATE_STB] = processCreateTableRsp;
handleRequestRspFp[TDMT_MND_DROP_DB] = processDropDbRsp; handleRequestRspFp[TDMT_MND_DROP_DB] = processDropDbRsp;
} }
\ No newline at end of file
...@@ -160,6 +160,12 @@ TEST(testCase, create_db_Test) { ...@@ -160,6 +160,12 @@ TEST(testCase, create_db_Test) {
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_close(pConn); taos_close(pConn);
} }
......
...@@ -139,9 +139,7 @@ void dmnWaitSignal() { ...@@ -139,9 +139,7 @@ void dmnWaitSignal() {
void dmnInitOption(SDnodeOpt *pOption) { void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = 30000000; //3.0.0.0 pOption->sver = 30000000; //3.0.0.0
pOption->numOfCores = tsNumOfCores; pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1; pOption->numOfSupportVnodes = 1;
pOption->numOfSupportQnodes = 1;
pOption->statusInterval = tsStatusInterval; pOption->statusInterval = tsStatusInterval;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
pOption->ratioOfQueryCores = tsRatioOfQueryCores; pOption->ratioOfQueryCores = tsRatioOfQueryCores;
......
...@@ -23,15 +23,16 @@ extern "C" { ...@@ -23,15 +23,16 @@ extern "C" {
int32_t dndInitDnode(SDnode *pDnode); int32_t dndInitDnode(SDnode *pDnode);
void dndCleanupDnode(SDnode *pDnode); void dndCleanupDnode(SDnode *pDnode);
void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnode); int32_t dndGetDnodeId(SDnode *pDnode);
int32_t dndGetClusterId(SDnode *pDnode); int64_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
void dndSendStatusMsg(SDnode *pDnode); void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
void dndSendStatusMsg(SDnode *pDnode);
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,10 +22,11 @@ extern "C" { ...@@ -22,10 +22,11 @@ extern "C" {
#include "cJSON.h" #include "cJSON.h"
#include "os.h" #include "os.h"
#include "tmsg.h" #include "tep.h"
#include "thash.h" #include "thash.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tthread.h" #include "tthread.h"
...@@ -51,21 +52,27 @@ typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); ...@@ -51,21 +52,27 @@ typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef struct { typedef struct {
char *dnode; char *dnode;
char *mnode; char *mnode;
char *qnode;
char *snode;
char *bnode;
char *vnodes; char *vnodes;
} SDnodeDir; } SDnodeDir;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t dropped; int32_t dropped;
int32_t clusterId; int64_t clusterId;
int64_t rebootTime; int64_t rebootTime;
int8_t statusSent; int64_t updateTime;
SEpSet mnodeEpSet; int8_t statusSent;
char *file; SEpSet mnodeEpSet;
SHashObj *dnodeHash; char *file;
SDnodeEps *dnodeEps; SHashObj *dnodeHash;
pthread_t *threadId; SDnodeEps *dnodeEps;
SRWLatch latch; pthread_t *threadId;
SRWLatch latch;
taos_queue pMgmtQ;
SWorkerPool mgmtPool;
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
...@@ -81,8 +88,6 @@ typedef struct { ...@@ -81,8 +88,6 @@ typedef struct {
taos_queue pReadQ; taos_queue pReadQ;
taos_queue pWriteQ; taos_queue pWriteQ;
taos_queue pSyncQ; taos_queue pSyncQ;
taos_queue pMgmtQ;
SWorkerPool mgmtPool;
SWorkerPool readPool; SWorkerPool readPool;
SWorkerPool writePool; SWorkerPool writePool;
SWorkerPool syncPool; SWorkerPool syncPool;
...@@ -93,8 +98,6 @@ typedef struct { ...@@ -93,8 +98,6 @@ typedef struct {
int32_t openVnodes; int32_t openVnodes;
int32_t totalVnodes; int32_t totalVnodes;
SRWLatch latch; SRWLatch latch;
taos_queue pMgmtQ;
SWorkerPool mgmtPool;
SWorkerPool queryPool; SWorkerPool queryPool;
SWorkerPool fetchPool; SWorkerPool fetchPool;
SMWorkerPool syncPool; SMWorkerPool syncPool;
......
...@@ -23,11 +23,14 @@ extern "C" { ...@@ -23,11 +23,14 @@ extern "C" {
int32_t dndInitMnode(SDnode *pDnode); int32_t dndInitMnode(SDnode *pDnode);
void dndCleanupMnode(SDnode *pDnode); void dndCleanupMnode(SDnode *pDnode);
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,12 +24,18 @@ extern "C" { ...@@ -24,12 +24,18 @@ extern "C" {
int32_t dndInitVnodes(SDnode *pDnode); int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads); void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads);
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -17,7 +17,21 @@ ...@@ -17,7 +17,21 @@
#include "dndDnode.h" #include "dndDnode.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndVnodes.h" #include "dndVnodes.h"
#include "tep.h"
static int32_t dndInitMgmtWorker(SDnode *pDnode);
static void dndCleanupMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMgmtQueue(SDnode *pDnode);
static void dndFreeMgmtQueue(SDnode *pDnode);
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndReadDnodes(SDnode *pDnode);
static int32_t dndWriteDnodes(SDnode *pDnode);
static void *dnodeThreadRoutine(void *param);
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg);
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg);
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg);
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dndGetDnodeId(SDnode *pDnode) { int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
...@@ -27,10 +41,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) { ...@@ -27,10 +41,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) {
return dnodeId; return dnodeId;
} }
int32_t dndGetClusterId(SDnode *pDnode) { int64_t dndGetClusterId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
int32_t clusterId = pMgmt->clusterId; int64_t clusterId = pMgmt->clusterId;
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
return clusterId; return clusterId;
} }
...@@ -68,7 +82,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -68,7 +82,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0}; SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet); dndGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, msg:%s is redirected, num:%d inUse:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) { if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) {
...@@ -82,7 +96,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -82,7 +96,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
} }
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
...@@ -165,7 +179,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) { ...@@ -165,7 +179,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 256 * 1024;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
cJSON *root = NULL; cJSON *root = NULL;
FILE *fp = NULL; FILE *fp = NULL;
...@@ -198,11 +212,11 @@ static int32_t dndReadDnodes(SDnode *pDnode) { ...@@ -198,11 +212,11 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
pMgmt->dnodeId = dnodeId->valueint; pMgmt->dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_Number) { if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", pMgmt->file); dError("failed to read %s since clusterId not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pMgmt->clusterId = clusterId->valueint; pMgmt->clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) { if (!dropped || dropped->type != cJSON_Number) {
...@@ -217,20 +231,20 @@ static int32_t dndReadDnodes(SDnode *pDnode) { ...@@ -217,20 +231,20 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
int32_t numOfNodes = cJSON_GetArraySize(dnodes); int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
if (numOfNodes <= 0) { if (numOfDnodes <= 0) {
dError("failed to read %s since numOfNodes:%d invalid", pMgmt->file, numOfNodes); dError("failed to read %s since numOfDnodes:%d invalid", pMgmt->file, numOfDnodes);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pMgmt->dnodeEps = calloc(1, numOfNodes * sizeof(SDnodeEp) + sizeof(SDnodeEps)); pMgmt->dnodeEps = calloc(1, numOfDnodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (pMgmt->dnodeEps == NULL) { if (pMgmt->dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno)); dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pMgmt->dnodeEps->num = numOfNodes; pMgmt->dnodeEps->num = numOfDnodes;
for (int32_t i = 0; i < numOfNodes; ++i) { for (int32_t i = 0; i < numOfDnodes; ++i) {
cJSON *node = cJSON_GetArrayItem(dnodes, i); cJSON *node = cJSON_GetArrayItem(dnodes, i);
if (node == NULL) break; if (node == NULL) break;
...@@ -238,28 +252,28 @@ static int32_t dndReadDnodes(SDnode *pDnode) { ...@@ -238,28 +252,28 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
cJSON *dnodeId = cJSON_GetObjectItem(node, "id"); cJSON *dnodeId = cJSON_GetObjectItem(node, "id");
if (!dnodeId || dnodeId->type != cJSON_Number) { if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", pMgmt->file); dError("failed to read %s since dnodeId not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->id = dnodeId->valueint; pDnodeEp->id = dnodeId->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", pMgmt->file); dError("failed to read %s since dnodeFqdn not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
if (!dnodePort || dnodePort->type != cJSON_Number) { if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s, dnodePort not found", pMgmt->file); dError("failed to read %s since dnodePort not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->port = dnodePort->valueint; pDnodeEp->port = dnodePort->valueint;
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
if (!isMnode || isMnode->type != cJSON_Number) { if (!isMnode || isMnode->type != cJSON_Number) {
dError("failed to read %s, isMnode not found", pMgmt->file); dError("failed to read %s since isMnode not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->isMnode = isMnode->valueint; pDnodeEp->isMnode = isMnode->valueint;
...@@ -282,7 +296,7 @@ PRASE_DNODE_OVER: ...@@ -282,7 +296,7 @@ PRASE_DNODE_OVER:
if (pMgmt->dnodeEps == NULL) { if (pMgmt->dnodeEps == NULL) {
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
pMgmt->dnodeEps->num = 1; pMgmt->dnodeEps->num = 1;
pMgmt->dnodeEps->eps[0].isMnode = 1; pMgmt->dnodeEps->eps[0].isMnode = 1;
taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port); taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
} }
...@@ -303,12 +317,12 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { ...@@ -303,12 +317,12 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
} }
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 256 * 1024;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": %d,\n", pMgmt->clusterId); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
...@@ -331,6 +345,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { ...@@ -331,6 +345,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
free(content); free(content);
terrno = 0; terrno = 0;
pMgmt->updateTime = taosGetTimestampMs();
dInfo("successed to write %s", pMgmt->file); dInfo("successed to write %s", pMgmt->file);
return 0; return 0;
} }
...@@ -348,12 +363,11 @@ void dndSendStatusMsg(SDnode *pDnode) { ...@@ -348,12 +363,11 @@ void dndSendStatusMsg(SDnode *pDnode) {
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->opt.sver); pStatus->sver = htonl(pDnode->opt.sver);
pStatus->dnodeId = htonl(pMgmt->dnodeId); pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htonl(pMgmt->clusterId); pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->rebootTime = htobe64(pMgmt->rebootTime); pStatus->rebootTime = htobe64(pMgmt->rebootTime);
pStatus->updateTime = htobe64(pMgmt->updateTime);
pStatus->numOfCores = htons(pDnode->opt.numOfCores); pStatus->numOfCores = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores); pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfSupportVnodes);
pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportQnodes = htons(pDnode->opt.numOfCores);
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
...@@ -379,7 +393,7 @@ void dndSendStatusMsg(SDnode *pDnode) { ...@@ -379,7 +393,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) {
dInfo("set dnodeId:%d clusterId:%d dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); dInfo("set dnodeId:%d clusterId:% " PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId; pMgmt->dnodeId = pCfg->dnodeId;
...@@ -410,13 +424,9 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { ...@@ -410,13 +424,9 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
} }
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0) {
dndUpdateMnodeEpSet(pDnode, pEpSet);
}
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
return; return;
...@@ -425,7 +435,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -425,7 +435,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SStatusRsp *pRsp = pMsg->pCont; SStatusRsp *pRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pRsp->dnodeCfg; SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htonl(pCfg->clusterId); pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg); dndUpdateDnodeCfg(pDnode, pCfg);
if (pCfg->dropped) { if (pCfg->dropped) {
...@@ -444,9 +454,9 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -444,9 +454,9 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
} }
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); }
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); }
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
dError("config msg is received, but not supported yet"); dError("config msg is received, but not supported yet");
...@@ -457,7 +467,7 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -457,7 +467,7 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
} }
static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
dDebug("startup msg is received"); dDebug("startup msg is received");
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
...@@ -491,6 +501,7 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -491,6 +501,7 @@ int32_t dndInitDnode(SDnode *pDnode) {
pMgmt->rebootTime = taosGetTimestampMs(); pMgmt->rebootTime = taosGetTimestampMs();
pMgmt->dropped = 0; pMgmt->dropped = 0;
pMgmt->clusterId = 0; pMgmt->clusterId = 0;
taosInitRWLatch(&pMgmt->latch);
char path[PATH_MAX]; char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode); snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode);
...@@ -512,7 +523,15 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -512,7 +523,15 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1; return -1;
} }
taosInitRWLatch(&pMgmt->latch); if (dndInitMgmtWorker(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndAllocMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
if (pMgmt->threadId == NULL) { if (pMgmt->threadId == NULL) {
...@@ -528,6 +547,9 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -528,6 +547,9 @@ int32_t dndInitDnode(SDnode *pDnode) {
void dndCleanupDnode(SDnode *pDnode) { void dndCleanupDnode(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupMgmtWorker(pDnode);
dndFreeMgmtQueue(pDnode);
if (pMgmt->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL; pMgmt->threadId = NULL;
...@@ -554,39 +576,123 @@ void dndCleanupDnode(SDnode *pDnode) { ...@@ -554,39 +576,123 @@ void dndCleanupDnode(SDnode *pDnode) {
dInfo("dnode-dnode is cleaned up"); dInfo("dnode-dnode is cleaned up");
} }
void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static int32_t dndInitMgmtWorker(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "dnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("dnode mgmt worker is initialized");
return 0;
}
static void dndCleanupMgmtWorker(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("dnode mgmt worker is closed");
}
static int32_t dndAllocMgmtQueue(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMgmtQueue(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
pMgmt->pMgmtQ = NULL;
}
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0 && pRpcMsg->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pDnode, pEpSet);
}
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg != NULL) *pMsg = *pRpcMsg;
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE:
code = dndProcessCreateMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_MNODE:
code = dndProcessAlterMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_MNODE:
code = dndProcessDropMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_NETWORK_TEST: case TDMT_DND_NETWORK_TEST:
dndProcessStartupReq(pDnode, pMsg); dndProcessStartupReq(pDnode, pMsg);
break; break;
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:
dndProcessConfigDnodeReq(pDnode, pMsg); dndProcessConfigDnodeReq(pDnode, pMsg);
break; break;
default:
dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
switch (pMsg->msgType) {
case TDMT_MND_STATUS_RSP: case TDMT_MND_STATUS_RSP:
dndProcessStatusRsp(pDnode, pMsg, pEpSet); dndProcessStatusRsp(pDnode, pMsg);
break; break;
case TDMT_MND_AUTH_RSP: case TDMT_MND_AUTH_RSP:
dndProcessAuthRsp(pDnode, pMsg, pEpSet); dndProcessAuthRsp(pDnode, pMsg);
break; break;
case TDMT_MND_GRANT_RSP: case TDMT_MND_GRANT_RSP:
dndProcessGrantRsp(pDnode, pMsg, pEpSet); dndProcessGrantRsp(pDnode, pMsg);
break;
case TDMT_DND_CREATE_VNODE:
code = dndProcessCreateVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = dndProcessAlterVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = dndProcessDropVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_AUTH_VNODE:
code = dndProcessAuthVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = dndProcessSyncVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = dndProcessCompactVnodeReq(pDnode, pMsg);
break; break;
default: default:
dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
break;
}
if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
taosFreeQitem(pMsg);
} }
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
static int32_t dndInitMnodeReadWorker(SDnode *pDnode); static int32_t dndInitMnodeReadWorker(SDnode *pDnode);
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode); static int32_t dndInitMnodeWriteWorker(SDnode *pDnode);
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode); static int32_t dndInitMnodeSyncWorker(SDnode *pDnode);
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode);
static void dndCleanupMnodeReadWorker(SDnode *pDnode); static void dndCleanupMnodeReadWorker(SDnode *pDnode);
static void dndCleanupMnodeWriteWorker(SDnode *pDnode); static void dndCleanupMnodeWriteWorker(SDnode *pDnode);
static void dndCleanupMnodeSyncWorker(SDnode *pDnode); static void dndCleanupMnodeSyncWorker(SDnode *pDnode);
...@@ -29,7 +28,6 @@ static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); ...@@ -29,7 +28,6 @@ static void dndCleanupMnodeMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); static int32_t dndAllocMnodeReadQueue(SDnode *pDnode);
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode);
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode);
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode);
static void dndFreeMnodeReadQueue(SDnode *pDnode); static void dndFreeMnodeReadQueue(SDnode *pDnode);
static void dndFreeMnodeWriteQueue(SDnode *pDnode); static void dndFreeMnodeWriteQueue(SDnode *pDnode);
static void dndFreeMnodeSyncQueue(SDnode *pDnode); static void dndFreeMnodeSyncQueue(SDnode *pDnode);
...@@ -38,12 +36,10 @@ static void dndFreeMnodeMgmtQueue(SDnode *pDnode); ...@@ -38,12 +36,10 @@ static void dndFreeMnodeMgmtQueue(SDnode *pDnode);
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndStartMnodeWorker(SDnode *pDnode); static int32_t dndStartMnodeWorker(SDnode *pDnode);
static void dndStopMnodeWorker(SDnode *pDnode); static void dndStopMnodeWorker(SDnode *pDnode);
...@@ -58,10 +54,6 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption); ...@@ -58,10 +54,6 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption); static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndDropMnode(SDnode *pDnode); static int32_t dndDropMnode(SDnode *pDnode);
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static SMnode *dndAcquireMnode(SDnode *pDnode) { static SMnode *dndAcquireMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = NULL; SMnode *pMnode = NULL;
...@@ -488,7 +480,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { ...@@ -488,7 +480,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
return pMsg; return pMsg;
} }
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
...@@ -504,7 +496,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -504,7 +496,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
} }
} }
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
...@@ -524,7 +516,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -524,7 +516,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
return dndWriteMnodeFile(pDnode); return dndWriteMnodeFile(pDnode);
} }
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDropMnodeInMsg *pMsg = pRpcMsg->pCont; SDropMnodeInMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId); pMsg->dnodeId = htonl(pMsg->dnodeId);
...@@ -536,33 +528,6 @@ static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -536,33 +528,6 @@ static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
} }
} }
static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE:
code = dndProcessCreateMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_MNODE:
code = dndProcessAlterMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_MNODE:
code = dndProcessDropMnodeReq(pDnode, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
break;
}
if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
...@@ -622,25 +587,6 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs ...@@ -622,25 +587,6 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs
return 0; return 0;
} }
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg != NULL) *pMsg = *pRpcMsg;
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont);
taosFreeQitem(pMsg);
}
dndReleaseMnode(pDnode, pMnode);
}
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
...@@ -686,42 +632,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -686,42 +632,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} }
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
pMgmt->pMgmtQ = NULL;
}
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "mnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("mnode mgmt worker is initialized");
return 0;
}
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("mnode mgmt worker is closed");
}
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
...@@ -842,16 +752,6 @@ int32_t dndInitMnode(SDnode *pDnode) { ...@@ -842,16 +752,6 @@ int32_t dndInitMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosInitRWLatch(&pMgmt->latch); taosInitRWLatch(&pMgmt->latch);
if (dndInitMnodeMgmtWorker(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndAllocMnodeMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
char path[PATH_MAX]; char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode);
pMgmt->file = strdup(path); pMgmt->file = strdup(path);
...@@ -894,8 +794,6 @@ void dndCleanupMnode(SDnode *pDnode) { ...@@ -894,8 +794,6 @@ void dndCleanupMnode(SDnode *pDnode) {
dInfo("dnode-mnode start to clean up"); dInfo("dnode-mnode start to clean up");
if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); if (pMgmt->pMnode) dndStopMnodeWorker(pDnode);
dndCleanupMnodeMgmtWorker(pDnode);
dndFreeMnodeMgmtQueue(pDnode);
tfree(pMgmt->file); tfree(pMgmt->file);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
dInfo("dnode-mnode is cleaned up"); dInfo("dnode-mnode is cleaned up");
......
...@@ -30,27 +30,30 @@ ...@@ -30,27 +30,30 @@
#define INTERNAL_SECRET "_secret" #define INTERNAL_SECRET "_secret"
static void dndInitMsgFp(STransMgmt *pMgmt) { static void dndInitMsgFp(STransMgmt *pMgmt) {
// msg from client to dnode // Requests handled by DNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg;
// msg from client to mnode // Requests handled by MNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = dndProcessMnodeWriteMsg;
...@@ -75,53 +78,51 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -75,53 +78,51 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STB_META)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
// message from client to dnode // Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessDnodeReq; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
// message from mnode to vnode pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
// message from mnode to dnode pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessVnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessDnodeReq;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg;
// message from dnode to mnode
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessDnodeRsp;
} }
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
...@@ -189,7 +190,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -189,7 +190,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
tmsg_t msgType = pMsg->msgType; tmsg_t msgType = pMsg->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) { if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code);
dndProcessDnodeReq(pDnode, pMsg, pEpSet); dndProcessStartupReq(pDnode, pMsg);
return; return;
} }
...@@ -208,7 +209,8 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -208,7 +209,8 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
} }
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType),
pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
return; return;
......
...@@ -56,11 +56,9 @@ typedef struct { ...@@ -56,11 +56,9 @@ typedef struct {
static int32_t dndInitVnodeReadWorker(SDnode *pDnode); static int32_t dndInitVnodeReadWorker(SDnode *pDnode);
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode); static int32_t dndInitVnodeWriteWorker(SDnode *pDnode);
static int32_t dndInitVnodeSyncWorker(SDnode *pDnode); static int32_t dndInitVnodeSyncWorker(SDnode *pDnode);
static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode);
static void dndCleanupVnodeReadWorker(SDnode *pDnode); static void dndCleanupVnodeReadWorker(SDnode *pDnode);
static void dndCleanupVnodeWriteWorker(SDnode *pDnode); static void dndCleanupVnodeWriteWorker(SDnode *pDnode);
static void dndCleanupVnodeSyncWorker(SDnode *pDnode); static void dndCleanupVnodeSyncWorker(SDnode *pDnode);
static void dndCleanupVnodeMgmtWorker(SDnode *pDnode);
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
...@@ -77,12 +75,10 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); ...@@ -77,12 +75,10 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
...@@ -96,13 +92,6 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode); ...@@ -96,13 +92,6 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode);
static int32_t dndOpenVnodes(SDnode *pDnode); static int32_t dndOpenVnodes(SDnode *pDnode);
static void dndCloseVnodes(SDnode *pDnode); static void dndCloseVnodes(SDnode *pDnode);
static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = NULL; SVnodeObj *pVnode = NULL;
...@@ -600,7 +589,7 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { ...@@ -600,7 +589,7 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) {
return pAuth; return pAuth;
} }
static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg); SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, create vnode req is received", pCreate->vgId); dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
...@@ -641,7 +630,7 @@ static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { ...@@ -641,7 +630,7 @@ static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0; return 0;
} }
static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg); SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
...@@ -680,7 +669,7 @@ static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { ...@@ -680,7 +669,7 @@ static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return code; return code;
} }
static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg);
int32_t vgId = pDrop->vgId; int32_t vgId = pDrop->vgId;
...@@ -707,7 +696,7 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { ...@@ -707,7 +696,7 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0; return 0;
} }
static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t code = 0; int32_t code = 0;
...@@ -725,7 +714,7 @@ static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { ...@@ -725,7 +714,7 @@ static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0; return 0;
} }
static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t vgId = pAuth->vgId; int32_t vgId = pAuth->vgId;
...@@ -747,7 +736,7 @@ static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { ...@@ -747,7 +736,7 @@ static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0; return 0;
} }
static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t vgId = pCompact->vgId; int32_t vgId = pCompact->vgId;
...@@ -769,39 +758,6 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { ...@@ -769,39 +758,6 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0; return 0;
} }
static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) {
case TDMT_DND_CREATE_VNODE:
code = dndProcessCreateVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = dndProcessAlterVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = dndProcessDropVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_AUTH_VNODE:
code = dndProcessAuthVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = dndProcessSyncVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = dndProcessCompactVnodeReq(pDnode, pMsg);
break;
default:
code = TSDB_CODE_MSG_NOT_PROCESSED;
break;
}
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp); vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp);
...@@ -909,11 +865,6 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -909,11 +865,6 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
return pVnode; return pVnode;
} }
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
dndWriteRpcMsgToVnodeQueue(pMgmt->pMgmtQ, pMsg);
}
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
...@@ -957,35 +908,6 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMs ...@@ -957,35 +908,6 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMs
return code; return code;
} }
static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "vnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("vnode mgmt worker is initialized");
return 0;
}
static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
tWorkerCleanup(&pMgmt->mgmtPool);
pMgmt->pMgmtQ = NULL;
dDebug("vnode mgmt worker is closed");
}
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
...@@ -1167,11 +1089,6 @@ int32_t dndInitVnodes(SDnode *pDnode) { ...@@ -1167,11 +1089,6 @@ int32_t dndInitVnodes(SDnode *pDnode) {
return -1; return -1;
} }
if (dndInitVnodeMgmtWorker(pDnode) != 0) {
dError("failed to init vnodes mgmt worker since %s", terrstr());
return -1;
}
if (dndOpenVnodes(pDnode) != 0) { if (dndOpenVnodes(pDnode) != 0) {
dError("failed to open vnodes since %s", terrstr()); dError("failed to open vnodes since %s", terrstr());
return -1; return -1;
...@@ -1187,7 +1104,6 @@ void dndCleanupVnodes(SDnode *pDnode) { ...@@ -1187,7 +1104,6 @@ void dndCleanupVnodes(SDnode *pDnode) {
dndCleanupVnodeReadWorker(pDnode); dndCleanupVnodeReadWorker(pDnode);
dndCleanupVnodeWriteWorker(pDnode); dndCleanupVnodeWriteWorker(pDnode);
dndCleanupVnodeSyncWorker(pDnode); dndCleanupVnodeSyncWorker(pDnode);
dndCleanupVnodeMgmtWorker(pDnode);
dInfo("dnode-vnodes is cleaned up"); dInfo("dnode-vnodes is cleaned up");
} }
......
...@@ -28,14 +28,14 @@ Testbase DndTestCluster::test; ...@@ -28,14 +28,14 @@ Testbase DndTestCluster::test;
TEST_F(DndTestCluster, 01_ShowCluster) { TEST_F(DndTestCluster, 01_ShowCluster) {
test.SendShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, ""); test.SendShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, "");
CHECK_META( "show cluster", 3); CHECK_META( "show cluster", 3);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id"); CHECK_SCHEMA(0, TSDB_DATA_TYPE_BIGINT, 8, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name"); CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
test.SendShowRetrieveMsg(); test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
IgnoreInt32(); IgnoreInt64();
IgnoreBinary(TSDB_CLUSTER_ID_LEN); IgnoreBinary(TSDB_CLUSTER_ID_LEN);
CheckTimestamp(); CheckTimestamp();
} }
\ No newline at end of file
...@@ -42,7 +42,7 @@ TEST_F(DndTestProfile, 01_ConnectMsg) { ...@@ -42,7 +42,7 @@ TEST_F(DndTestProfile, 01_ConnectMsg) {
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
pRsp->acctId = htonl(pRsp->acctId); pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htonl(pRsp->clusterId); pRsp->clusterId = htobe64(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId); pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
...@@ -174,7 +174,7 @@ TEST_F(DndTestProfile, 05_KillConnMsg) { ...@@ -174,7 +174,7 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
pRsp->acctId = htonl(pRsp->acctId); pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htonl(pRsp->clusterId); pRsp->clusterId = htobe64(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId); pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
......
...@@ -133,7 +133,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -133,7 +133,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen); STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen);
strcpy(pReq->tableFname, "1.d1.stb"); strcpy(pReq->tableFname, "1.d1.stb");
SRpcMsg* pMsg = test.SendMsg(TDMT_VND_TABLE_META, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_STB_META, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
......
...@@ -26,9 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p ...@@ -26,9 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p
SDnodeOpt option = {0}; SDnodeOpt option = {0};
option.sver = 1; option.sver = 1;
option.numOfCores = 1; option.numOfCores = 1;
option.numOfSupportMnodes = 1;
option.numOfSupportVnodes = 1; option.numOfSupportVnodes = 1;
option.numOfSupportQnodes = 1;
option.statusInterval = 1; option.statusInterval = 1;
option.numOfThreadsPerCore = 1; option.numOfThreadsPerCore = 1;
option.ratioOfQueryCores = 1; option.ratioOfQueryCores = 1;
......
...@@ -62,10 +62,14 @@ typedef enum { ...@@ -62,10 +62,14 @@ typedef enum {
typedef enum { typedef enum {
TRN_STAGE_PREPARE = 0, TRN_STAGE_PREPARE = 0,
TRN_STAGE_EXECUTE = 1, TRN_STAGE_REDO_LOG = 1,
TRN_STAGE_ROLLBACK = 2, TRN_STAGE_REDO_ACTION = 2,
TRN_STAGE_COMMIT = 3, TRN_STAGE_UNDO_LOG = 3,
TRN_STAGE_OVER = 4, TRN_STAGE_UNDO_ACTION = 4,
TRN_STAGE_COMMIT_LOG = 5,
TRN_STAGE_COMMIT = 6,
TRN_STAGE_ROLLBACK = 7,
TRN_STAGE_FINISHED = 8
} ETrnStage; } ETrnStage;
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
...@@ -95,7 +99,8 @@ typedef struct { ...@@ -95,7 +99,8 @@ typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
int32_t retryTimes; int32_t code;
int32_t failedTimes;
void *rpcHandle; void *rpcHandle;
void *rpcAHandle; void *rpcAHandle;
SArray *redoLogs; SArray *redoLogs;
...@@ -106,7 +111,7 @@ typedef struct { ...@@ -106,7 +111,7 @@ typedef struct {
} STrans; } STrans;
typedef struct { typedef struct {
int32_t id; int64_t id;
char name[TSDB_CLUSTER_ID_LEN]; char name[TSDB_CLUSTER_ID_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
...@@ -313,12 +318,6 @@ typedef struct SMnodeMsg { ...@@ -313,12 +318,6 @@ typedef struct SMnodeMsg {
void *pCont; void *pCont;
} SMnodeMsg; } SMnodeMsg;
typedef struct {
int32_t id;
int32_t code;
void *rpcHandle;
} STransMsg;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -70,11 +70,12 @@ typedef struct { ...@@ -70,11 +70,12 @@ typedef struct {
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int64_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer; tmr_h timer;
tmr_h transTimer;
char *path; char *path;
SMnodeCfg cfg; SMnodeCfg cfg;
int64_t checkTime; int64_t checkTime;
......
...@@ -44,11 +44,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); ...@@ -44,11 +44,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); void mndTransProcessRsp(SMnodeMsg *pMsg);
void mndTransHandleActionRsp(SMnodeMsg *pMsg);
char *mndTransStageStr(ETrnStage stage);
char *mndTransPolicyStr(ETrnPolicy policy);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -67,7 +67,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { ...@@ -67,7 +67,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pCluster->id); SDB_SET_INT64(pRaw, dataPos, pCluster->id);
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime) SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime) SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
...@@ -91,7 +91,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { ...@@ -91,7 +91,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
if (pCluster == NULL) return NULL; if (pCluster == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime)
SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
...@@ -101,17 +101,17 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { ...@@ -101,17 +101,17 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
} }
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform insert action", pCluster->id); mTrace("cluster:%" PRId64 ", perform insert action", pCluster->id);
return 0; return 0;
} }
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform delete action", pCluster->id); mTrace("cluster:%" PRId64 ", perform delete action", pCluster->id);
return 0; return 0;
} }
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster) { static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster) {
mTrace("cluster:%d, perform update action", pOldCluster->id); mTrace("cluster:%" PRId64 ", perform update action", pOldCluster->id);
return 0; return 0;
} }
...@@ -125,17 +125,17 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -125,17 +125,17 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
strcpy(clusterObj.name, "tdengine2.0"); strcpy(clusterObj.name, "tdengine2.0");
mError("failed to get name from system, set to default val %s", clusterObj.name); mError("failed to get name from system, set to default val %s", clusterObj.name);
} else { } else {
mDebug("cluster:%d, name is %s", clusterObj.id, clusterObj.name); mDebug("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
} }
clusterObj.id = MurmurHash3_32(clusterObj.name, TSDB_CLUSTER_ID_LEN); clusterObj.id = MurmurHash3_32(clusterObj.name, TSDB_CLUSTER_ID_LEN);
clusterObj.id = abs(clusterObj.id); clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id);
pMnode->clusterId = clusterObj.id; pMnode->clusterId = clusterObj.id;
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("cluster:%d, will be created while deploy sdb", clusterObj.id); mDebug("cluster:%" PRId64 ", will be created while deploy sdb", clusterObj.id);
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
...@@ -143,8 +143,8 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg ...@@ -143,8 +143,8 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema; SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "id"); strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]); pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++; cols++;
...@@ -192,7 +192,7 @@ static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, ...@@ -192,7 +192,7 @@ static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
cols = 0; cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pCluster->id; *(int64_t *)pWrite = pCluster->id;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
...@@ -276,12 +276,11 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { ...@@ -276,12 +276,11 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
static void mndParseStatusMsg(SStatusMsg *pStatus) { static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->sver = htonl(pStatus->sver); pStatus->sver = htonl(pStatus->sver);
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htonl(pStatus->clusterId); pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->rebootTime = htobe64(pStatus->rebootTime); pStatus->rebootTime = htobe64(pStatus->rebootTime);
pStatus->updateTime = htobe64(pStatus->updateTime);
pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes);
pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes); pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes);
pStatus->numOfSupportQnodes = htons(pStatus->numOfSupportQnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
} }
...@@ -324,13 +323,14 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -324,13 +323,14 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
} }
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %d", pDnode->id, pDnode->ep, pMnode->clusterId); mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else { } else {
if (pStatus->clusterId != pMnode->clusterId) { if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
} }
mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId); mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
pMnode->clusterId);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID; terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID;
return -1; return -1;
...@@ -356,9 +356,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -356,9 +356,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pDnode->rebootTime = pStatus->rebootTime; pDnode->rebootTime = pStatus->rebootTime;
pDnode->numOfCores = pStatus->numOfCores; pDnode->numOfCores = pStatus->numOfCores;
pDnode->numOfSupportMnodes = pStatus->numOfSupportMnodes;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
pDnode->numOfSupportQnodes = pStatus->numOfSupportQnodes;
pDnode->lastAccessTime = taosGetTimestampMs(); pDnode->lastAccessTime = taosGetTimestampMs();
pDnode->status = DND_STATUS_READY; pDnode->status = DND_STATUS_READY;
...@@ -373,7 +371,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -373,7 +371,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.dropped = 0; pRsp->dnodeCfg.dropped = 0;
pRsp->dnodeCfg.clusterId = htonl(pMnode->clusterId); pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->contLen = contLen; pMsg->contLen = contLen;
......
...@@ -567,17 +567,17 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { ...@@ -567,17 +567,17 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
......
...@@ -225,7 +225,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -225,7 +225,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
} }
pRsp->clusterId = htonl(pMnode->clusterId); pRsp->clusterId = htobe64(pMnode->clusterId);
pRsp->connId = htonl(pConn->id); pRsp->connId = htonl(pConn->id);
mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
......
...@@ -58,7 +58,7 @@ int32_t mndInitStb(SMnode *pMnode) { ...@@ -58,7 +58,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_TABLE_META, mndProcessStbMetaMsg); mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaMsg);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
...@@ -552,7 +552,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { ...@@ -552,7 +552,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
...@@ -616,7 +616,7 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { ...@@ -616,7 +616,7 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
...@@ -728,7 +728,7 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { ...@@ -728,7 +728,7 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
...@@ -886,14 +886,19 @@ static void mndExtractTableName(char *tableId, char *name) { ...@@ -886,14 +886,19 @@ static void mndExtractTableName(char *tableId, char *name) {
} }
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode * pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSdb * pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
int32_t cols = 0; int32_t cols = 0;
char * pWrite; char *pWrite;
char prefix[64] = {0}; char prefix[64] = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) {
return TSDB_CODE_MND_INVALID_DB;
}
tstrncpy(prefix, pShow->db, 64); tstrncpy(prefix, pShow->db, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
...@@ -902,7 +907,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 ...@@ -902,7 +907,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (strncmp(pStb->name, prefix, prefixLen) != 0) { if (pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb); sdbRelease(pSdb, pStb);
continue; continue;
} }
...@@ -931,6 +936,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 ...@@ -931,6 +936,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
sdbRelease(pSdb, pStb); sdbRelease(pSdb, pStb);
} }
mndReleaseDb(pMnode, pDb);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows; return numOfRows;
......
...@@ -524,7 +524,7 @@ static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) { ...@@ -524,7 +524,7 @@ static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
...@@ -636,7 +636,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { ...@@ -636,7 +636,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
...@@ -706,7 +706,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { ...@@ -706,7 +706,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
......
...@@ -333,17 +333,17 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { ...@@ -333,17 +333,17 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
} }
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg); mndTransProcessRsp(pMsg);
return 0; return 0;
} }
......
...@@ -50,6 +50,20 @@ void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) { ...@@ -50,6 +50,20 @@ void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) {
} }
} }
static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg));
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
epSet.port[0] = pMnode->replicas[pMnode->selfIndex].port;
memcpy(epSet.fqdn[0], pMnode->replicas[pMnode->selfIndex].fqdn, TSDB_FQDN_LEN);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)};
mndSendMsgToDnode(pMnode, &epSet, &rpcMsg);
}
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
static int32_t mndInitTimer(SMnode *pMnode) { static int32_t mndInitTimer(SMnode *pMnode) {
if (pMnode->timer == NULL) { if (pMnode->timer == NULL) {
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND"); pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
...@@ -60,11 +74,18 @@ static int32_t mndInitTimer(SMnode *pMnode) { ...@@ -60,11 +74,18 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1; return -1;
} }
if (taosTmrReset(mndTransReExecute, 1000, pMnode, pMnode->timer, &pMnode->transTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0; return 0;
} }
static void mndCleanupTimer(SMnode *pMnode) { static void mndCleanupTimer(SMnode *pMnode) {
if (pMnode->timer != NULL) { if (pMnode->timer != NULL) {
taosTmrStop(pMnode->transTimer);
pMnode->transTimer = NULL;
taosTmrCleanUp(pMnode->timer); taosTmrCleanUp(pMnode->timer);
pMnode->timer = NULL; pMnode->timer = NULL;
} }
......
...@@ -123,6 +123,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -123,6 +123,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosWLockLatch(pLock);
// remove attached object such as trans
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
if (deleteFp != NULL) (*deleteFp)(pSdb, pRow->pObj);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
......
...@@ -23,7 +23,7 @@ int vnodeBuildReq(void **buf, const SVnodeReq *pReq, tmsg_t type) { ...@@ -23,7 +23,7 @@ int vnodeBuildReq(void **buf, const SVnodeReq *pReq, tmsg_t type) {
tsize += taosEncodeFixedU64(buf, pReq->ver); tsize += taosEncodeFixedU64(buf, pReq->ver);
switch (type) { switch (type) {
case TDMT_MND_CREATE_TABLE: case TDMT_VND_CREATE_STB:
tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq)); tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq));
break; break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
...@@ -40,7 +40,7 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, tmsg_t type) { ...@@ -40,7 +40,7 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, tmsg_t type) {
buf = taosDecodeFixedU64(buf, &(pReq->ver)); buf = taosDecodeFixedU64(buf, &(pReq->ver));
switch (type) { switch (type) {
case TDMT_MND_CREATE_TABLE: case TDMT_VND_CREATE_STB:
buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq)); buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq));
break; break;
......
...@@ -78,7 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -78,7 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: maybe need to clear the requst struct // TODO: maybe need to clear the requst struct
break; break;
case TDMT_VND_DROP_STB: case TDMT_VND_DROP_STB:
case TDMT_MND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// TODO: handle error // TODO: handle error
} }
......
...@@ -84,14 +84,14 @@ static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) { ...@@ -84,14 +84,14 @@ static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
SVnodeReq vCreateSTbReq; SVnodeReq vCreateSTbReq;
vnodeSetCreateStbReq(&vCreateSTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema); vnodeSetCreateStbReq(&vCreateSTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_MND_CREATE_TABLE); zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_VND_CREATE_STB);
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs); pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
pMsg->msgType = TDMT_MND_CREATE_TABLE; pMsg->msgType = TDMT_VND_CREATE_STB;
pMsg->contLen = zs; pMsg->contLen = zs;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg)); pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
pBuf = pMsg->pCont; pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_MND_CREATE_TABLE); vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_VND_CREATE_STB);
META_CLEAR_TB_CFG(&vCreateSTbReq); META_CLEAR_TB_CFG(&vCreateSTbReq);
tdFreeSchema(pSchema); tdFreeSchema(pSchema);
...@@ -108,14 +108,14 @@ static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) { ...@@ -108,14 +108,14 @@ static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
SVnodeReq vCreateCTbReq; SVnodeReq vCreateCTbReq;
vnodeSetCreateCtbReq(&vCreateCTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pTag); vnodeSetCreateCtbReq(&vCreateCTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_MND_CREATE_TABLE); tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz); pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
pMsg->msgType = TDMT_MND_CREATE_TABLE; pMsg->msgType = TDMT_VND_CREATE_TABLE;
pMsg->contLen = tz; pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg)); pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
void *pBuf = pMsg->pCont; void *pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_MND_CREATE_TABLE); vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
META_CLEAR_TB_CFG(&vCreateCTbReq); META_CLEAR_TB_CFG(&vCreateCTbReq);
free(pTag); free(pTag);
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include "exception.h" #include "exception.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "thash.h" #include "thash.h"
//#include "queryLog.h"
#include "function.h" #include "function.h"
#include "tcompare.h" #include "tcompare.h"
#include "tcompression.h" #include "tcompression.h"
...@@ -7685,7 +7684,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp ...@@ -7685,7 +7684,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
type = s.type; type = s.type;
bytes = s.bytes; bytes = s.bytes;
} else if (pExprs[i].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX && functionId == FUNCTION_TAGPRJ) { // parse the normal column } else if (pExprs[i].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX && functionId == FUNCTION_TAGPRJ) { // parse the normal column
SSchema* s = tGetTbnameColumnSchema(); const SSchema* s = tGetTbnameColumnSchema();
type = s->type; type = s->type;
bytes = s->bytes; bytes = s->bytes;
} else if (pExprs[i].base.pColumns->info.colId <= TSDB_UD_COLUMN_INDEX && pExprs[i].base.pColumns->info.colId > TSDB_RES_COL_ID) { } else if (pExprs[i].base.pColumns->info.colId <= TSDB_UD_COLUMN_INDEX && pExprs[i].base.pColumns->info.colId > TSDB_RES_COL_ID) {
...@@ -7716,7 +7715,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp ...@@ -7716,7 +7715,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
type = pCol->type; type = pCol->type;
bytes = pCol->bytes; bytes = pCol->bytes;
} else { } else {
SSchema* s = tGetTbnameColumnSchema(); const SSchema* s = tGetTbnameColumnSchema();
type = s->type; type = s->type;
bytes = s->bytes; bytes = s->bytes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册