diff --git a/CMakeLists.txt b/CMakeLists.txt index 7443f1a9110e68d57f85f6241a76bb9f7f473aea..5a9a42cc3627d18af7fd48064b35f5abd6b15601 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,8 @@ PROJECT(TDengine) SET(TD_CLUSTER FALSE) SET(TD_ACCOUNT FALSE) +SET(TD_VPEER FALSE) +SET(TD_MPEER FALSE) SET(TD_GRANT FALSE) SET(TD_COVER FALSE) SET(TD_PAGMODE_LITE FALSE) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a2af7ea16b21263b561f00d2bb6fd7bc8ccfcca2..327120e0ed937034db4bd4ce63ee11c8904ba6c7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1835,7 +1835,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { } for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode); + pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId); } SSchema* pSchema = pMetaMsg->schema; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b93ec0b6d381520c8d693f78f72f39685e13d329..d616a8bffa402ed8f2bcab6d8fa392c5de9c950c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1454,7 +1454,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex); + trsupport->pParentSqlObj, pSql, pSvd != NULL ? 
pSvd->vgId : -1, trsupport->subqueryIndex); pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -1470,7 +1470,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort if (vnodeInfo != NULL) { tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, + vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, trsupport->subqueryIndex, pState->code); } else { tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql, @@ -1481,7 +1481,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } else { // success, proceed to retrieve data from dnode if (vnodeInfo != NULL) { tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, + vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, trsupport->subqueryIndex); } else { tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 8a70107629e54b14a41e511cf332ea8eea107916..4bd89e238e7d1e08d9a0f4b223b72f5ed1dee369 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -26,6 +26,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF (TD_CLUSTER) TARGET_LINK_LIBRARIES(taosd cluster) ENDIF () + + IF (TD_VPEER) + TARGET_LINK_LIBRARIES(taosd balance) + ENDIF () SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 36f3a0e6127c4b288fb19bd7c88d9c1c0925438c..e2de9bf5860b7e6d4f759c868eefb51a71018b5a 100644 --- 
a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -32,6 +32,7 @@ static int32_t dnodeInitSystem(); static int32_t dnodeInitStorage(); +extern void grantParseParameter(); static void dnodeCleanupStorage(); static void dnodeCleanUpSystem(); static void dnodeSetRunStatus(SDnodeRunStatus status); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 8e523eaf46701981a5469acfc19135a3fc4013d4..5f1e7a7a9431f4e6a16b7f61981a42aa36042fcf 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -64,7 +64,7 @@ int32_t dnodeInitMgmt() { dError("failed to init dnode timer"); return -1; } - + int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -104,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int dnodeGetVnodeList(int32_t vnodeList[]) { +static int32_t dnodeGetVnodeList(int32_t vnodeList[]) { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { return TSDB_CODE_NO_WRITE_ACCESS; @@ -129,47 +129,59 @@ static int dnodeGetVnodeList(int32_t vnodeList[]) { } static int32_t dnodeOpenVnodes() { - char vnodeDir[TSDB_FILENAME_LEN * 3]; - int failed = 0; + char vnodeDir[TSDB_FILENAME_LEN * 3]; + int32_t failed = 0; - int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); - int numOfVnodes = dnodeGetVnodeList(vnodeList); - - for (int i=0; ipCont; - pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); - pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); - pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - pCreate->cfg.commitLog = pCreate->cfg.commitLog; - + pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); + pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); + pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize); + pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1); + pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2); + pCreate->cfg.daysToKeep = 
htonl(pCreate->cfg.daysToKeep); + pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); + pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock); + pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable); + pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks); + + for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { + pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId); + pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId); + pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip); + } + return vnodeCreate(pCreate); } static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { - SMDDropVnodeMsg *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); @@ -177,7 +189,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { } static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { - SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); @@ -206,7 +217,7 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { static void dnodeSendStatusMsg(void *handle, void *tmrId) { if (tsDnodeTmr == NULL) { - dError("dnode timer is already released"); + dError("dnode timer is already released"); return; } diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index f71729c87a1c3a73cec1ae1df744c0046c73c6c3..31296e0201fbdf9a0fecb0e4cf5668926b2fd316 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -192,6 +192,7 @@ static void dnodeHandleIdleReadWorker() { } } +UNUSED_FUNC static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { SRpcContext *pRpcContext = pRead->pRpcContext; int32_t code = 0; diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 951d43c34b88a2da659aa7429f37defb50837fb8..3681ef22c488a61825c97d6c6be29c079d9d3a73 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -37,7 
+37,7 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; - int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; + int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); if (numOfThreads < 1) { numOfThreads = 1; diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index aee14ed48414aa68590e246f8563806cd1df6a20..2e0ec5f95b1f00ae5ba60b978e7b00136bbf8efd 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -52,7 +52,6 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker); SWriteWorkerPool wWorkerPool; int32_t dnodeInitWrite() { - wWorkerPool.max = tsNumOfCores; wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); if (wWorkerPool.writeWorker == NULL) return -1; @@ -71,7 +70,7 @@ void dnodeCleanupWrite() { } void dnodeWrite(SRpcMsg *pMsg) { - char *pCont = (char *) pMsg->pCont; + char *pCont = (char *)pMsg->pCont; if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { SMsgDesc *pDesc = (SMsgDesc *)pCont; @@ -80,16 +79,16 @@ void dnodeWrite(SRpcMsg *pMsg) { } SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = htonl(pHead->vgId); - pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + pHead->contLen = htonl(pHead->contLen); taos_queue queue = vnodeGetWqueue(pHead->vgId); if (queue) { // put message into queue SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); - pWrite->rpcMsg = *pMsg; - pWrite->pCont = pCont; - pWrite->contLen = pHead->contLen; + pWrite->rpcMsg = *pMsg; + pWrite->pCont = pCont; + pWrite->contLen = pHead->contLen; taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); } else { @@ -227,4 +226,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { pthread_exit(NULL); } } - diff --git a/src/inc/mnode.h 
b/src/inc/mnode.h index 2fbceb06dfccf5ebac4d7c461698248bbfa043ef..3f66c46d7b31ce4e0e690c539bb5f273aab649c6 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -98,7 +98,6 @@ typedef struct { typedef struct { int32_t dnodeId; - int32_t vnode; uint32_t privateIp; uint32_t publicIp; } SVnodeGid; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 54899678ccccfc4b4ce4b3664d177a9f87ef89e0..841c5b658f1d37e31cc620872bf59f9790cb3c23 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -156,28 +156,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value") -// TAOS_DEFINE_ERROR(TSDB_CODE_SYNC_REQUIRED, 0, 99, "sync required") -// TAOS_DEFINE_ERROR(TSDB_CODE_UNSYNCED, 0, 100, "unsyned") -// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_ALREADY_IMPORTED, 0, 75, "data already imported") -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_COMMIT_LOG, 0, 109, "invalid commit log") // commit log init failed -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode status") -// TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 105, "timestamp out of range") -// TAOS_DEFINE_ERROR(TSDB_CODE_DUPLICATE_TAGS, 0, 112, "duplicate tags") // tags value for join not unique -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SUBMIT_MSG, 0, 113, "invalid submit message") -// TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources") -// TAOS_DEFINE_ERROR(TSDB_CODE_FILE_BLOCK_TS_DISORDERED, 0, 108, "file block ts disordered") // time stamp in file block is disordered -// TAOS_DEFINE_ERROR(TSDB_CODE_BATCH_SIZE_TOO_BIG, 0, 104, "batch size too big") -// TAOS_DEFINE_ERROR(TSDB_CODE_WRONG_SCHEMA, 0, 53, "wrong schema") -// TAOS_DEFINE_ERROR(TSDB_CODE_NO_QSUMMARY, 0, 68, "no qsummery") -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_METER_ID, 0, 27, "invalid meter id") -// TAOS_DEFINE_ERROR(TSDB_CODE_METRICMETA_EXPIRED, 0, 
63, "metricmeta expired") // local cached metric-meta expired causes error in metric query -// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_ALREADY_EXIST, 0, 67, "session already exist") -// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_NOT_READY, 0, 103, "session not ready") // table NOT in ready state -// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_OVERFLOW, 0, 82, "data overflow") -// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_TRANS_NOT_FINISHED, 0, 17, "action transaction not finished") -// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NOT_ONLINE, 0, 18, "action not online") -// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_SEND_FAILD, 0, 19, "action send failed") -// TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_SESSION, 0, 20, "not active session") +// others +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format") + #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d4fd452952a93a6151b6fe6e4888627135f2e180..5550ebf1774b4877520a171e77fbc84b3a354dca 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -241,7 +241,8 @@ typedef struct SSchema { } SSchema; typedef struct { - int32_t vnode; // the index of vnode + int32_t vgId; + int32_t dnodeId; uint32_t ip; } SVnodeDesc; diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 401074d1713b6ebe1ce95c3c882eaaf1499a85fe..05f2ed94a7f77a0c5314fe008106de62ff7b07c8 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -23,7 +23,7 @@ extern "C" { int32_t mgmtInitBalance(); void mgmtCleanupBalance(); -void mgmtStartBalanceTimer(int32_t afterMs) ; +void mgmtBalanceNotify() ; int32_t mgmtAllocVnodes(SVgObj *pVgroup); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 3a7ae8dcde4f9cfecf5367fd8faddf917921339a..e697d70d5808f98c87652089f08570bfcee9b7c8 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -18,11 +18,35 @@ #include "mgmtBalance.h" #include "mgmtDnode.h" -int32_t mgmtInitBalance() { return 0; } -void 
mgmtCleanupBalance() {} -void mgmtStartBalanceTimer(int32_t afterMs) {} +extern int32_t balanceInit(); +extern void balanceCleanUp(); +extern void balanceNotify(); +extern int32_t balanceAllocVnodes(SVgObj *pVgroup); + +int32_t mgmtInitBalance() { +#ifdef _VPEER + return balanceInit(); +#else + return 0; +#endif +} + +void mgmtCleanupBalance() { +#ifdef _VPEER + balanceCleanUp(); +#endif +} + +void mgmtBalanceNotify() { +#ifdef _VPEER + balanceNotify(); +#endif +} int32_t mgmtAllocVnodes(SVgObj *pVgroup) { +#ifdef _VPEER + return balanceAllocVnodes(pVgroup); +#else void * pNode = NULL; SDnodeObj *pDnode = NULL; SDnodeObj *pSelDnode = NULL; @@ -53,4 +77,5 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes); return TSDB_CODE_SUCCESS; +#endif } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 14e23bad9f037fb0f3d13f82abdbe2c9220dbadc..ada0bce2e99881a7b91c17ccdbdf00384cdc1cc5 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -251,7 +251,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pDnode->status != TSDB_DN_STATUS_READY) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TSDB_DN_STATUS_READY; - mgmtStartBalanceTimer(200); + mgmtBalanceNotify(); } mgmtDecDnodeRef(pDnode); diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index cf20c47646f9c5e9d9a4779e830fd879e171a0f8..c4996fc4bb02592e8f3dfb914cfd24d47b0ae6fc 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1512,7 +1512,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; } - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); + pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId); } pMeta->numOfVpeers = 
pVgroup->numOfVnodes; diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 87f3872b0aaacba15c49a5e3678d94993518ad8a..bbf960203a533285e857a14b9299d155791cd0a5 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -32,7 +32,7 @@ #include "mgmtVgroup.h" void *tsVgroupSdb = NULL; -static int32_t tsVgUpdateSize = 0; +int32_t tsVgUpdateSize = 0; static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -93,11 +93,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); - pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; - pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; - pVgroup->vnodeGid[i].vnode = pVgroup->vgId; - atomic_add_fetch_32(&pDnode->openVnodes, 1); - mgmtDecDnodeRef(pDnode); + if (pDnode != NULL) { + pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; + pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; + atomic_add_fetch_32(&pDnode->openVnodes, 1); + mgmtDecDnodeRef(pDnode); + } } mgmtAddVgroupIntoDb(pVgroup); @@ -236,7 +237,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode); + mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); } pMsg->ahandle = pVgroup; @@ -312,27 +313,21 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { } for (int32_t i = 0; i < maxReplica; ++i) { - pShow->bytes[cols] = 16; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip"); - pSchema[cols].bytes = 
htons(pShow->bytes[cols]); - cols++; - pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "vnode"); + strcpy(pSchema[cols].name, "dnode"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 9; + pShow->bytes[cols] = 16; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "vnode status"); + strcpy(pSchema[cols].name, "ip"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 16; + pShow->bytes[cols] = 9; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "public ip"); + strcpy(pSchema[cols].name, "vstatus"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; } @@ -416,13 +411,13 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo cols++; for (int32_t i = 0; i < maxReplica; ++i) { - tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, ipstr); + *(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId; cols++; + tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *) pWrite = pVgroup->vnodeGid[i].vnode; + strcpy(pWrite, ipstr); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -433,11 +428,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo strcpy(pWrite, "null"); } cols++; - - tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, ipstr); - cols++; } numOfRows++; @@ -490,15 +480,15 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); pCfg->daysToKeep = htonl(pCfg->daysToKeep); pCfg->commitTime = htonl(pCfg->commitTime); - pCfg->commitLog = pCfg->commitLog; - pCfg->blocksPerTable = 
htons(pCfg->blocksPerTable); - pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - + pCfg->blocksPerTable = htons(pCfg->blocksPerTable); + pCfg->replications = (int8_t) pVgroup->numOfVnodes; + SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { - vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); - vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); + vpeerDesc[j].vgId = htonl(pVgroup->vgId); + vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId); + vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); } return pVnode; diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 5ee1d5c18b658ebb333ed24436eaaa5738abdf84..7e6caf6168a7ad758f4eb4e2c4858b4db2d639f5 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -41,6 +41,8 @@ typedef struct { void *sync; void *events; void *cq; // continuous query + int32_t replicas; + SVnodeDesc vpeers[TSDB_MAX_MPEERS]; } SVnodeObj; int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 31004d3033f97b6d1af35be341524791b362fdbd..e7081351765ec02c53509bfc60ae43a38c8db564 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -29,16 +29,17 @@ #include "vnode.h" #include "vnodeInt.h" -static void *tsDnodeVnodesHash; -static void vnodeCleanUp(SVnodeObj *pVnode); -static void vnodeBuildVloadMsg(char *pNode, void * param); -static int vnodeWALCallback(void *arg); +static void *tsDnodeVnodesHash; +static void vnodeCleanUp(SVnodeObj *pVnode); +static void vnodeBuildVloadMsg(char *pNode, void * param); +static int vnodeWALCallback(void *arg); +static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); +static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int tsOpennedVnodes; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static void vnodeInit() { 
- vnodeInitWriteFp(); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt); @@ -51,12 +52,12 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; pthread_once(&vnodeModuleInit, vnodeInit); - SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); + SVnodeObj *pTemp = (SVnodeObj *)taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); if (pTemp != NULL) { dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp); return TSDB_CODE_SUCCESS; - } + } STsdbCfg tsdbCfg = {0}; tsdbCfg.precision = pVnodeCfg->cfg.precision; @@ -81,12 +82,18 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { } } + code = vnodeSaveCfg(pVnodeCfg); + if (code != TSDB_CODE_SUCCESS) { + dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code)); + return code; + } + char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); - if (code <0) { + if (code != TSDB_CODE_SUCCESS) { dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); - return code; + return terrno; } dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog); @@ -121,6 +128,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->version = 0; taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode)); + int32_t code = vnodeReadCfg(pVnode); + if (code != TSDB_CODE_SUCCESS) { + dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return code; + } + pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); @@ -258,7 +272,63 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vnodeRelease(pVnode); } +// TODO: this is a simple implement static int vnodeWALCallback(void *arg) { SVnodeObj *pVnode = 
arg; return walRenew(pVnode->wal);
+}
+
+// Persist the vnode's replica layout to <tsVnodeDir>/vnode<vgId>/config; returns errno on I/O failure.
+static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
+  char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
+  snprintf(cfgFile, sizeof(cfgFile), "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
+
+  FILE *fp = fopen(cfgFile, "w");
+  if (!fp) return errno;
+
+  fprintf(fp, "replicas %d\n", pVnodeCfg->cfg.replications);
+  for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
+    fprintf(fp, "index%d dnode %d ip %u\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip);
+  }
+
+  if (fclose(fp) != 0) return errno;  // fclose flushes; a failure means the cfg was not fully written
+  dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// TODO: this is a simple implement
+// Load the layout written by vnodeSaveCfg into pVnode; errno or TSDB_CODE_INVALID_FILE_FORMAT on failure.
+static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
+  char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
+  snprintf(cfgFile, sizeof(cfgFile), "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
+
+  FILE *fp = fopen(cfgFile, "r");
+  if (!fp) return errno;
+
+  int32_t code = TSDB_CODE_INVALID_FILE_FORMAT;  // stays "invalid" until the whole file parses
+  char    option[3][32] = {0};
+  int32_t replicas = 0;
+  if (fscanf(fp, "%31s %d", option[0], &replicas) != 2) goto _cleanup;  // %31s fits the 32-byte buffers
+  if (strcmp(option[0], "replicas") != 0) goto _cleanup;
+  if (replicas <= 0 || replicas > TSDB_MAX_MPEERS) goto _cleanup;  // vpeers[] has TSDB_MAX_MPEERS slots
+  pVnode->replicas = replicas;
+
+  for (int32_t i = 0; i < replicas; ++i) {
+    int32_t  dnodeId = 0;
+    uint32_t dnodeIp = 0;
+    int32_t  num = fscanf(fp, "%31s %31s %d %31s %u", option[0], option[1], &dnodeId, option[2], &dnodeIp);
+    if (num != 5) goto _cleanup;
+    if (strcmp(option[1], "dnode") != 0 || strcmp(option[2], "ip") != 0) goto _cleanup;
+    if (dnodeId == 0 || dnodeIp == 0) goto _cleanup;
+    pVnode->vpeers[i].dnodeId = dnodeId;
+    pVnode->vpeers[i].ip = dnodeIp;
+    pVnode->vpeers[i].vgId = pVnode->vgId;
+  }
+
+  code = TSDB_CODE_SUCCESS;
+  dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId);
+_cleanup:
+  fclose(fp);  // single exit: the handle is released on every path, including format errors
+  return code;
 }
\ No newline at end of file