diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f51075a89a11d8446e97c80cccb6aba29f9aa05d..f57b753d931b3baf2486155ad30b7c6e6f2d1cee 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -826,7 +826,7 @@ typedef struct SShowRsp { } SShowRsp; typedef struct { - char ep[TSDB_FQDN_LEN]; // end point, hostname:port + char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port int32_t port; } SCreateDnodeMsg; diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 29a3d1af205cfdec8edf1a4c5b56e168b5f4bf87..8084175a907ff4c98b52a5f343956b8fbcc834fd 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -78,24 +78,14 @@ void qndClose(SQnode *pQnode); int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); /** - * @brief Process a query message. + * @brief Process a query or fetch message. * * @param pQnode The qnode object. * @param pMsg The request message * @param pRsp The response message * @return int32_t 0 for success, -1 for failure */ -int32_t qndProcessQueryReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process a fetch message. - * - * @param pQnode The qnode object. - * @param pMsg The request message - * @param pRsp The response message - * @return int32_t 0 for success, -1 for failure - */ -int32_t qndProcessFetchReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 408a656e83ed7a77f72ab7ca665e06e9d89235b3..ba3e122db3411c5930a49b89a53f6ce4cbf0ee15 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -69,6 +69,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107) #define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108) #define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109) +#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111) #define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112) diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 9679a03c3ebce805126bc6562457637494aa2e8c..0d37828ecd2f8d38deb2a596014389afa9e87ac0 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -54,8 +54,20 @@ extern int32_t dDebugFlag; #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat; +typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EDndWorkerType; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); +typedef struct { + EDndWorkerType type; + const char *name; + int32_t minNum; + int32_t maxNum; + FProcessItem fp; + SDnode *pDnode; + taos_queue queue; + SWorkerPool pool; +} SDnodeWorker; + typedef struct { char *dnode; char *mnode; @@ -100,16 +112,13 @@ typedef struct { } SMnodeMgmt; typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - char *file; - SQnode *pQnode; - SRWLatch latch; - taos_queue pQueryQ; - taos_queue pFetchQ; - SWorkerPool queryPool; - SWorkerPool fetchPool; + int32_t refCount; + int8_t deployed; + int8_t dropped; + SQnode *pQnode; + SRWLatch latch; + SDnodeWorker queryWorker; + SDnodeWorker fetchWorker; } SQnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndWorker.h b/source/dnode/mgmt/impl/inc/dndWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..237c0518e8a0e1b10d1970b766ce08ec00864f68 --- /dev/null +++ b/source/dnode/mgmt/impl/inc/dndWorker.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_WORKER_H_ +#define _TD_DND_WORKER_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "dndInt.h" + +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, FProcessItem fp); +void dndCleanupWorker(SDnodeWorker *pWorker); +int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_WORKER_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 08044ea2636cd24271015748172a18e7ab43cfdd..8c76bf95a60a54f2f6786e1d844a84ed219c6dcc 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -17,30 +17,9 @@ #include "dndQnode.h" #include "dndDnode.h" #include "dndTransport.h" +#include "dndWorker.h" -static int32_t dndInitQnodeQueryWorker(SDnode *pDnode); -static int32_t dndInitQnodeFetchWorker(SDnode *pDnode); -static void dndCleanupQnodeQueryWorker(SDnode *pDnode); -static void dndCleanupQnodeFetchWorker(SDnode *pDnode); -static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode); -static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode); -static void dndFreeQnodeQueryQueue(SDnode *pDnode); -static void dndFreeQnodeFetchQueue(SDnode *pDnode); - -static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); -static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg); - -static int32_t dndStartQnodeWorker(SDnode *pDnode); -static void dndStopQnodeWorker(SDnode *pDnode); - -static SQnode *dndAcquireQnode(SDnode *pDnode); -static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode); - -static int32_t dndReadQnodeFile(SDnode *pDnode); -static int32_t dndWriteQnodeFile(SDnode *pDnode); - -static int32_t dndOpenQnode(SDnode *pDnode); -static int32_t dndDropQnode(SDnode *pDnode); +static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); static SQnode *dndAcquireQnode(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; @@ -85,44 +64,47 @@ static int32_t dndReadQnodeFile(SDnode *pDnode) { char *content = calloc(1, maxLen + 1); cJSON *root = NULL; - FILE *fp = fopen(pMgmt->file, "r"); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); if (fp == NULL) { - dDebug("file %s not exist", pMgmt->file); + dDebug("file %s not exist", file); code = 0; - goto PRASE_MNODE_OVER; + goto PRASE_QNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since content is null", file); + goto PRASE_QNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since invalid json format", file); + goto PRASE_QNODE_OVER; } cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since deployed not found", file); + goto PRASE_QNODE_OVER; } pMgmt->deployed = deployed->valueint; cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since dropped not found", file); + goto PRASE_QNODE_OVER; } pMgmt->dropped = dropped->valueint; code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); -PRASE_MNODE_OVER: +PRASE_QNODE_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); @@ -134,8 +116,8 @@ PRASE_MNODE_OVER: static int32_t dndWriteQnodeFile(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; - char file[PATH_MAX + 20] = {0}; - snprintf(file, sizeof(file), "%s.bak", pMgmt->file); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); FILE *fp = fopen(file, "w"); if (fp == NULL) { @@ -154,41 +136,34 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) { len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); - taosFfetchFile(fileno(fp)); + taosFsyncFile(fileno(fp)); fclose(fp); free(content); - if (taosRenameFile(file, pMgmt->file) != 0) { + if (taosRenameFile(file, file) != 0) { terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", pMgmt->file, terrstr()); + dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartQnodeWorker(SDnode *pDnode) { - if (dndInitQnodeQueryWorker(pDnode) != 0) { + SQnodeMgmt *pMgmt = &pDnode->qmgmt; + if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, + (FProcessItem)dndProcessQnodeQueue) != 0) { dError("failed to start qnode query worker since %s", terrstr()); return -1; } - if (dndInitQnodeFetchWorker(pDnode) != 0) { + if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, + (FProcessItem)dndProcessQnodeQueue) != 0) { dError("failed to start qnode fetch worker since %s", terrstr()); return -1; } - if (dndAllocQnodeQueryQueue(pDnode) != 0) { - dError("failed to alloc qnode query queue since %s", terrstr()); - return -1; - } - - if (dndAllocQnodeFetchQueue(pDnode) != 0) { - dError("failed to alloc qnode fetch queue since %s", terrstr()); - return -1; - } - return 0; } @@ -199,15 +174,12 @@ static void dndStopQnodeWorker(SDnode *pDnode) { pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); - while (pMgmt->refCount > 1) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pQueryQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pFetchQ)) taosMsleep(10); - - dndCleanupQnodeQueryWorker(pDnode); - dndCleanupQnodeFetchWorker(pDnode); + while (pMgmt->refCount > 1) { + taosMsleep(10); + } - dndFreeQnodeQueryQueue(pDnode); - dndFreeQnodeFetchQueue(pDnode); + dndCleanupWorker(&pMgmt->queryWorker); + dndCleanupWorker(&pMgmt->fetchWorker); } static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { @@ -230,28 +202,17 @@ static int32_t dndOpenQnode(SDnode *pDnode) { dError("failed to open qnode since %s", terrstr()); return -1; } - pMgmt->deployed = 1; - int32_t code = dndWriteQnodeFile(pDnode); - if (code != 0) { - dError("failed to write qnode file since %s", terrstr()); - code = terrno; - pMgmt->deployed = 0; + if (dndStartQnodeWorker(pDnode) != 0) { + dError("failed to start qnode worker since %s", terrstr()); qndClose(pQnode); - // qndDestroy(pDnode->dir.qnode); - terrno = code; return -1; } - code = dndStartQnodeWorker(pDnode); - if (code != 0) { - dError("failed to start qnode worker since %s", terrstr()); - code = terrno; - pMgmt->deployed = 0; + if (dndWriteQnodeFile(pDnode) != 0) { + dError("failed to write qnode file since %s", terrstr()); dndStopQnodeWorker(pDnode); qndClose(pQnode); - // qndDestroy(pDnode->dir.qnode); - terrno = code; return -1; } @@ -289,7 +250,6 @@ static int32_t dndDropQnode(SDnode *pDnode) { dndReleaseQnode(pDnode, pQnode); dndStopQnodeWorker(pDnode); - dndWriteQnodeFile(pDnode); qndClose(pQnode); pMgmt->pQnode = NULL; // qndDestroy(pDnode->dir.qnode); @@ -324,13 +284,11 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; SRpcMsg *pRsp = NULL; - int32_t code = 0; + int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; SQnode *pQnode = dndAcquireQnode(pDnode); - if (pQnode == NULL) { - code = -1; - } else { - code = qndProcessQueryReq(pQnode, pMsg, &pRsp); + if (pQnode != NULL) { + code = qndProcessMsg(pQnode, pMsg, &pRsp); } if (pRsp != NULL) { @@ -347,135 +305,36 @@ static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { - int32_t code = 0; +static void dndWriteQnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; - if (pQnode == NULL || pQueue == NULL) { - code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; - } else { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); - if (pMsg == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - *pMsg = *pRpcMsg; - if (taosWriteQitem(pQueue, pMsg) != 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } + SQnode *pQnode = dndAcquireQnode(pDnode); + if (pQnode != NULL) { + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); } + dndReleaseQnode(pDnode, pQnode); if (code != 0) { - if (pRpcMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; rpcSendResponse(&rsp); } - rpcFreeCont(pRpcMsg->pCont); + rpcFreeCont(pMsg->pCont); } } void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SQnode *pQnode = dndAcquireQnode(pDnode); - dndWriteQnodeMsgToQueue(pQnode, pMgmt->pQueryQ, pMsg); - dndReleaseQnode(pDnode, pQnode); + dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg); } void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SQnode *pQnode = dndAcquireQnode(pDnode); - dndWriteQnodeMsgToQueue(pQnode, pMgmt->pFetchQ, pMsg); - dndReleaseQnode(pDnode, pQnode); -} - -static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - pMgmt->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue); - if (pMgmt->pQueryQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeQnodeQueryQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerFreeQueue(&pMgmt->queryPool, pMgmt->pQueryQ); - pMgmt->pQueryQ = NULL; -} - -static int32_t dndInitQnodeQueryWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SWorkerPool *pPool = &pMgmt->queryPool; - pPool->name = "qnode-query"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("qnode query worker is initialized"); - return 0; -} - -static void dndCleanupQnodeQueryWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerCleanup(&pMgmt->queryPool); - dDebug("qnode query worker is closed"); -} - -static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - pMgmt->pFetchQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue); - if (pMgmt->pFetchQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeQnodeFetchQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerFreeQueue(&pMgmt->fetchPool, pMgmt->pFetchQ); - pMgmt->pFetchQ = NULL; -} - -static int32_t dndInitQnodeFetchWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SWorkerPool *pPool = &pMgmt->fetchPool; - pPool->name = "qnode-fetch"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("qnode fetch worker is initialized"); - return 0; -} - -static void dndCleanupQnodeFetchWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerCleanup(&pMgmt->fetchPool); - dDebug("qnode fetch worker is closed"); + dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg); } int32_t dndInitQnode(SDnode *pDnode) { - dInfo("dnode-qnode start to init"); SQnodeMgmt *pMgmt = &pDnode->qmgmt; taosInitRWLatch(&pMgmt->latch); - char path[PATH_MAX]; - snprintf(path, PATH_MAX, "%s/qnode.json", pDnode->dir.dnode); - pMgmt->file = strdup(path); - if (pMgmt->file == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - if (dndReadQnodeFile(pDnode) != 0) { return -1; } @@ -488,11 +347,9 @@ int32_t dndInitQnode(SDnode *pDnode) { void dndCleanupQnode(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; - - dInfo("dnode-qnode start to clean up"); - if (pMgmt->pQnode) dndStopQnodeWorker(pDnode); - tfree(pMgmt->file); - qndClose(pMgmt->pQnode); - pMgmt->pQnode = NULL; - dInfo("dnode-qnode is cleaned up"); + if (pMgmt->pQnode) { + dndStopQnodeWorker(pDnode); + qndClose(pMgmt->pQnode); + pMgmt->pQnode = NULL; + } } diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c new file mode 100644 index 0000000000000000000000000000000000000000..da0e3a93193515fc01af92d180c47f17beae1e2a --- /dev/null +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndWorker.h" + +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, FProcessItem fp) { + if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || fp == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + + pWorker->type = type; + pWorker->name = name; + pWorker->minNum = minNum; + pWorker->maxNum = maxNum; + pWorker->fp = fp; + pWorker->pDnode = pDnode; + + if (pWorker->type == DND_WORKER_SINGLE) { + SWorkerPool *pPool = &pWorker->pool; + pPool->min = minNum; + pPool->max = maxNum; + if (tWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pWorker->queue = tWorkerAllocQueue(&pPool, pDnode, fp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } else { + terrno = TSDB_CODE_INVALID_PARA; + } + + return 0; +} + +void dndCleanupWorker(SDnodeWorker *pWorker) { + if (pWorker->type == DND_WORKER_SINGLE) { + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + tWorkerCleanup(&pWorker->pool); + tWorkerFreeQueue(&pWorker->pool, pWorker->queue); + } +} + +int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) { + if (pWorker == NULL || pWorker->queue == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + + void *pMsg = taosAllocateQitem(contLen); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + memcpy(pMsg, pCont, contLen); + + if (taosWriteQitem(pWorker, pMsg) != 0) { + taosFreeItem(pMsg); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index 4009a83cd00b30bf021d7b20e298fa531e522f65..dc352c5a3ff1cc2ad9bac940dee4971f8594534a 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -91,7 +91,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9042"); + strcpy(pReq->fqdn, "localhost"); + pReq->port = htonl(9042); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); @@ -148,7 +149,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9043"); + strcpy(pReq->fqdn, "localhost"); + pReq->port = htonl(9043); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); @@ -159,7 +161,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9044"); + strcpy(pReq->fqdn, "localhost"); + pReq->port = htonl(904); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); @@ -170,7 +173,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9045"); + strcpy(pReq->fqdn, "localhost"); + pReq->port = htonl(9045); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); diff --git a/source/dnode/mgmt/impl/test/mnode/mnode.cpp b/source/dnode/mgmt/impl/test/mnode/mnode.cpp index 0d89542288b4978c37b8d09282f59bea52424d76..a6cec935395f7461556d06a2ab8919800e3b44ed 100644 --- a/source/dnode/mgmt/impl/test/mnode/mnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/mnode.cpp @@ -102,7 +102,8 @@ TEST_F(DndTestMnode, 04_Create_Mnode) { int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9062"); + strcpy(pReq->fqdn, "localhost"); + pReq->port = htonl(9062); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 153d75ffd071f36f1ced09acc1d4f02613b53d4e..56559cbea1eeef39be8156bb7ded43b31c14eb4e 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -386,20 +386,16 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE); dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.updateTime = dnodeObj.createdTime; - taosGetFqdnPortFromEp(pCreate->ep, dnodeObj.fqdn, &dnodeObj.port); - - if (dnodeObj.fqdn[0] == 0 || dnodeObj.port <= 0) { - terrno = TSDB_CODE_MND_INVALID_DNODE_EP; - mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); - return terrno; - } + dnodeObj.port = pCreate->port; + memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); + snprintf(dnodeObj.ep, "%s:%u", dnodeObj.fqdn, dnodeObj.port); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { - mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); + mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr()); return -1; } - mDebug("trans:%d, used to create dnode:%s", pTrans->id, pCreate->ep); + mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep); SSdbRaw *pRedoRaw = mndDnodeActionEncode(&dnodeObj); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { @@ -423,17 +419,20 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont; - mDebug("dnode:%s, start to create", pCreate->ep); + mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port); - if (pCreate->ep[0] == 0) { + pCreate->port = htonl(pCreate->port); + if (pCreate->fqdn[0] == 0 || pCreate->port <= 0 || pCreate->port > UINT16_MAX) { terrno = TSDB_CODE_MND_INVALID_DNODE_EP; - mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); + mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); return -1; } - SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, pCreate->ep); + char ep[TSDB_EP_LEN]; + snprintf(ep, TSDB_EP_LEN, "%s:%d", pCreate->fqdn, pCreate->port); + SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, ep); if (pDnode != NULL) { - mError("dnode:%d, already exist", pDnode->id); + mError("dnode:%d, already exist, %s:%u", pDnode->id, pCreate->fqdn, pCreate->port); mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST; return -1; @@ -442,7 +441,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { int32_t code = mndCreateDnode(pMnode, pMsg, pCreate); if (code != 0) { - mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); + mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); return -1; } diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 48bdf8d3e405937a0252fa436bf3112f3327102b..2f80af225a79b8c3a909c486e3c518308bfb53de 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -414,8 +414,8 @@ SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMs return NULL; } - strncpy(pCreate->ep, id->z, id->n); - pCreate->port = val; + strncpy(pCreate->fqdn, id->z, id->n); + pCreate->port = htonl(val); *len = sizeof(SCreateDnodeMsg); return pCreate; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 451b7e4fa4b1280da98da33f5c3d8ea3cec49d48..3a923c6653c15bdd2f33f0bbc0fc9f38b72fa9f6 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -79,6 +79,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PARA, "Invalid parameters") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed") diff --git a/tests/script/unique/dnode/basic1.sim b/tests/script/unique/dnode/basic1.sim index 0d1b75f3c0ebbf8632363729f54aa524a2721f37..730864ef263a8f4597d3d5ea44a2cd037067edd1 100644 --- a/tests/script/unique/dnode/basic1.sim +++ b/tests/script/unique/dnode/basic1.sim @@ -33,7 +33,9 @@ if $data02 != master then endi print =============== create dnodes -sql create dnode $hostname2 +sql create dnode $hostname port 7200 +sleep 2000 + sql show dnodes; if $rows != 2 then return -1 @@ -47,6 +49,22 @@ if $data10 != 2 then return -1 endi +if $data02 != 0 then + return -1 +endi + +if $data12 != 0 then + return -1 +endi + +if $data04 != ready then + return -1 +endi + +if $data14 != ready then + return -1 +endi + sql show mnodes; if $rows != 1 then return -1 @@ -56,7 +74,7 @@ if $data00 != 1 then return -1 endi -if $data01 != master then +if $data02 != master then return -1 endi diff --git a/tests/tsim/src/simSystem.c b/tests/tsim/src/simSystem.c index cb61e6b81454fc1828105081b9d646f4159c8564..016b6500ed381c06bb5b155d165518acccc5bcf6 100644 --- a/tests/tsim/src/simSystem.c +++ b/tests/tsim/src/simSystem.c @@ -42,41 +42,7 @@ char *simParseArbitratorName(char *varName) { char *simParseHostName(char *varName) { static char hostName[140]; - - int32_t index = atoi(varName + 8); - int32_t port = 7100; - switch (index) { - case 1: - port = 7100; - break; - case 2: - port = 7200; - break; - case 3: - port = 7300; - break; - case 4: - port = 7400; - break; - case 5: - port = 7500; - break; - case 6: - port = 7600; - break; - case 7: - port = 7700; - break; - case 8: - port = 7800; - break; - case 9: - port = 7900; - break; - } - - sprintf(hostName, "'%s:%d'", simHostName, port); - // simInfo("hostName:%s", hostName); + sprintf(hostName, "%s", simHostName); return hostName; }