未验证 提交 ab75e61b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9434 from taosdata/feature/dnode3

add show dnodes test case
......@@ -826,7 +826,7 @@ typedef struct SShowRsp {
} SShowRsp;
typedef struct {
char ep[TSDB_FQDN_LEN]; // end point, hostname:port
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
int32_t port;
} SCreateDnodeMsg;
......
......@@ -78,24 +78,14 @@ void qndClose(SQnode *pQnode);
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
/**
* @brief Process a query message.
* @brief Process a query or fetch message.
*
* @param pQnode The qnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int32_t 0 for success, -1 for failure
*/
int32_t qndProcessQueryReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a fetch message.
*
* @param pQnode The qnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int32_t 0 for success, -1 for failure
*/
int32_t qndProcessFetchReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
#ifdef __cplusplus
}
......
......@@ -69,6 +69,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
......
......@@ -54,8 +54,20 @@ extern int32_t dDebugFlag;
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EDndWorkerType;
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef struct {
EDndWorkerType type;
const char *name;
int32_t minNum;
int32_t maxNum;
FProcessItem fp;
SDnode *pDnode;
taos_queue queue;
SWorkerPool pool;
} SDnodeWorker;
typedef struct {
char *dnode;
char *mnode;
......@@ -100,16 +112,13 @@ typedef struct {
} SMnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
char *file;
SQnode *pQnode;
SRWLatch latch;
taos_queue pQueryQ;
taos_queue pFetchQ;
SWorkerPool queryPool;
SWorkerPool fetchPool;
int32_t refCount;
int8_t deployed;
int8_t dropped;
SQnode *pQnode;
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
} SQnodeMgmt;
typedef struct {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_WORKER_H_
#define _TD_DND_WORKER_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dndInt.h"
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, FProcessItem fp);
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_WORKER_H_*/
\ No newline at end of file
......@@ -17,30 +17,9 @@
#include "dndQnode.h"
#include "dndDnode.h"
#include "dndTransport.h"
#include "dndWorker.h"
static int32_t dndInitQnodeQueryWorker(SDnode *pDnode);
static int32_t dndInitQnodeFetchWorker(SDnode *pDnode);
static void dndCleanupQnodeQueryWorker(SDnode *pDnode);
static void dndCleanupQnodeFetchWorker(SDnode *pDnode);
static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode);
static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode);
static void dndFreeQnodeQueryQueue(SDnode *pDnode);
static void dndFreeQnodeFetchQueue(SDnode *pDnode);
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
static int32_t dndStartQnodeWorker(SDnode *pDnode);
static void dndStopQnodeWorker(SDnode *pDnode);
static SQnode *dndAcquireQnode(SDnode *pDnode);
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode);
static int32_t dndReadQnodeFile(SDnode *pDnode);
static int32_t dndWriteQnodeFile(SDnode *pDnode);
static int32_t dndOpenQnode(SDnode *pDnode);
static int32_t dndDropQnode(SDnode *pDnode);
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
static SQnode *dndAcquireQnode(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
......@@ -85,44 +64,47 @@ static int32_t dndReadQnodeFile(SDnode *pDnode) {
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = fopen(pMgmt->file, "r");
char file[PATH_MAX + 20];
snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
FILE *fp = fopen(file, "r");
if (fp == NULL) {
dDebug("file %s not exist", pMgmt->file);
dDebug("file %s not exist", file);
code = 0;
goto PRASE_MNODE_OVER;
goto PRASE_QNODE_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", pMgmt->file);
goto PRASE_MNODE_OVER;
dError("failed to read %s since content is null", file);
goto PRASE_QNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", pMgmt->file);
goto PRASE_MNODE_OVER;
dError("failed to read %s since invalid json format", file);
goto PRASE_QNODE_OVER;
}
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", pMgmt->file);
goto PRASE_MNODE_OVER;
dError("failed to read %s since deployed not found", file);
goto PRASE_QNODE_OVER;
}
pMgmt->deployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", pMgmt->file);
goto PRASE_MNODE_OVER;
dError("failed to read %s since dropped not found", file);
goto PRASE_QNODE_OVER;
}
pMgmt->dropped = dropped->valueint;
code = 0;
dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
PRASE_MNODE_OVER:
PRASE_QNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
......@@ -134,8 +116,8 @@ PRASE_MNODE_OVER:
static int32_t dndWriteQnodeFile(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
char file[PATH_MAX + 20] = {0};
snprintf(file, sizeof(file), "%s.bak", pMgmt->file);
char file[PATH_MAX + 20];
snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
FILE *fp = fopen(file, "w");
if (fp == NULL) {
......@@ -154,41 +136,34 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) {
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFfetchFile(fileno(fp));
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
if (taosRenameFile(file, pMgmt->file) != 0) {
if (taosRenameFile(file, file) != 0) {
terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", pMgmt->file, terrstr());
dError("failed to rename %s since %s", file, terrstr());
return -1;
}
dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
return 0;
}
static int32_t dndStartQnodeWorker(SDnode *pDnode) {
if (dndInitQnodeQueryWorker(pDnode) != 0) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1,
(FProcessItem)dndProcessQnodeQueue) != 0) {
dError("failed to start qnode query worker since %s", terrstr());
return -1;
}
if (dndInitQnodeFetchWorker(pDnode) != 0) {
if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1,
(FProcessItem)dndProcessQnodeQueue) != 0) {
dError("failed to start qnode fetch worker since %s", terrstr());
return -1;
}
if (dndAllocQnodeQueryQueue(pDnode) != 0) {
dError("failed to alloc qnode query queue since %s", terrstr());
return -1;
}
if (dndAllocQnodeFetchQueue(pDnode) != 0) {
dError("failed to alloc qnode fetch queue since %s", terrstr());
return -1;
}
return 0;
}
......@@ -199,15 +174,12 @@ static void dndStopQnodeWorker(SDnode *pDnode) {
pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 1) taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pFetchQ)) taosMsleep(10);
dndCleanupQnodeQueryWorker(pDnode);
dndCleanupQnodeFetchWorker(pDnode);
while (pMgmt->refCount > 1) {
taosMsleep(10);
}
dndFreeQnodeQueryQueue(pDnode);
dndFreeQnodeFetchQueue(pDnode);
dndCleanupWorker(&pMgmt->queryWorker);
dndCleanupWorker(&pMgmt->fetchWorker);
}
static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
......@@ -230,28 +202,17 @@ static int32_t dndOpenQnode(SDnode *pDnode) {
dError("failed to open qnode since %s", terrstr());
return -1;
}
pMgmt->deployed = 1;
int32_t code = dndWriteQnodeFile(pDnode);
if (code != 0) {
dError("failed to write qnode file since %s", terrstr());
code = terrno;
pMgmt->deployed = 0;
if (dndStartQnodeWorker(pDnode) != 0) {
dError("failed to start qnode worker since %s", terrstr());
qndClose(pQnode);
// qndDestroy(pDnode->dir.qnode);
terrno = code;
return -1;
}
code = dndStartQnodeWorker(pDnode);
if (code != 0) {
dError("failed to start qnode worker since %s", terrstr());
code = terrno;
pMgmt->deployed = 0;
if (dndWriteQnodeFile(pDnode) != 0) {
dError("failed to write qnode file since %s", terrstr());
dndStopQnodeWorker(pDnode);
qndClose(pQnode);
// qndDestroy(pDnode->dir.qnode);
terrno = code;
return -1;
}
......@@ -289,7 +250,6 @@ static int32_t dndDropQnode(SDnode *pDnode) {
dndReleaseQnode(pDnode, pQnode);
dndStopQnodeWorker(pDnode);
dndWriteQnodeFile(pDnode);
qndClose(pQnode);
pMgmt->pQnode = NULL;
// qndDestroy(pDnode->dir.qnode);
......@@ -324,13 +284,11 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SRpcMsg *pRsp = NULL;
int32_t code = 0;
int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
SQnode *pQnode = dndAcquireQnode(pDnode);
if (pQnode == NULL) {
code = -1;
} else {
code = qndProcessQueryReq(pQnode, pMsg, &pRsp);
if (pQnode != NULL) {
code = qndProcessMsg(pQnode, pMsg, &pRsp);
}
if (pRsp != NULL) {
......@@ -347,135 +305,36 @@ static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0;
static void dndWriteQnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
if (pQnode == NULL || pQueue == NULL) {
code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
} else {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
*pMsg = *pRpcMsg;
if (taosWriteQitem(pQueue, pMsg) != 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
SQnode *pQnode = dndAcquireQnode(pDnode);
if (pQnode != NULL) {
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
}
dndReleaseQnode(pDnode, pQnode);
if (code != 0) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont);
rpcFreeCont(pMsg->pCont);
}
}
void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SQnode *pQnode = dndAcquireQnode(pDnode);
dndWriteQnodeMsgToQueue(pQnode, pMgmt->pQueryQ, pMsg);
dndReleaseQnode(pDnode, pQnode);
dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg);
}
void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SQnode *pQnode = dndAcquireQnode(pDnode);
dndWriteQnodeMsgToQueue(pQnode, pMgmt->pFetchQ, pMsg);
dndReleaseQnode(pDnode, pQnode);
}
static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
pMgmt->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue);
if (pMgmt->pQueryQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeQnodeQueryQueue(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
tWorkerFreeQueue(&pMgmt->queryPool, pMgmt->pQueryQ);
pMgmt->pQueryQ = NULL;
}
static int32_t dndInitQnodeQueryWorker(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SWorkerPool *pPool = &pMgmt->queryPool;
pPool->name = "qnode-query";
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("qnode query worker is initialized");
return 0;
}
static void dndCleanupQnodeQueryWorker(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
tWorkerCleanup(&pMgmt->queryPool);
dDebug("qnode query worker is closed");
}
static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
pMgmt->pFetchQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue);
if (pMgmt->pFetchQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeQnodeFetchQueue(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
tWorkerFreeQueue(&pMgmt->fetchPool, pMgmt->pFetchQ);
pMgmt->pFetchQ = NULL;
}
static int32_t dndInitQnodeFetchWorker(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SWorkerPool *pPool = &pMgmt->fetchPool;
pPool->name = "qnode-fetch";
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("qnode fetch worker is initialized");
return 0;
}
static void dndCleanupQnodeFetchWorker(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
tWorkerCleanup(&pMgmt->fetchPool);
dDebug("qnode fetch worker is closed");
dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg);
}
int32_t dndInitQnode(SDnode *pDnode) {
dInfo("dnode-qnode start to init");
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
taosInitRWLatch(&pMgmt->latch);
char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/qnode.json", pDnode->dir.dnode);
pMgmt->file = strdup(path);
if (pMgmt->file == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndReadQnodeFile(pDnode) != 0) {
return -1;
}
......@@ -488,11 +347,9 @@ int32_t dndInitQnode(SDnode *pDnode) {
void dndCleanupQnode(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
dInfo("dnode-qnode start to clean up");
if (pMgmt->pQnode) dndStopQnodeWorker(pDnode);
tfree(pMgmt->file);
qndClose(pMgmt->pQnode);
pMgmt->pQnode = NULL;
dInfo("dnode-qnode is cleaned up");
if (pMgmt->pQnode) {
dndStopQnodeWorker(pDnode);
qndClose(pMgmt->pQnode);
pMgmt->pQnode = NULL;
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndWorker.h"
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, FProcessItem fp) {
if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || fp == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
pWorker->type = type;
pWorker->name = name;
pWorker->minNum = minNum;
pWorker->maxNum = maxNum;
pWorker->fp = fp;
pWorker->pDnode = pDnode;
if (pWorker->type == DND_WORKER_SINGLE) {
SWorkerPool *pPool = &pWorker->pool;
pPool->min = minNum;
pPool->max = maxNum;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tWorkerAllocQueue(&pPool, pDnode, fp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} else {
terrno = TSDB_CODE_INVALID_PARA;
}
return 0;
}
void dndCleanupWorker(SDnodeWorker *pWorker) {
if (pWorker->type == DND_WORKER_SINGLE) {
while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10);
}
tWorkerCleanup(&pWorker->pool);
tWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}
}
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) {
if (pWorker == NULL || pWorker->queue == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
void *pMsg = taosAllocateQitem(contLen);
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(pMsg, pCont, contLen);
if (taosWriteQitem(pWorker, pMsg) != 0) {
taosFreeItem(pMsg);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
\ No newline at end of file
......@@ -91,7 +91,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9042");
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9042);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
......@@ -148,7 +149,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9043");
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9043);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
......@@ -159,7 +161,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9044");
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(904);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
......@@ -170,7 +173,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9045");
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9045);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
......
......@@ -102,7 +102,8 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9062");
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9062);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
......
......@@ -386,20 +386,16 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
dnodeObj.createdTime = taosGetTimestampMs();
dnodeObj.updateTime = dnodeObj.createdTime;
taosGetFqdnPortFromEp(pCreate->ep, dnodeObj.fqdn, &dnodeObj.port);
if (dnodeObj.fqdn[0] == 0 || dnodeObj.port <= 0) {
terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
return terrno;
}
dnodeObj.port = pCreate->port;
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1;
}
mDebug("trans:%d, used to create dnode:%s", pTrans->id, pCreate->ep);
mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
SSdbRaw *pRedoRaw = mndDnodeActionEncode(&dnodeObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
......@@ -423,17 +419,20 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont;
mDebug("dnode:%s, start to create", pCreate->ep);
mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port);
if (pCreate->ep[0] == 0) {
pCreate->port = htonl(pCreate->port);
if (pCreate->fqdn[0] == 0 || pCreate->port <= 0 || pCreate->port > UINT16_MAX) {
terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr());
return -1;
}
SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, pCreate->ep);
char ep[TSDB_EP_LEN];
snprintf(ep, TSDB_EP_LEN, "%s:%d", pCreate->fqdn, pCreate->port);
SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, ep);
if (pDnode != NULL) {
mError("dnode:%d, already exist", pDnode->id);
mError("dnode:%d, already exist, %s:%u", pDnode->id, pCreate->fqdn, pCreate->port);
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
return -1;
......@@ -442,7 +441,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
int32_t code = mndCreateDnode(pMnode, pMsg, pCreate);
if (code != 0) {
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr());
return -1;
}
......
......@@ -414,8 +414,8 @@ SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMs
return NULL;
}
strncpy(pCreate->ep, id->z, id->n);
pCreate->port = val;
strncpy(pCreate->fqdn, id->z, id->n);
pCreate->port = htonl(val);
*len = sizeof(SCreateDnodeMsg);
return pCreate;
......
......@@ -79,6 +79,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PARA, "Invalid parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")
......
......@@ -33,7 +33,9 @@ if $data02 != master then
endi
print =============== create dnodes
sql create dnode $hostname2
sql create dnode $hostname port 7200
sleep 2000
sql show dnodes;
if $rows != 2 then
return -1
......@@ -47,6 +49,22 @@ if $data10 != 2 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data12 != 0 then
return -1
endi
if $data04 != ready then
return -1
endi
if $data14 != ready then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
......@@ -56,7 +74,7 @@ if $data00 != 1 then
return -1
endi
if $data01 != master then
if $data02 != master then
return -1
endi
......
......@@ -42,41 +42,7 @@ char *simParseArbitratorName(char *varName) {
char *simParseHostName(char *varName) {
static char hostName[140];
int32_t index = atoi(varName + 8);
int32_t port = 7100;
switch (index) {
case 1:
port = 7100;
break;
case 2:
port = 7200;
break;
case 3:
port = 7300;
break;
case 4:
port = 7400;
break;
case 5:
port = 7500;
break;
case 6:
port = 7600;
break;
case 7:
port = 7700;
break;
case 8:
port = 7800;
break;
case 9:
port = 7900;
break;
}
sprintf(hostName, "'%s:%d'", simHostName, port);
// simInfo("hostName:%s", hostName);
sprintf(hostName, "%s", simHostName);
return hostName;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册