提交 041e6716 编写于 作者: S Shengliang Guan

refact mnode queue

上级 d60941d4
...@@ -147,28 +147,12 @@ void mndCleanupMsg(SMnodeMsg *pMsg); ...@@ -147,28 +147,12 @@ void mndCleanupMsg(SMnodeMsg *pMsg);
void mndSendRsp(SMnodeMsg *pMsg, int32_t code); void mndSendRsp(SMnodeMsg *pMsg, int32_t code);
/** /**
* @brief Process the read request. * @brief Process the read, write, sync request.
* *
* @param pMsg The request msg. * @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
void mndProcessReadMsg(SMnodeMsg *pMsg); void mndProcessMsg(SMnodeMsg *pMsg);
/**
* @brief Process the write request.
*
* @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure.
*/
void mndProcessWriteMsg(SMnodeMsg *pMsg);
/**
* @brief Process the sync request.
*
* @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure.
*/
void mndProcessSyncMsg(SMnodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -79,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); ...@@ -79,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
* @param pRsp The response message * @param pRsp The response message
* @return int32_t 0 for success, -1 for failure * @return int32_t 0 for success, -1 for failure
*/ */
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/** /**
* @brief Drop a snode. * @brief Drop a snode.
......
...@@ -65,8 +65,10 @@ typedef struct { ...@@ -65,8 +65,10 @@ typedef struct {
void *queueFp; void *queueFp;
SDnode *pDnode; SDnode *pDnode;
taos_queue queue; taos_queue queue;
SWorkerPool pool; union {
SMWorkerPool mpool; SWorkerPool pool;
SMWorkerPool mpool;
};
} SDnodeWorker; } SDnodeWorker;
typedef struct { typedef struct {
...@@ -95,21 +97,17 @@ typedef struct { ...@@ -95,21 +97,17 @@ typedef struct {
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
int32_t refCount; int32_t refCount;
int8_t deployed; int8_t deployed;
int8_t dropped; int8_t dropped;
int8_t replica; SMnode *pMnode;
int8_t selfIndex; SRWLatch latch;
SReplica replicas[TSDB_MAX_REPLICA]; SDnodeWorker readWorker;
char *file; SDnodeWorker writeWorker;
SMnode *pMnode; SDnodeWorker syncWorker;
SRWLatch latch; int8_t replica;
taos_queue pReadQ; int8_t selfIndex;
taos_queue pWriteQ; SReplica replicas[TSDB_MAX_REPLICA];
taos_queue pSyncQ;
SWorkerPool readPool;
SWorkerPool writePool;
SWorkerPool syncPool;
} SMnodeMgmt; } SMnodeMgmt;
typedef struct { typedef struct {
......
...@@ -140,20 +140,22 @@ static int32_t dndWriteBnodeFile(SDnode *pDnode) { ...@@ -140,20 +140,22 @@ static int32_t dndWriteBnodeFile(SDnode *pDnode) {
fclose(fp); fclose(fp);
free(content); free(content);
if (taosRenameFile(file, file) != 0) { char realfile[PATH_MAX + 20];
snprintf(realfile, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; return -1;
} }
dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0; return 0;
} }
static int32_t dndStartBnodeWorker(SDnode *pDnode) { static int32_t dndStartBnodeWorker(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt; SBnodeMgmt *pMgmt = &pDnode->bmgmt;
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "bnode-write", 0, 1, if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) {
(FProcessItem)dndProcessBnodeQueue) != 0) {
dError("failed to start bnode write worker since %s", terrstr()); dError("failed to start bnode write worker since %s", terrstr());
return -1; return -1;
} }
...@@ -202,7 +204,9 @@ static int32_t dndOpenBnode(SDnode *pDnode) { ...@@ -202,7 +204,9 @@ static int32_t dndOpenBnode(SDnode *pDnode) {
return -1; return -1;
} }
pMgmt->deployed = 1;
if (dndWriteBnodeFile(pDnode) != 0) { if (dndWriteBnodeFile(pDnode) != 0) {
pMgmt->deployed = 0;
dError("failed to write bnode file since %s", terrstr()); dError("failed to write bnode file since %s", terrstr());
dndStopBnodeWorker(pDnode); dndStopBnodeWorker(pDnode);
bndClose(pBnode); bndClose(pBnode);
...@@ -211,7 +215,6 @@ static int32_t dndOpenBnode(SDnode *pDnode) { ...@@ -211,7 +215,6 @@ static int32_t dndOpenBnode(SDnode *pDnode) {
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->pBnode = pBnode; pMgmt->pBnode = pBnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
dInfo("bnode open successfully"); dInfo("bnode open successfully");
...@@ -243,6 +246,8 @@ static int32_t dndDropBnode(SDnode *pDnode) { ...@@ -243,6 +246,8 @@ static int32_t dndDropBnode(SDnode *pDnode) {
dndReleaseBnode(pDnode, pBnode); dndReleaseBnode(pDnode, pBnode);
dndStopBnodeWorker(pDnode); dndStopBnodeWorker(pDnode);
pMgmt->deployed = 0;
dndWriteBnodeFile(pDnode);
bndClose(pBnode); bndClose(pBnode);
pMgmt->pBnode = NULL; pMgmt->pBnode = NULL;
bndDestroy(pDnode->dir.bnode); bndDestroy(pDnode->dir.bnode);
...@@ -353,7 +358,12 @@ int32_t dndInitBnode(SDnode *pDnode) { ...@@ -353,7 +358,12 @@ int32_t dndInitBnode(SDnode *pDnode) {
return -1; return -1;
} }
if (pMgmt->dropped) return 0; if (pMgmt->dropped) {
dInfo("bnode has been deployed and needs to be deleted");
bndDestroy(pDnode->dir.bnode);
return 0;
}
if (!pMgmt->deployed) return 0; if (!pMgmt->deployed) return 0;
return dndOpenBnode(pDnode); return dndOpenBnode(pDnode);
......
...@@ -17,42 +17,9 @@ ...@@ -17,42 +17,9 @@
#include "dndMnode.h" #include "dndMnode.h"
#include "dndDnode.h" #include "dndDnode.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h"
static int32_t dndInitMnodeReadWorker(SDnode *pDnode); static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode);
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode);
static void dndCleanupMnodeReadWorker(SDnode *pDnode);
static void dndCleanupMnodeWriteWorker(SDnode *pDnode);
static void dndCleanupMnodeSyncWorker(SDnode *pDnode);
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode);
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode);
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode);
static void dndFreeMnodeReadQueue(SDnode *pDnode);
static void dndFreeMnodeWriteQueue(SDnode *pDnode);
static void dndFreeMnodeSyncQueue(SDnode *pDnode);
static void dndFreeMnodeMgmtQueue(SDnode *pDnode);
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndStartMnodeWorker(SDnode *pDnode);
static void dndStopMnodeWorker(SDnode *pDnode);
static SMnode *dndAcquireMnode(SDnode *pDnode);
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode);
static int32_t dndReadMnodeFile(SDnode *pDnode);
static int32_t dndWriteMnodeFile(SDnode *pDnode);
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndDropMnode(SDnode *pDnode);
static SMnode *dndAcquireMnode(SDnode *pDnode) { static SMnode *dndAcquireMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
...@@ -97,49 +64,52 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { ...@@ -97,49 +64,52 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
cJSON *root = NULL; cJSON *root = NULL;
FILE *fp = fopen(pMgmt->file, "r"); char file[PATH_MAX + 20];
snprintf(file, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode);
FILE *fp = fopen(file, "r");
if (fp == NULL) { if (fp == NULL) {
dDebug("file %s not exist", pMgmt->file); dDebug("file %s not exist", file);
code = 0; code = 0;
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
len = (int32_t)fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", pMgmt->file); dError("failed to read %s since content is null", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", pMgmt->file); dError("failed to read %s since invalid json format", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_Number) { if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", pMgmt->file); dError("failed to read %s since deployed not found", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
pMgmt->deployed = deployed->valueint; pMgmt->deployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) { if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", pMgmt->file); dError("failed to read %s since dropped not found", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
pMgmt->dropped = dropped->valueint; pMgmt->dropped = dropped->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (!mnodes || mnodes->type != cJSON_Array) { if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", pMgmt->file); dError("failed to read %s since nodes not found", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
pMgmt->replica = cJSON_GetArraySize(mnodes); pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica); dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
...@@ -151,28 +121,28 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { ...@@ -151,28 +121,28 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
cJSON *id = cJSON_GetObjectItem(node, "id"); cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) { if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", pMgmt->file); dError("failed to read %s since id not found", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
pReplica->id = id->valueint; pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", pMgmt->file); dError("failed to read %s since fqdn not found", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port"); cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) { if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", pMgmt->file); dError("failed to read %s since port not found", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
pReplica->port = port->valueint; pReplica->port = port->valueint;
} }
code = 0; code = 0;
dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
PRASE_MNODE_OVER: PRASE_MNODE_OVER:
if (content != NULL) free(content); if (content != NULL) free(content);
...@@ -186,8 +156,8 @@ PRASE_MNODE_OVER: ...@@ -186,8 +156,8 @@ PRASE_MNODE_OVER:
static int32_t dndWriteMnodeFile(SDnode *pDnode) { static int32_t dndWriteMnodeFile(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
char file[PATH_MAX + 20] = {0}; char file[PATH_MAX + 20];
snprintf(file, sizeof(file), "%s.bak", pMgmt->file); snprintf(file, PATH_MAX + 20, "%s/mnode.json.bak", pDnode->dir.dnode);
FILE *fp = fopen(file, "w"); FILE *fp = fopen(file, "w");
if (fp == NULL) { if (fp == NULL) {
...@@ -223,47 +193,36 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) { ...@@ -223,47 +193,36 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) {
fclose(fp); fclose(fp);
free(content); free(content);
if (taosRenameFile(file, pMgmt->file) != 0) { char realfile[PATH_MAX + 20];
snprintf(realfile, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", pMgmt->file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; return -1;
} }
dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0; return 0;
} }
static int32_t dndStartMnodeWorker(SDnode *pDnode) { static int32_t dndStartMnodeWorker(SDnode *pDnode) {
if (dndInitMnodeReadWorker(pDnode) != 0) { SMnodeMgmt *pMgmt = &pDnode->mmgmt;
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr()); dError("failed to start mnode read worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitMnodeWriteWorker(pDnode) != 0) { if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, dndProcessMnodeQueue) != 0) {
dError("failed to start mnode write worker since %s", terrstr()); dError("failed to start mnode write worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitMnodeSyncWorker(pDnode) != 0) { if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, dndProcessMnodeQueue) != 0) {
dError("failed to start mnode sync worker since %s", terrstr()); dError("failed to start mnode sync worker since %s", terrstr());
return -1; return -1;
} }
if (dndAllocMnodeReadQueue(pDnode) != 0) {
dError("failed to alloc mnode read queue since %s", terrstr());
return -1;
}
if (dndAllocMnodeWriteQueue(pDnode) != 0) {
dError("failed to alloc mnode write queue since %s", terrstr());
return -1;
}
if (dndAllocMnodeSyncQueue(pDnode) != 0) {
dError("failed to alloc mnode sync queue since %s", terrstr());
return -1;
}
return 0; return 0;
} }
...@@ -274,18 +233,13 @@ static void dndStopMnodeWorker(SDnode *pDnode) { ...@@ -274,18 +233,13 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
pMgmt->deployed = 0; pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 1) taosMsleep(10); while (pMgmt->refCount > 1) {
while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10); taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); }
while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10);
dndCleanupMnodeReadWorker(pDnode);
dndCleanupMnodeWriteWorker(pDnode);
dndCleanupMnodeSyncWorker(pDnode);
dndFreeMnodeReadQueue(pDnode); dndCleanupWorker(&pMgmt->readWorker);
dndFreeMnodeWriteQueue(pDnode); dndCleanupWorker(&pMgmt->writeWorker);
dndFreeMnodeSyncQueue(pDnode); dndCleanupWorker(&pMgmt->syncWorker);
} }
static bool dndNeedDeployMnode(SDnode *pDnode) { static bool dndNeedDeployMnode(SDnode *pDnode) {
...@@ -383,28 +337,21 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -383,28 +337,21 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
dError("failed to open mnode since %s", terrstr()); dError("failed to open mnode since %s", terrstr());
return -1; return -1;
} }
pMgmt->deployed = 1;
int32_t code = dndWriteMnodeFile(pDnode); if (dndStartMnodeWorker(pDnode) != 0) {
if (code != 0) { dError("failed to start mnode worker since %s", terrstr());
dError("failed to write mnode file since %s", terrstr());
code = terrno;
pMgmt->deployed = 0;
mndClose(pMnode); mndClose(pMnode);
mndDestroy(pDnode->dir.mnode); mndDestroy(pDnode->dir.mnode);
terrno = code;
return -1; return -1;
} }
code = dndStartMnodeWorker(pDnode); pMgmt->deployed = 1;
if (code != 0) { if (dndWriteMnodeFile(pDnode) != 0) {
dError("failed to start mnode worker since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
code = terrno;
pMgmt->deployed = 0; pMgmt->deployed = 0;
dndStopMnodeWorker(pDnode); dndStopMnodeWorker(pDnode);
mndClose(pMnode); mndClose(pMnode);
mndDestroy(pDnode->dir.mnode); mndDestroy(pDnode->dir.mnode);
terrno = code;
return -1; return -1;
} }
...@@ -461,6 +408,7 @@ static int32_t dndDropMnode(SDnode *pDnode) { ...@@ -461,6 +408,7 @@ static int32_t dndDropMnode(SDnode *pDnode) {
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
dndStopMnodeWorker(pDnode); dndStopMnodeWorker(pDnode);
pMgmt->deployed = 0;
dndWriteMnodeFile(pDnode); dndWriteMnodeFile(pDnode);
mndClose(pMnode); mndClose(pMnode);
pMgmt->pMnode = NULL; pMgmt->pMnode = NULL;
...@@ -528,13 +476,12 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -528,13 +476,12 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
} }
} }
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
mndProcessReadMsg(pMsg); mndProcessMsg(pMsg);
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} else { } else {
mndSendRsp(pMsg, terrno); mndSendRsp(pMsg, terrno);
...@@ -543,208 +490,43 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -543,208 +490,43 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
mndCleanupMsg(pMsg); mndCleanupMsg(pMsg);
} }
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
mndProcessWriteMsg(pMsg); SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
dndReleaseMnode(pDnode, pMnode); if (pMsg == NULL) {
} else { code = TSDB_CODE_OUT_OF_MEMORY;
mndSendRsp(pMsg, terrno); } else {
} code = dndWriteMsgToWorker(pWorker, pMsg, 0);
mndCleanupMsg(pMsg);
}
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
mndProcessSyncMsg(pMsg);
dndReleaseMnode(pDnode, pMnode);
} else {
mndSendRsp(pMsg, terrno);
}
mndCleanupMsg(pMsg);
}
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (taosWriteQitem(pQueue, pMsg) != 0) {
mndCleanupMsg(pMsg);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
dndReleaseMnode(pDnode, pMnode); if (code != 0) {
} mndCleanupMsg(pMsg);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} }
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
}
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { if (code != 0) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; if (pRpcMsg->msgType & 1u) {
SMnode *pMnode = dndAcquireMnode(pDnode); SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code};
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
pMsg->pCont = NULL;
}
dndReleaseMnode(pDnode, pMnode);
}
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue);
if (pMgmt->pReadQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
return 0;
}
static void dndFreeMnodeReadQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->readPool, pMgmt->pReadQ);
pMgmt->pReadQ = NULL;
}
static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->readPool;
pPool->name = "mnode-read";
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("mnode read worker is initialized");
return 0;
}
static void dndCleanupMnodeReadWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->readPool);
dDebug("mnode read worker is closed");
}
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue);
if (pMgmt->pWriteQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pWriteQ);
pMgmt->pWriteQ = NULL;
} }
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
SWorkerPool *pPool = &pMgmt->writePool;
pPool->name = "mnode-write";
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("mnode write worker is initialized");
return 0;
}
static void dndCleanupMnodeWriteWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->writePool);
dDebug("mnode write worker is closed");
}
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue);
if (pMgmt->pSyncQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMnodeSyncQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->syncPool, pMgmt->pSyncQ);
pMgmt->pSyncQ = NULL;
} }
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg);
SWorkerPool *pPool = &pMgmt->syncPool;
pPool->name = "mnode-sync";
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("mnode sync worker is initialized");
return 0;
} }
static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg);
tWorkerCleanup(&pMgmt->syncPool);
dDebug("mnode sync worker is closed");
} }
int32_t dndInitMnode(SDnode *pDnode) { int32_t dndInitMnode(SDnode *pDnode) {
...@@ -752,14 +534,6 @@ int32_t dndInitMnode(SDnode *pDnode) { ...@@ -752,14 +534,6 @@ int32_t dndInitMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosInitRWLatch(&pMgmt->latch); taosInitRWLatch(&pMgmt->latch);
char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode);
pMgmt->file = strdup(path);
if (pMgmt->file == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndReadMnodeFile(pDnode) != 0) { if (dndReadMnodeFile(pDnode) != 0) {
return -1; return -1;
} }
...@@ -790,13 +564,13 @@ int32_t dndInitMnode(SDnode *pDnode) { ...@@ -790,13 +564,13 @@ int32_t dndInitMnode(SDnode *pDnode) {
} }
void dndCleanupMnode(SDnode *pDnode) { void dndCleanupMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
dInfo("dnode-mnode start to clean up"); dInfo("dnode-mnode start to clean up");
if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tfree(pMgmt->file); if (pMgmt->pMnode) {
mndClose(pMgmt->pMnode); dndStopMnodeWorker(pDnode);
pMgmt->pMnode = NULL; mndClose(pMgmt->pMnode);
pMgmt->pMnode = NULL;
}
dInfo("dnode-mnode is cleaned up"); dInfo("dnode-mnode is cleaned up");
} }
......
...@@ -140,26 +140,27 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) { ...@@ -140,26 +140,27 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) {
fclose(fp); fclose(fp);
free(content); free(content);
if (taosRenameFile(file, file) != 0) { char realfile[PATH_MAX + 20];
snprintf(realfile, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR; terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; return -1;
} }
dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0; return 0;
} }
static int32_t dndStartQnodeWorker(SDnode *pDnode) { static int32_t dndStartQnodeWorker(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt; SQnodeMgmt *pMgmt = &pDnode->qmgmt;
if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, dndProcessQnodeQueue) != 0) {
(FProcessItem)dndProcessQnodeQueue) != 0) {
dError("failed to start qnode query worker since %s", terrstr()); dError("failed to start qnode query worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, dndProcessQnodeQueue) != 0) {
(FProcessItem)dndProcessQnodeQueue) != 0) {
dError("failed to start qnode fetch worker since %s", terrstr()); dError("failed to start qnode fetch worker since %s", terrstr());
return -1; return -1;
} }
...@@ -209,7 +210,9 @@ static int32_t dndOpenQnode(SDnode *pDnode) { ...@@ -209,7 +210,9 @@ static int32_t dndOpenQnode(SDnode *pDnode) {
return -1; return -1;
} }
pMgmt->deployed = 1;
if (dndWriteQnodeFile(pDnode) != 0) { if (dndWriteQnodeFile(pDnode) != 0) {
pMgmt->deployed = 0;
dError("failed to write qnode file since %s", terrstr()); dError("failed to write qnode file since %s", terrstr());
dndStopQnodeWorker(pDnode); dndStopQnodeWorker(pDnode);
qndClose(pQnode); qndClose(pQnode);
...@@ -218,7 +221,6 @@ static int32_t dndOpenQnode(SDnode *pDnode) { ...@@ -218,7 +221,6 @@ static int32_t dndOpenQnode(SDnode *pDnode) {
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->pQnode = pQnode; pMgmt->pQnode = pQnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
dInfo("qnode open successfully"); dInfo("qnode open successfully");
...@@ -250,6 +252,8 @@ static int32_t dndDropQnode(SDnode *pDnode) { ...@@ -250,6 +252,8 @@ static int32_t dndDropQnode(SDnode *pDnode) {
dndReleaseQnode(pDnode, pQnode); dndReleaseQnode(pDnode, pQnode);
dndStopQnodeWorker(pDnode); dndStopQnodeWorker(pDnode);
pMgmt->deployed = 0;
dndWriteQnodeFile(pDnode);
qndClose(pQnode); qndClose(pQnode);
pMgmt->pQnode = NULL; pMgmt->pQnode = NULL;
......
...@@ -140,20 +140,22 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { ...@@ -140,20 +140,22 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) {
fclose(fp); fclose(fp);
free(content); free(content);
if (taosRenameFile(file, file) != 0) { char realfile[PATH_MAX + 20];
snprintf(realfile, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR; terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; return -1;
} }
dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0; return 0;
} }
static int32_t dndStartSnodeWorker(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) {
SSnodeMgmt *pMgmt = &pDnode->smgmt; SSnodeMgmt *pMgmt = &pDnode->smgmt;
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) {
(FProcessItem)dndProcessSnodeQueue) != 0) {
dError("failed to start snode write worker since %s", terrstr()); dError("failed to start snode write worker since %s", terrstr());
return -1; return -1;
} }
...@@ -202,7 +204,9 @@ static int32_t dndOpenSnode(SDnode *pDnode) { ...@@ -202,7 +204,9 @@ static int32_t dndOpenSnode(SDnode *pDnode) {
return -1; return -1;
} }
pMgmt->deployed = 1;
if (dndWriteSnodeFile(pDnode) != 0) { if (dndWriteSnodeFile(pDnode) != 0) {
pMgmt->deployed = 0;
dError("failed to write snode file since %s", terrstr()); dError("failed to write snode file since %s", terrstr());
dndStopSnodeWorker(pDnode); dndStopSnodeWorker(pDnode);
sndClose(pSnode); sndClose(pSnode);
...@@ -211,7 +215,6 @@ static int32_t dndOpenSnode(SDnode *pDnode) { ...@@ -211,7 +215,6 @@ static int32_t dndOpenSnode(SDnode *pDnode) {
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->pSnode = pSnode; pMgmt->pSnode = pSnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
dInfo("snode open successfully"); dInfo("snode open successfully");
...@@ -243,6 +246,8 @@ static int32_t dndDropSnode(SDnode *pDnode) { ...@@ -243,6 +246,8 @@ static int32_t dndDropSnode(SDnode *pDnode) {
dndReleaseSnode(pDnode, pSnode); dndReleaseSnode(pDnode, pSnode);
dndStopSnodeWorker(pDnode); dndStopSnodeWorker(pDnode);
pMgmt->deployed = 0;
dndWriteSnodeFile(pDnode);
sndClose(pSnode); sndClose(pSnode);
pMgmt->pSnode = NULL; pMgmt->pSnode = NULL;
sndDestroy(pDnode->dir.snode); sndDestroy(pDnode->dir.snode);
...@@ -328,7 +333,12 @@ int32_t dndInitSnode(SDnode *pDnode) { ...@@ -328,7 +333,12 @@ int32_t dndInitSnode(SDnode *pDnode) {
return -1; return -1;
} }
if (pMgmt->dropped) return 0; if (pMgmt->dropped) {
dInfo("snode has been deployed and needs to be deleted");
sndDestroy(pDnode->dir.snode);
return 0;
}
if (!pMgmt->deployed) return 0; if (!pMgmt->deployed) return 0;
return dndOpenSnode(pDnode); return dndOpenSnode(pDnode);
......
...@@ -73,7 +73,7 @@ void dndCleanupWorker(SDnodeWorker *pWorker) { ...@@ -73,7 +73,7 @@ void dndCleanupWorker(SDnodeWorker *pWorker) {
tWorkerCleanup(&pWorker->pool); tWorkerCleanup(&pWorker->pool);
tWorkerFreeQueue(&pWorker->pool, pWorker->queue); tWorkerFreeQueue(&pWorker->pool, pWorker->queue);
} else if (pWorker->type == DND_WORKER_MULTI) { } else if (pWorker->type == DND_WORKER_MULTI) {
tWorkerCleanup(&pWorker->mpool); tMWorkerCleanup(&pWorker->mpool);
tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
} else { } else {
} }
...@@ -85,16 +85,23 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) ...@@ -85,16 +85,23 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen)
return -1; return -1;
} }
void *pMsg = taosAllocateQitem(contLen); void *pMsg = NULL;
if (contLen != 0) {
pMsg = taosAllocateQitem(contLen);
if (pMsg != NULL) {
memcpy(pMsg, pCont, contLen);
}
} else {
pMsg = pCont;
}
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
memcpy(pMsg, pCont, contLen); if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
taosFreeQitem(pMsg);
if (taosWriteQitem(pWorker, pMsg) != 0) {
taosFreeItem(pMsg);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
......
...@@ -162,7 +162,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { ...@@ -162,7 +162,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost"); strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(904); pReq->port = htonl(9044);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
......
...@@ -388,7 +388,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * ...@@ -388,7 +388,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
dnodeObj.updateTime = dnodeObj.createdTime; dnodeObj.updateTime = dnodeObj.createdTime;
dnodeObj.port = pCreate->port; dnodeObj.port = pCreate->port;
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, "%s:%u", dnodeObj.fqdn, dnodeObj.port); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
......
...@@ -390,7 +390,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { ...@@ -390,7 +390,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void mndProcessRpcMsg(SMnodeMsg *pMsg) { void mndProcessMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
int32_t code = 0; int32_t code = 0;
tmsg_t msgType = pMsg->rpcMsg.msgType; tmsg_t msgType = pMsg->rpcMsg.msgType;
...@@ -451,12 +451,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { ...@@ -451,12 +451,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
} }
} }
void mndProcessReadMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
uint64_t mndGenerateUid(char *name, int32_t len) { uint64_t mndGenerateUid(char *name, int32_t len) {
int64_t us = taosGetTimestampUs(); int64_t us = taosGetTimestampUs();
int32_t hashval = MurmurHash3_32(name, len); int32_t hashval = MurmurHash3_32(name, len);
......
...@@ -428,6 +428,7 @@ SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf ...@@ -428,6 +428,7 @@ SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf
char* end = NULL; char* end = NULL;
SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg)); SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg));
pDrop->dnodeId = strtoll(pzName->z, &end, 10); pDrop->dnodeId = strtoll(pzName->z, &end, 10);
pDrop->dnodeId = htonl(pDrop->dnodeId);
*len = sizeof(SDropDnodeMsg); *len = sizeof(SDropDnodeMsg);
if (end - pzName->z != pzName->n) { if (end - pzName->z != pzName->n) {
......
...@@ -94,5 +94,16 @@ if $rows != 2 then ...@@ -94,5 +94,16 @@ if $rows != 2 then
return -1 return -1
endi endi
print =============== drop dnode
sql drop dnode 2;
sql show dnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册