diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index a81f8c0c9d6c4634bb1dbd2416774906009f1566..ee05403a613d9b7bc1bb35758fbf9e9746f593a4 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -31,6 +31,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(taosd balance sync) ENDIF () + IF (TD_MPEER) + TARGET_LINK_LIBRARIES(taosd mpeer sync) + ENDIF () + SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeMClient.h index ba638946318edcb38f45b0db75767f240ad5f23d..594fb84d3b4cd30e550ed41b79adff10a18589b0 100644 --- a/src/dnode/inc/dnodeMClient.h +++ b/src/dnode/inc/dnodeMClient.h @@ -25,6 +25,7 @@ void dnodeCleanupMClient(); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); uint32_t dnodeGetMnodeMasteIp(); void * dnodeGetMpeerInfos(); +int32_t dnodeGetDnodeId(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index f944bd5adda6a2e9532036b1e4ad6788ec792b1d..b8d01916fe0938676a09656813a91765e4b9507d 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -23,7 +23,6 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); void dnodeMgmt(SRpcMsg *rpcMsg); -void dnodeUpdateDnodeId(int32_t dnodeId); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 5dd015313ac4e0a21962a9dd00564a693c46e429..85454af095aed7782c4fbbd3f2f3be0bf3210024 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -21,30 +21,52 @@ #include "trpc.h" #include "tutil.h" #include "tsync.h" +#include "ttime.h" +#include "ttimer.h" #include "dnode.h" #include "dnodeMClient.h" #include "dnodeModule.h" #include "dnodeMgmt.h" +#include "vnode.h" +#include "mpeer.h" #define MPEER_CONTENT_LEN 2000 static bool dnodeReadMnodeIpList(); static void dnodeSaveMnodeIpList(); +static void dnodeReadDnodeInfo(); +static void dnodeUpdateDnodeInfo(int32_t dnodeId); static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg); +static void dnodeSendStatusMsg(void *handle, void *tmrId); static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); + static void *tsDnodeMClientRpc = NULL; static SRpcIpSet tsMnodeIpList = {0}; static SDMNodeInfos tsMnodeInfos = {0}; +static void *tsDnodeTmr = NULL; +static void *tsStatusTimer = NULL; +static uint32_t tsRebootTime; +static int32_t tsDnodeId = 0; +static char tsDnodeName[TSDB_NODE_NAME_LEN]; int32_t dnodeInitMClient() { + dnodeReadDnodeInfo(); + tsRebootTime = taosGetTimestampSec(); + + tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); + if (tsDnodeTmr == NULL) { + dError("failed to init dnode timer"); + return -1; + } + if (!dnodeReadMnodeIpList()) { memset(&tsMnodeIpList, 0, sizeof(SRpcIpSet)); memset(&tsMnodeInfos, 0, sizeof(SDMNodeInfos)); tsMnodeIpList.port = tsMnodeDnodePort; tsMnodeIpList.numOfIps = 1; tsMnodeIpList.ip[0] = inet_addr(tsMasterIp); - if (tsSecondIp[0]) { + if (strcmp(tsSecondIp, tsMasterIp) != 0) { tsMnodeIpList.numOfIps = 2; tsMnodeIpList.ip[1] = inet_addr(tsSecondIp); } @@ -57,8 +79,6 @@ int32_t dnodeInitMClient() { } } - tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; - SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; @@ -79,11 +99,24 @@ int32_t dnodeInitMClient() { return -1; } + tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; + taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); + dPrint("mnode rpc client is opened"); return 0; } void dnodeCleanupMClient() { + if (tsStatusTimer != NULL) { + taosTmrStopA(&tsStatusTimer); + tsStatusTimer = NULL; + } + + if (tsDnodeTmr != NULL) { + taosTmrCleanUp(tsDnodeTmr); + tsDnodeTmr = NULL; + } + if (tsDnodeMClientRpc) { rpcClose(tsDnodeMClientRpc); tsDnodeMClientRpc = NULL; @@ -104,6 +137,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { if (pMsg->code != TSDB_CODE_SUCCESS) { dError("status rsp is received, error:%s", tstrerror(pMsg->code)); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); return; } @@ -111,9 +145,19 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { SDMNodeInfos *mpeers = &pStatusRsp->mpeers; if (mpeers->nodeNum <= 0) { dError("status msg is invalid, num of ips is %d", mpeers->nodeNum); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); return; } + SDnodeState *pState = &pStatusRsp->dnodeState; + pState->numOfVnodes = htonl(pState->numOfVnodes); + pState->moduleStatus = htonl(pState->moduleStatus); + pState->createdTime = htonl(pState->createdTime); + pState->dnodeId = htonl(pState->dnodeId); + + dnodeProcessModuleStatus(pState->moduleStatus); + dnodeUpdateDnodeInfo(pState->dnodeId); + SRpcIpSet mgmtIpSet = {0}; mgmtIpSet.inUse = mpeers->inUse; mgmtIpSet.numOfIps = mpeers->nodeNum; @@ -122,29 +166,25 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { mgmtIpSet.ip[i] = htonl(mpeers->nodeInfos[i].nodeIp); } - if (memcmp(&mgmtIpSet, &tsMnodeIpList, sizeof(SRpcIpSet)) != 0) { + if (memcmp(&mgmtIpSet, &tsMnodeIpList, sizeof(SRpcIpSet)) != 0 || tsMnodeInfos.nodeNum == 0) { memcpy(&tsMnodeIpList, &mgmtIpSet, sizeof(SRpcIpSet)); - memcpy(&tsMnodeInfos, mpeers, sizeof(SDMNodeInfos)); + tsMnodeInfos.inUse = mpeers->inUse; + tsMnodeInfos.nodeNum = mpeers->nodeNum; dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); for (int32_t i = 0; i < mpeers->nodeNum; i++) { tsMnodeInfos.nodeInfos[i].nodeId = htonl(mpeers->nodeInfos[i].nodeId); tsMnodeInfos.nodeInfos[i].nodeIp = htonl(mpeers->nodeInfos[i].nodeIp); tsMnodeInfos.nodeInfos[i].nodePort = htons(mpeers->nodeInfos[i].nodePort); + strcpy(tsMnodeInfos.nodeInfos[i].nodeName, mpeers->nodeInfos[i].nodeName); dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId, - taosIpStr(tsMnodeInfos.nodeInfos[i].nodeId), tsMnodeInfos.nodeInfos[i].nodePort, + taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp), tsMnodeInfos.nodeInfos[i].nodePort, tsMnodeInfos.nodeInfos[i].nodeName); } dnodeSaveMnodeIpList(); + mpeerUpdateSync(); } - SDnodeState *pState = &pStatusRsp->dnodeState; - pState->numOfVnodes = htonl(pState->numOfVnodes); - pState->moduleStatus = htonl(pState->moduleStatus); - pState->createdTime = htonl(pState->createdTime); - pState->dnodeId = htonl(pState->dnodeId); - - dnodeProcessModuleStatus(pState->moduleStatus); - dnodeUpdateDnodeId(pState->dnodeId); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); } void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { @@ -294,4 +334,93 @@ uint32_t dnodeGetMnodeMasteIp() { void* dnodeGetMpeerInfos() { return &tsMnodeInfos; +} + +static void dnodeSendStatusMsg(void *handle, void *tmrId) { + if (tsDnodeTmr == NULL) { + dError("dnode timer is already released"); + return; + } + + if (tsStatusTimer == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + dError("failed to start status timer"); + return; + } + + int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SDMStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + dError("failed to malloc status message"); + return; + } + + strcpy(pStatus->dnodeName, tsDnodeName); + pStatus->version = htonl(tsVersion); + pStatus->dnodeId = htonl(tsDnodeId); + pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); + pStatus->publicIp = htonl(inet_addr(tsPublicIp)); + pStatus->lastReboot = htonl(tsRebootTime); + pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); + pStatus->numOfCores = htons((uint16_t) tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + pStatus->alternativeRole = (uint8_t) tsAlternativeRole; + + vnodeBuildStatusMsg(pStatus); + contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); + pStatus->openVnodes = htons(pStatus->openVnodes); + + SRpcMsg rpcMsg = { + .pCont = pStatus, + .contLen = contLen, + .msgType = TSDB_MSG_TYPE_DM_STATUS + }; + + dnodeSendMsgToMnode(&rpcMsg); +} + +static void dnodeReadDnodeInfo() { + char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); + + FILE *fp = fopen(dnodeIdFile, "r"); + if (!fp) return; + + char option[32] = {0}; + int32_t value = 0; + int32_t num = 0; + + num = fscanf(fp, "%s %d", option, &value); + if (num != 2) return; + if (strcmp(option, "dnodeId") != 0) return; + tsDnodeId = value;; + + fclose(fp); + dPrint("read dnodeId:%d successed", tsDnodeId); +} + +static void dnodeSaveDnodeInfo() { + char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); + + FILE *fp = fopen(dnodeIdFile, "w"); + if (!fp) return; + + fprintf(fp, "dnodeId %d\n", tsDnodeId); + + fclose(fp); + dPrint("save dnodeId successed"); +} + +void dnodeUpdateDnodeInfo(int32_t dnodeId) { + if (tsDnodeId == 0) { + dPrint("dnodeId is set to %d", dnodeId); + tsDnodeId = dnodeId; + dnodeSaveDnodeInfo(); + } +} + +int32_t dnodeGetDnodeId() { + return tsDnodeId; } \ No newline at end of file diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 01e2c4dfcc3d2e5b5980871e5efb005cfc4d85a6..5fba941788965f1d60fc47fb4e7abc7eef09df5a 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -159,10 +159,10 @@ static int32_t dnodeInitSystem() { dPrint("starting to initialize TDengine ..."); if (dnodeInitStorage() != 0) return -1; - if (dnodeInitModules() != 0) return -1; if (dnodeInitRead() != 0) return -1; if (dnodeInitWrite() != 0) return -1; if (dnodeInitMClient() != 0) return -1; + if (dnodeInitModules() != 0) return -1; if (dnodeInitMnode() != 0) return -1; if (dnodeInitMgmt() != 0) return -1; if (dnodeInitShell() != 0) return -1; @@ -177,7 +177,6 @@ static int32_t dnodeInitSystem() { static void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { - tclearModuleStatus(TSDB_MOD_MGMT); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeCleanupShell(); dnodeCleanupMnode(); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0ec769c0af96f14e56833274ff446d05ffe3a825..abfee2239bc7e8b2a6d9f339928ef51a63bc14fe 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -21,8 +21,6 @@ #include "tlog.h" #include "trpc.h" #include "tsdb.h" -#include "ttime.h" -#include "ttimer.h" #include "twal.h" #include "dnodeMClient.h" #include "dnodeMgmt.h" @@ -38,52 +36,23 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); -static void dnodeSendStatusMsg(void *handle, void *tmrId); -static void dnodeReadDnodeId(); - -static void *tsDnodeTmr = NULL; -static void *tsStatusTimer = NULL; -static uint32_t tsRebootTime; -static int32_t tsDnodeId = 0; -static char tsDnodeName[TSDB_NODE_NAME_LEN]; int32_t dnodeInitMgmt() { - dnodeReadDnodeId(); - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; - tsRebootTime = taosGetTimestampSec(); - - tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); - if (tsDnodeTmr == NULL) { - dError("failed to init dnode timer"); - return -1; - } - int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; } - taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); return TSDB_CODE_SUCCESS; } void dnodeCleanupMgmt() { - if (tsStatusTimer != NULL) { - taosTmrStopA(&tsStatusTimer); - tsStatusTimer = NULL; - } - - if (tsDnodeTmr != NULL) { - taosTmrCleanUp(tsDnodeTmr); - tsDnodeTmr = NULL; - } - dnodeCloseVnodes(); } @@ -213,89 +182,3 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; return tsCfgDynamicOptions(pCfg->config); } - -static void dnodeSendStatusMsg(void *handle, void *tmrId) { - if (tsDnodeTmr == NULL) { - dError("dnode timer is already released"); - return; - } - - if (tsStatusTimer == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - dError("failed to start status timer"); - return; - } - - int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); - SDMStatusMsg *pStatus = rpcMallocCont(contLen); - if (pStatus == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - dError("failed to malloc status message"); - return; - } - - strcpy(pStatus->dnodeName, tsDnodeName); - pStatus->version = htonl(tsVersion); - pStatus->dnodeId = htonl(tsDnodeId); - pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); - pStatus->publicIp = htonl(inet_addr(tsPublicIp)); - pStatus->lastReboot = htonl(tsRebootTime); - pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); - pStatus->numOfCores = htons((uint16_t) tsNumOfCores); - pStatus->diskAvailable = tsAvailDataDirGB; - pStatus->alternativeRole = (uint8_t) tsAlternativeRole; - - vnodeBuildStatusMsg(pStatus); - contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); - pStatus->openVnodes = htons(pStatus->openVnodes); - - SRpcMsg rpcMsg = { - .pCont = pStatus, - .contLen = contLen, - .msgType = TSDB_MSG_TYPE_DM_STATUS - }; - - dnodeSendMsgToMnode(&rpcMsg); - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); -} - -static void dnodeReadDnodeId() { - char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; - sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); - - FILE *fp = fopen(dnodeIdFile, "r"); - if (!fp) return; - - char option[32] = {0}; - int32_t value = 0; - int32_t num = 0; - - num = fscanf(fp, "%s %d", option, &value); - if (num != 2) return; - if (strcmp(option, "dnodeId") != 0) return; - tsDnodeId = value;; - - fclose(fp); - dPrint("read dnodeId:%d successed", tsDnodeId); -} - -static void dnodeSaveDnodeId() { - char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; - sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); - - FILE *fp = fopen(dnodeIdFile, "w"); - if (!fp) return; - - fprintf(fp, "dnodeId %d\n", tsDnodeId); - - fclose(fp); - dPrint("save dnodeId successed"); -} - -void dnodeUpdateDnodeId(int32_t dnodeId) { - if (tsDnodeId == 0) { - dPrint("dnodeId is set to %d", dnodeId); - tsDnodeId = dnodeId; - dnodeSaveDnodeId(); - } -} diff --git a/src/inc/mnode.h b/src/inc/mnode.h index f2c072453f8d552ba93486948e177d9c6773de83..e8a0ba3bcc551cb6dfc620dd54f3d3328411d70b 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -45,16 +45,14 @@ struct _mnode_obj; typedef struct _mnode_obj { int32_t mnodeId; - int32_t dnodeId; int64_t createdTime; int8_t reserved[14]; int8_t updateEnd[1]; int32_t refCount; - int8_t role; - int8_t status; - uint16_t port; uint32_t privateIp; uint32_t publicIp; + uint16_t port; + int8_t role; char mnodeName[TSDB_NODE_NAME_LEN + 1]; } SMnodeObj; diff --git a/src/inc/mpeer.h b/src/inc/mpeer.h index 157ea40119963062da7b8ba1070c8e2034f0d051..ba1b7d32cf27a22540d3d3cbfd7388c617e24880 100644 --- a/src/inc/mpeer.h +++ b/src/inc/mpeer.h @@ -28,22 +28,29 @@ enum _TAOS_MN_STATUS { TAOS_MN_STATUS_READY }; +// general implementation int32_t mpeerInit(); void mpeerCleanup(); + +// special implementation +int32_t mpeerInitMnodes(); +void mpeerCleanupMnodes(); +int32_t mpeerAddMnode(int32_t dnodeId); +int32_t mpeerRemoveMnode(int32_t dnodeId); + +void * mpeerGetMnode(int32_t mnodeId); int32_t mpeerGetMnodesNum(); void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode); void mpeerReleaseMnode(struct _mnode_obj *pMnode); -bool mpeerInServerStatus(); bool mpeerIsMaster(); -bool mpeerCheckRedirect(); void mpeerGetPrivateIpList(SRpcIpSet *ipSet); void mpeerGetPublicIpList(SRpcIpSet *ipSet); void mpeerGetMpeerInfos(void *mpeers); -char * mpeerGetMnodeStatusStr(int32_t status); -char * mpeerGetMnodeRoleStr(int32_t role); +int32_t mpeerForwardReqToPeer(void *pHead); +void mpeerUpdateSync(); #ifdef __cplusplus } diff --git a/src/inc/tbalance.h b/src/inc/tbalance.h index 8cf8cb9fb9c517630f1baedf894ef0cd2b812928..c73d6a91a93bb03fc188fa72889d4e8a23c6cc85 100644 --- a/src/inc/tbalance.h +++ b/src/inc/tbalance.h @@ -31,6 +31,7 @@ struct _dnode_obj; int32_t balanceInit(); void balanceCleanUp(); void balanceNotify(); +void balanceReset(); int32_t balanceAllocVnodes(struct _vg_obj *pVgroup); int32_t balanceDropDnode(struct _dnode_obj *pDnode); diff --git a/src/inc/tcluster.h b/src/inc/tcluster.h index 769a819b90fcb915b01f9a81740d02cda6c7f7de..a56285fe1c29f9b18c175ed360a448351017f097 100644 --- a/src/inc/tcluster.h +++ b/src/inc/tcluster.h @@ -37,6 +37,7 @@ int32_t clusterInit(); void clusterCleanUp(); char* clusterGetDnodeStatusStr(int32_t dnodeStatus); bool clusterCheckModuleInDnode(struct _dnode_obj *pDnode, int moduleType); +void clusterMonitorDnodeModule(); int32_t clusterInitDnodes(); void clusterCleanupDnodes(); diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index 83afa2a0816087570430c4f2ffbc05e7706c519e..8ecb5ef1524060f3081e667ea132f934e37ba250 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -21,8 +21,8 @@ extern "C" { #endif typedef enum { - SDB_TABLE_MNODE = 0, - SDB_TABLE_DNODE = 1, + SDB_TABLE_DNODE = 0, + SDB_TABLE_MNODE = 1, SDB_TABLE_ACCOUNT = 2, SDB_TABLE_USER = 3, SDB_TABLE_DB = 4, @@ -34,6 +34,7 @@ typedef enum { typedef enum { SDB_KEY_STRING, + SDB_KEY_INT, SDB_KEY_AUTO } ESdbKeyType; @@ -63,14 +64,22 @@ typedef struct { int32_t (*encodeFp)(SSdbOperDesc *pOper); int32_t (*decodeFp)(SSdbOperDesc *pDesc); int32_t (*destroyFp)(SSdbOperDesc *pDesc); - int32_t (*updateAllFp)(); + int32_t (*restoredFp)(); } SSdbTableDesc; +typedef struct { + int64_t version; + void * wal; + pthread_mutex_t mutex; +} SSdbObject; + int32_t sdbInit(); void sdbCleanUp(); +SSdbObject *sdbGetObj(); void * sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); +int sdbProcessWrite(void *param, void *data, int type); int32_t sdbInsertRow(SSdbOperDesc *pOper); int32_t sdbDeleteRow(SSdbOperDesc *pOper); @@ -81,7 +90,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow); void sdbIncRef(void *thandle, void *pRow); void sdbDecRef(void *thandle, void *pRow); int64_t sdbGetNumOfRows(void *handle); -int64_t sdbGetId(void *handle); +int32_t sdbGetId(void *handle); uint64_t sdbGetVersion(); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 946ec29d8c042606fc9f0954fa20da44da1e6662..089bf494e7b26c4613da5d6b47ac84bca21edc23 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -102,7 +102,7 @@ static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionUpdateAll() { +static int32_t mgmtDbActionRestored() { return 0; } @@ -123,7 +123,7 @@ int32_t mgmtInitDbs() { .encodeFp = mgmtDbActionEncode, .decodeFp = mgmtDbActionDecode, .destroyFp = mgmtDbActionDestroy, - .updateAllFp = mgmtDbActionUpdateAll + .restoredFp = mgmtDbActionRestored }; tsDbSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 51ac4e842dd8309667625218d020374f97059769..d13d37586ac18d27fb34daff79c6fe97bd1efa55 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -77,6 +77,7 @@ void * clusterGetDnode(int32_t dnodeId) { return dnodeId == 1 ? &tsDnodeObj : N void * clusterGetDnodeByIp(uint32_t ip) { return &tsDnodeObj; } void clusterReleaseDnode(struct _dnode_obj *pDnode) {} void clusterUpdateDnode(struct _dnode_obj *pDnode) {} +void clusterMonitorDnodeModule() {} #endif @@ -208,6 +209,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; balanceNotify(); + clusterMonitorDnodeModule(); } clusterReleaseDnode(pDnode); diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index a074060f52190fb668ac30dda9a966a7f92f75c8..b6fb1ba42546ef4345f060e954feb144ba2456dd 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -109,6 +109,11 @@ int32_t mgmtStartSystem() { return -1; } + if (mpeerInit() < 0) { + mError("failed to init mpeers"); + return -1; + } + if (sdbInit() < 0) { mError("failed to init sdb"); return -1; @@ -122,11 +127,6 @@ int32_t mgmtStartSystem() { return -1; } - if (mpeerInit() < 0) { - mError("failed to init mpeers"); - return -1; - } - if (balanceInit() < 0) { mError("failed to init dnode balance") } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 82da45479380d04f5ea089c7ca32af988068aa10..e2edb201b99a88189b6d0da32f9f1af4520084bf 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -19,12 +19,9 @@ #include "trpc.h" #include "tsync.h" #include "mpeer.h" -#include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtUser.h" -extern int32_t mpeerInitMnodes(); -extern void mpeerCleanupMnodes(); static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -34,18 +31,25 @@ static SMnodeObj tsMnodeObj = {0}; int32_t mpeerInitMnodes() { tsMnodeObj.mnodeId = 1; - tsMnodeObj.dnodeId = 1; tsMnodeObj.privateIp = inet_addr(tsPrivateIp); tsMnodeObj.publicIp = inet_addr(tsPublicIp); tsMnodeObj.createdTime = taosGetTimestampMs(); tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER; - tsMnodeObj.status = TAOS_MN_STATUS_READY; tsMnodeObj.port = tsMnodeDnodePort; sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId); return TSDB_CODE_SUCCESS; } +void mpeerCleanupMnodes() {} +int32_t mpeerAddMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } +int32_t mpeerRemoveMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } +void * mpeerGetMnode(int32_t mnodeId) { return &tsMnodeObj; } +int32_t mpeerGetMnodesNum() { return 1; } +void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} +bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } +void mpeerUpdateSync() {} + void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { if (*pMnode == NULL) { *pMnode = &tsMnodeObj; @@ -58,20 +62,21 @@ void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { void mpeerGetPrivateIpList(SRpcIpSet *ipSet) { ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); ipSet->numOfIps = 1; + ipSet->port = htons(tsMnodeObj.port); ipSet->ip[0] = htonl(tsMnodeObj.privateIp); } void mpeerGetPublicIpList(SRpcIpSet *ipSet) { ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); ipSet->numOfIps = 1; + ipSet->port = htons(tsMnodeObj.port); ipSet->ip[0] = htonl(tsMnodeObj.publicIp); } void mpeerGetMpeerInfos(void *param) { SDMNodeInfos *mpeers = param; + mpeers->inUse = 0; mpeers->nodeNum = 1; mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId); mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp); @@ -79,40 +84,23 @@ void mpeerGetMpeerInfos(void *param) { strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName); } -void mpeerCleanupDnodes() {} -int32_t mpeerGetMnodesNum() { return 1; } -void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} -bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; } -bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } -bool mpeerCheckRedirect() { return false; } +int32_t mpeerForwardReqToPeer(void *pHead) { + return TSDB_CODE_SUCCESS; +} #endif int32_t mpeerInit() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); - return mpeerInitMnodes(); } void mpeerCleanup() { - mpeerCleanupDnodes(); -} - -char *mpeerGetMnodeStatusStr(int32_t status) { - switch (status) { - case TAOS_MN_STATUS_OFFLINE: - return "offline"; - case TAOS_MN_STATUS_DROPPING: - return "dropping"; - case TAOS_MN_STATUS_READY: - return "ready"; - default: - return "undefined"; - } + mpeerCleanupMnodes(); } -char *mpeerGetMnodeRoleStr(int32_t role) { +static char *mpeerGetMnodeRoleStr(int32_t role) { switch (role) { case TAOS_SYNC_ROLE_OFFLINE: return "offline"; @@ -160,12 +148,6 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 10; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "status"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - pShow->bytes[cols] = 10; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "role"); @@ -220,14 +202,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, mpeerGetMnodeStatusStr(pMnode->status)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role)); cols++; numOfRows++; + + mpeerReleaseMnode(pMnode); } pShow->numOfReads += numOfRows; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index bc0d1b81f2e5c9aa7e0e9938d824019f1ff16899..70e2be5f4c8b643bc199a056796f56d126134ac0 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -15,28 +15,16 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosdef.h" #include "taoserror.h" -#include "tchecksum.h" -#include "tglobalcfg.h" #include "tlog.h" #include "trpc.h" -#include "tutil.h" +#include "tqueue.h" #include "twal.h" -#include "tsync.h" #include "hashint.h" #include "hashstr.h" +#include "mpeer.h" #include "mgmtSdb.h" -typedef struct { - int32_t code; - int64_t version; - void * sync; - void * wal; - sem_t sem; - pthread_mutex_t mutex; -} SSdbSync; - typedef struct _SSdbTable { char tableName[TSDB_DB_NAME_LEN + 1]; ESdbTable tableId; @@ -47,13 +35,13 @@ typedef struct _SSdbTable { int32_t autoIndex; int64_t numOfRows; void * iHandle; - int32_t (*insertFp)(SSdbOperDesc *pDesc); - int32_t (*deleteFp)(SSdbOperDesc *pOper); - int32_t (*updateFp)(SSdbOperDesc *pOper); - int32_t (*decodeFp)(SSdbOperDesc *pOper); - int32_t (*encodeFp)(SSdbOperDesc *pOper); - int32_t (*destroyFp)(SSdbOperDesc *pOper); - int32_t (*updateAllFp)(); + int32_t (*insertFp)(SSdbOperDesc *pDesc); + int32_t (*deleteFp)(SSdbOperDesc *pOper); + int32_t (*updateFp)(SSdbOperDesc *pOper); + int32_t (*decodeFp)(SSdbOperDesc *pOper); + int32_t (*encodeFp)(SSdbOperDesc *pOper); + int32_t (*destroyFp)(SSdbOperDesc *pOper); + int32_t (*restoredFp)(); pthread_mutex_t mutex; } SSdbTable; @@ -70,18 +58,17 @@ typedef enum { static SSdbTable *tsSdbTableList[SDB_TABLE_MAX] = {0}; static int32_t tsSdbNumOfTables = 0; -static SSdbSync * tsSdbSync; - -static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash}; -static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash}; -static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash}; -static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData}; -static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash}; -static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData}; -static int sdbProcessWrite(void *param, void *data, int type); - -uint64_t sdbGetVersion() { return tsSdbSync->version; } -int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } +static SSdbObject * tsSdbObj; + +static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash}; +static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash, sdbAddIntHash}; +static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash, sdbDeleteIntHash}; +static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData, sdbGetIntHashData}; +static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash}; +static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; + +uint64_t sdbGetVersion() { return tsSdbObj->version; } +int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } static char *sdbGetActionStr(int32_t action) { @@ -101,6 +88,7 @@ static char *sdbGetkeyStr(SSdbTable *pTable, void *row) { switch (pTable->keyType) { case SDB_KEY_STRING: return (char *)row; + case SDB_KEY_INT: case SDB_KEY_AUTO: sprintf(str, "%d", *(int32_t *)row); return str; @@ -113,44 +101,27 @@ static void *sdbGetTableFromId(int32_t tableId) { return tsSdbTableList[tableId]; } -// static void mpeerConfirmForward(void *ahandle, void *param, int32_t code) { -// sem_post(&tsSdbSync->sem); -// mPrint("mpeerConfirmForward"); -// } - -static int32_t sdbForwardDbReqToPeer(SWalHead *pHead) { - // int32_t code = syncForwardToPeer(NULL, pHead, NULL); - // if (code < 0) { - // return code; - // } - - // sem_wait(&tsSdbSync->sem); - // return tsSdbSync->code; - return TSDB_CODE_SUCCESS; -} - int32_t sdbInit() { - tsSdbSync = calloc(1, sizeof(SSdbSync)); - sem_init(&tsSdbSync->sem, 0, 0); - pthread_mutex_init(&tsSdbSync->mutex, NULL); + tsSdbObj = calloc(1, sizeof(SSdbObject)); + pthread_mutex_init(&tsSdbObj->mutex, NULL); SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1}; - tsSdbSync->wal = walOpen(tsMnodeDir, &walCfg); - if (tsSdbSync->wal == NULL) { + tsSdbObj->wal = walOpen(tsMnodeDir, &walCfg); + if (tsSdbObj->wal == NULL) { sdbError("failed to open sdb in %s", tsMnodeDir); return -1; } sdbTrace("open sdb file for read"); - walRestore(tsSdbSync->wal, tsSdbSync, sdbProcessWrite); + walRestore(tsSdbObj->wal, tsSdbObj, sdbProcessWrite); int32_t totalRows = 0; int32_t numOfTables = 0; for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) { SSdbTable *pTable = sdbGetTableFromId(tableId); if (pTable == NULL) continue; - if (pTable->updateAllFp) { - (*pTable->updateAllFp)(); + if (pTable->restoredFp) { + (*pTable->restoredFp)(); } totalRows += pTable->numOfRows; @@ -158,20 +129,26 @@ int32_t sdbInit() { sdbTrace("table:%s, is initialized, numOfRows:%d", pTable->tableName, pTable->numOfRows); } - sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbSync->version, totalRows, numOfTables); + sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables); + + mpeerUpdateSync(); + return TSDB_CODE_SUCCESS; } void sdbCleanUp() { - if (tsSdbSync) { - sem_destroy(&tsSdbSync->sem); - pthread_mutex_destroy(&tsSdbSync->mutex); - walClose(tsSdbSync->wal); - free(tsSdbSync); - tsSdbSync = NULL; + if (tsSdbObj) { + pthread_mutex_destroy(&tsSdbObj->mutex); + walClose(tsSdbObj->wal); + free(tsSdbObj); + tsSdbObj = NULL; } } +SSdbObject *sdbGetObj() { + return tsSdbObj; +} + void sdbIncRef(void *handle, void *pRow) { if (pRow) { SSdbTable *pTable = handle; @@ -241,6 +218,11 @@ static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj, &rowMeta); sdbIncRef(pTable, pOper->pObj); pTable->numOfRows++; + + if (pTable->keyType == SDB_KEY_AUTO) { + pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj)); + } + pthread_mutex_unlock(&pTable->mutex); sdbTrace("table:%s, insert record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), @@ -278,20 +260,20 @@ static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) { static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_t action) { int32_t code = 0; - pthread_mutex_lock(&tsSdbSync->mutex); - tsSdbSync->version++; - pHead->version = tsSdbSync->version; + pthread_mutex_lock(&tsSdbObj->mutex); + tsSdbObj->version++; + pHead->version = tsSdbObj->version; - code = sdbForwardDbReqToPeer(pHead); + code = mpeerForwardReqToPeer(pHead); if (code != TSDB_CODE_SUCCESS) { - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code)); return code; } - code = walWrite(tsSdbSync->wal, pHead); - pthread_mutex_unlock(&tsSdbSync->mutex); + code = walWrite(tsSdbObj->wal, pHead); + pthread_mutex_unlock(&tsSdbObj->mutex); if (code < 0) { sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName, @@ -301,26 +283,25 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ sdbGetkeyStr(pTable, pHead->cont), pHead->version); } - walFsync(tsSdbSync->wal); - free(pHead); - + walFsync(tsSdbObj->wal); + taosFreeQitem(pHead); return code; } static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_t action) { - pthread_mutex_lock(&tsSdbSync->mutex); - if (pHead->version <= tsSdbSync->version) { - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_lock(&tsSdbObj->mutex); + if (pHead->version <= tsSdbObj->version) { + pthread_mutex_unlock(&tsSdbObj->mutex); return TSDB_CODE_SUCCESS; - } else if (pHead->version != tsSdbSync->version + 1) { - pthread_mutex_unlock(&tsSdbSync->mutex); + } else if (pHead->version != tsSdbObj->version + 1) { + pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, - tsSdbSync->version); + tsSdbObj->version); return TSDB_CODE_OTHERS; } - tsSdbSync->version = pHead->version; + tsSdbObj->version = pHead->version; sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); @@ -335,7 +316,7 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_ if (code < 0) { sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } @@ -369,17 +350,17 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_ if (code < 0) { sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version); - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } code = sdbInsertLocal(pTable, &oper2); } - pthread_mutex_unlock(&tsSdbSync->mutex); + pthread_mutex_unlock(&tsSdbObj->mutex); return code; } -static int sdbProcessWrite(void *param, void *data, int type) { +int sdbProcessWrite(void *param, void *data, int type) { SWalHead *pHead = data; int32_t tableId = pHead->msgType / 10; int32_t action = pHead->msgType % 10; @@ -417,7 +398,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { if (pOper->type == SDB_OPER_GLOBAL) { int32_t size = sizeof(SWalHead) + pTable->maxRowSize; - SWalHead *pHead = calloc(1, size); + SWalHead *pHead = taosAllocateQitem(size); pHead->version = 0; pHead->len = pOper->rowSize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; @@ -426,7 +407,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } @@ -453,6 +434,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { case SDB_KEY_STRING: rowSize = strlen((char *)pOper->pObj) + 1; break; + case SDB_KEY_INT: case SDB_KEY_AUTO: rowSize = sizeof(uint64_t); break; @@ -461,13 +443,13 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { } int32_t size = sizeof(SWalHead) + rowSize; - SWalHead *pHead = calloc(1, size); + SWalHead *pHead = taosAllocateQitem(size); pHead->version = 0; pHead->len = rowSize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; memcpy(pHead->cont, pOper->pObj, rowSize); - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } @@ -489,7 +471,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { if (pOper->type == SDB_OPER_GLOBAL) { int32_t size = sizeof(SWalHead) + pTable->maxRowSize; - SWalHead *pHead = calloc(1, size); + SWalHead *pHead = taosAllocateQitem(size); pHead->version = 0; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; @@ -497,7 +479,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; - int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType); + int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType); if (code < 0) return code; } @@ -522,6 +504,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { void *sdbOpenTable(SSdbTableDesc *pDesc) { SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable)); + if (pTable == NULL) return NULL; strcpy(pTable->tableName, pDesc->tableName); @@ -536,7 +519,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->encodeFp = pDesc->encodeFp; pTable->decodeFp = pDesc->decodeFp; pTable->destroyFp = pDesc->destroyFp; - pTable->updateAllFp = pDesc->updateAllFp; + pTable->restoredFp = pDesc->restoredFp; if (sdbInitIndexFp[pTable->keyType] != NULL) { pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta)); @@ -575,7 +558,7 @@ void sdbCloseTable(void *handle) { } pthread_mutex_destroy(&pTable->mutex); - + sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbNumOfTables); free(pTable); } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index dbd7627d3f1a81c41e9341c0daef954911445e43..5010429db39e781735149265072556a4dbb74162 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -42,7 +42,6 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *sec static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); -static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); @@ -142,19 +141,13 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - if (mpeerCheckRedirect()) { + if (!mpeerIsMaster()) { // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); rpcFreeCont(rpcMsg->pCont); return; } - if (!mpeerInServerStatus()) { - mgmtProcessMsgWhileNotReady(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - return; - } - if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED); rpcFreeCont(rpcMsg->pCont); @@ -501,18 +494,6 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { rpcSendResponse(&rpcRsp); } -static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg) { - mTrace("%s is ignored since SDB is not ready", taosMsg[rpcMsg->msgType]); - SRpcMsg rpcRsp = { - .msgType = 0, - .pCont = 0, - .contLen = 0, - .code = TSDB_CODE_NOT_READY, - .handle = rpcMsg->handle - }; - rpcSendResponse(&rpcRsp); -} - void mgmtSendSimpleResp(void *thandle, int32_t code) { SRpcMsg rpcRsp = { .msgType = 0, diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index feed12f97ee6f40ad1e042389c09efe20b0eea47..1c384fdfdf94c3911403411a0cdabd8bd6f13dbc 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -220,7 +220,7 @@ static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtChildTableActionUpdateAll() { +static int32_t mgmtChildTableActionRestored() { void *pNode = NULL; void *pLastNode = NULL; SChildTableObj *pTable = NULL; @@ -320,7 +320,7 @@ static int32_t mgmtInitChildTables() { .encodeFp = mgmtChildTableActionEncode, .decodeFp = mgmtChildTableActionDecode, .destroyFp = mgmtChildTableActionDestroy, - .updateAllFp = mgmtChildTableActionUpdateAll + .restoredFp = mgmtChildTableActionRestored }; tsChildTableSdb = sdbOpenTable(&tableDesc); @@ -414,7 +414,7 @@ static int32_t mgmtSuperTableActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtSuperTableActionUpdateAll() { +static int32_t mgmtSuperTableActionRestored() { return 0; } @@ -435,7 +435,7 @@ static int32_t mgmtInitSuperTables() { .encodeFp = mgmtSuperTableActionEncode, .decodeFp = mgmtSuperTableActionDecode, .destroyFp = mgmtSuperTableActionDestroy, - .updateAllFp = mgmtSuperTableActionUpdateAll + .restoredFp = mgmtSuperTableActionRestored }; tsSuperTableSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 3a49e5633198d1e8dd668fb74adcc2e8447d8540..7fa1a13bfd728c2b63dcd7673225396958349b14 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -84,12 +84,14 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtUserActionUpdateAll() { - SAcctObj *pAcct = acctGetAcct("root"); - mgmtCreateUser(pAcct, "root", "taosdata"); - mgmtCreateUser(pAcct, "monitor", tsInternalPass); - mgmtCreateUser(pAcct, "_root", tsInternalPass); - acctReleaseAcct(pAcct); +static int32_t mgmtUserActionRestored() { + if (strcmp(tsMasterIp, tsPrivateIp) == 0) { + SAcctObj *pAcct = acctGetAcct("root"); + mgmtCreateUser(pAcct, "root", "taosdata"); + mgmtCreateUser(pAcct, "monitor", tsInternalPass); + mgmtCreateUser(pAcct, "_root", tsInternalPass); + acctReleaseAcct(pAcct); + } return 0; } @@ -111,7 +113,7 @@ int32_t mgmtInitUsers() { .encodeFp = mgmtUserActionEncode, .decodeFp = mgmtUserActionDecode, .destroyFp = mgmtUserActionDestroy, - .updateAllFp = mgmtUserActionUpdateAll + .restoredFp = mgmtUserActionRestored }; tsUserSdb = sdbOpenTable(&tableDesc); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 19468dc54797c109d3b3a9f38d0a68b8b5679fae..cc8dba52dd48a91faf91014960552ddbcdcf75db 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -152,7 +152,7 @@ static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtVgroupActionUpdateAll() { +static int32_t mgmtVgroupActionRestored() { return 0; } @@ -173,7 +173,7 @@ int32_t mgmtInitVgroups() { .encodeFp = mgmtVgroupActionEncode, .decodeFp = mgmtVgroupActionDecode, .destroyFp = mgmtVgroupActionDestroy, - .updateAllFp = mgmtVgroupActionUpdateAll, + .restoredFp = mgmtVgroupActionRestored, }; tsVgroupSdb = sdbOpenTable(&tableDesc); diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 171f811b7d4f07d6ad2a5836669e9c3782ec0e41..5c321beffb0ad6e07d4f6cd4438c62f395cba6ef 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -270,7 +270,7 @@ void httpCleanUpConnect(HttpServer *pServer) { for (i = 0; i < pServer->numOfThreads; ++i) { pThread = pServer->pThreads + i; - taosCloseSocket(pThread->pollFd); + //taosCloseSocket(pThread->pollFd); while (pThread->pHead) { httpCleanUpContext(pThread->pHead, 0); @@ -591,7 +591,6 @@ void httpAcceptHttpConnection(void *arg) { bool httpInitConnect(HttpServer *pServer) { int i; - pthread_attr_t thattr; HttpThread * pThread; pServer->pThreads = (HttpThread *)malloc(sizeof(HttpThread) * (size_t)pServer->numOfThreads); @@ -601,8 +600,6 @@ bool httpInitConnect(HttpServer *pServer) { } memset(pServer->pThreads, 0, sizeof(HttpThread) * (size_t)pServer->numOfThreads); - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pThread = pServer->pThreads; for (i = 0; i < pServer->numOfThreads; ++i) { sprintf(pThread->label, "%s%d", pServer->label, i); @@ -626,21 +623,27 @@ bool httpInitConnect(HttpServer *pServer) { return false; } + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) { httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label, strerror(errno)); return false; } + pthread_attr_destroy(&thattr); httpTrace("http thread:%p:%s, initialized", pThread, pThread->label); pThread++; } + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pServer->thread), &thattr, (void *)httpAcceptHttpConnection, (void *)(pServer)) != 0) { httpError("http server:%s, failed to create Http accept thread, reason:%s", pServer->label, strerror(errno)); return false; } - pthread_attr_destroy(&thattr); httpTrace("http server:%s, initialized, ip:%s:%u, numOfThreads:%d", pServer->label, pServer->serverIp, diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 46f31a12d6e9b67f9d3ef772ab7c43bf8dc5e483..2a118cc2b14d01a8600d85970318bbcd3676131e 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -54,7 +54,7 @@ static HttpServer *httpServer = NULL; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); int httpInitSystem() { - taos_init(); + // taos_init(); httpServer = (HttpServer *)malloc(sizeof(HttpServer)); memset(httpServer, 0, sizeof(HttpServer)); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 4d77e007ad5aced0815b93d10252fa9936ec077d..7ec9b0fef79d387e553d0c0f000bbe1d3187cb88 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -160,7 +160,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; - pVnode->sync = syncStart(&syncInfo);; + pVnode->sync = syncStart(&syncInfo); pVnode->events = NULL; pVnode->cq = NULL; diff --git a/tests/script/tmp/dnode2.sim b/tests/script/tmp/dnode2.sim deleted file mode 100644 index 6d9a844fb6562ff2af26d52c2de387cb52d0dd2a..0000000000000000000000000000000000000000 --- a/tests/script/tmp/dnode2.sim +++ /dev/null @@ -1,6 +0,0 @@ -system sh/stop_dnodes.sh -system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 -system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 -system sh/exec_up.sh -n dnode1 -s start -system sh/exec_up.sh -n dnode2 -s start -sql connect \ No newline at end of file diff --git a/tests/script/tmp/mnodes.sim b/tests/script/tmp/mnodes.sim new file mode 100644 index 0000000000000000000000000000000000000000..32e72f16ffdf5b11a9a7c57e145aa09c6e348756 --- /dev/null +++ b/tests/script/tmp/mnodes.sim @@ -0,0 +1,7 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 +system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 +system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3 +system sh/cfg.sh -n dnode1 -c numOfMPeers -v 3 +system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3 +system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3