diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index a788df2e32306e7c1fc5ca97e42bcc78a7acb4b5..aef7d5711fb1b73d2542ebe34b9c77c636827807 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -591,8 +591,8 @@ typedef struct { int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; + int32_t minRows; + int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; int8_t walLevel; @@ -706,7 +706,7 @@ typedef struct { SVnodeLoad data[]; } SVnodeLoads; -typedef struct SStatusMsg { +typedef struct { int32_t sver; int32_t dnodeId; int32_t clusterId; @@ -756,6 +756,7 @@ typedef struct { int32_t dnodeId; char db[TSDB_FULL_DB_NAME_LEN]; uint64_t dbUid; + int32_t vgVersion; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 8458ad9da38d009524f8eee25d902ca305c0cdb2..081fcd7f7d04785db19ca3ac5abd162595c364be 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -197,8 +197,8 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type); // int32_t daysToKeep0; // int32_t daysToKeep1; // int32_t daysToKeep2; -// int32_t minRowsPerFileBlock; -// int32_t maxRowsPerFileBlock; +// int32_t minRows; +// int32_t maxRows; // int8_t precision; // time resolution // int8_t compression; // int8_t cacheLastRow; diff --git a/include/util/tthread.h b/include/util/tthread.h index 0ff267dd1fef8e2e52d5032d15441604110655c0..7a5fd1f4c877024f2e3b8ade79a4094c1ef91ebc 100644 --- a/include/util/tthread.h +++ b/include/util/tthread.h @@ -24,8 +24,8 @@ extern "C" { #include "tdef.h" // create new thread -pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param); -// destory thread +pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param); +// destory thread bool taosDestoryThread(pthread_t* pthread); // thread running return true bool taosThreadRunning(pthread_t* pthread); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index d3f1b06a4a88fd5d96f760ed1ae7e2f9a968a3bf..cc4fc5b7d7e81abdbefb4ffcaabcda52b226ee44 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -17,11 +17,23 @@ #include "dndVnodes.h" #include "dndTransport.h" +typedef struct { + int32_t vgId; + int32_t vgVersion; + int8_t dropped; + uint64_t dbUid; + char db[TSDB_FULL_DB_NAME_LEN]; + char path[PATH_MAX + 20]; +} SWrapperCfg; + typedef struct { int32_t vgId; int32_t refCount; + int32_t vgVersion; int8_t dropped; int8_t accessState; + uint64_t dbUid; + char *db; char *path; SVnode *pImpl; taos_queue pWriteQ; @@ -32,13 +44,13 @@ typedef struct { } SVnodeObj; typedef struct { - int32_t vnodeNum; - int32_t opened; - int32_t failed; - int32_t threadIndex; - pthread_t *pThreadId; - SVnodeObj *pVnodes; - SDnode * pDnode; + int32_t vnodeNum; + int32_t opened; + int32_t failed; + int32_t threadIndex; + pthread_t thread; + SDnode *pDnode; + SWrapperCfg *pCfgs; } SVnodeThread; static int32_t dndInitVnodeReadWorker(SDnode *pDnode); @@ -73,16 +85,14 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); -static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId); +static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl); -static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode); +static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl); +static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode); static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes); -static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes); +static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); static int32_t dndWriteVnodesToFile(SDnode *pDnode); -static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg); -static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndOpenVnodes(SDnode *pDnode); static void dndCloseVnodes(SDnode *pDnode); @@ -126,22 +136,25 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { +static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj)); + SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pVnode->vgId = vgId; - pVnode->refCount = 0; + pVnode->vgId = pCfg->vgId; + pVnode->refCount = 1; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; + pVnode->vgVersion = pCfg->vgVersion; + pVnode->dbUid = pCfg->dbUid; + pVnode->db = tstrdup(pCfg->db); + pVnode->path = tstrdup(pCfg->path); - pVnode->path = tstrdup(path); - if (pVnode->path == NULL) { + if (pVnode->path == NULL || pVnode->db == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -167,7 +180,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S } taosWLockLatch(&pMgmt->latch); - int32_t code = taosHashPut(pMgmt->hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWUnLockLatch(&pMgmt->latch); if (code != 0) { @@ -176,7 +189,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S return code; } -static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) { +static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); @@ -195,6 +208,9 @@ static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) { dndFreeVnodeWriteQueue(pDnode, pVnode); dndFreeVnodeApplyQueue(pDnode, pVnode); dndFreeVnodeSyncQueue(pDnode, pVnode); + free(pVnode->path); + free(pVnode->db); + free(pVnode); } static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { @@ -208,16 +224,16 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; - SVnodeObj * pVnode = *ppVnode; - if (pVnode) { + SVnodeObj *pVnode = *ppVnode; + if (pVnode && num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + pVnodes[num] = (*ppVnode); num++; - if (num < size) { - int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); - pVnodes[num] = (*ppVnode); - } + pIter = taosHashIterate(pMgmt->hash, pIter); + } else { + taosHashCancelIterate(pMgmt->hash, pIter); } - pIter = taosHashIterate(pMgmt->hash, pIter); } taosRUnLockLatch(&pMgmt->latch); @@ -226,15 +242,15 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { return pVnodes; } -static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes) { - int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 30000; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; - char file[PATH_MAX + 20] = {0}; - SVnodeObj *pVnodes = NULL; +static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { + int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; + char file[PATH_MAX + 20] = {0}; + SWrapperCfg *pCfgs = NULL; snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); @@ -270,31 +286,55 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_ goto PRASE_VNODE_OVER; } - pVnodes = calloc(vnodesNum, sizeof(SVnodeObj)); - if (pVnodes == NULL) { + pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg)); + if (pCfgs == NULL) { dError("failed to read %s since out of memory", file); goto PRASE_VNODE_OVER; } for (int32_t i = 0; i < vnodesNum; ++i) { - cJSON * vnode = cJSON_GetArrayItem(vnodes, i); - SVnodeObj *pVnode = &pVnodes[i]; + cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + SWrapperCfg *pCfg = &pCfgs[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); - if (!vgId || vgId->type != cJSON_String) { + if (!vgId || vgId->type != cJSON_Number) { dError("failed to read %s since vgId not found", file); goto PRASE_VNODE_OVER; } - pVnode->vgId = atoi(vgId->valuestring); + pCfg->vgId = vgId->valueint; + snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId); cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); - if (!dropped || dropped->type != cJSON_String) { + if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); goto PRASE_VNODE_OVER; } - pVnode->dropped = atoi(vnode->valuestring); + pCfg->dropped = dropped->valueint; + + cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion"); + if (!vgVersion || vgVersion->type != cJSON_Number) { + dError("failed to read %s since vgVersion not found", file); + goto PRASE_VNODE_OVER; + } + pCfg->vgVersion = vgVersion->valueint; + + cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid"); + if (!dbUid || dbUid->type != cJSON_String) { + dError("failed to read %s since dbUid not found", file); + goto PRASE_VNODE_OVER; + } + pCfg->dbUid = atoll(dbUid->valuestring); + + cJSON *db = cJSON_GetObjectItem(vnode, "db"); + if (!db || db->type != cJSON_String) { + dError("failed to read %s since db not found", file); + goto PRASE_VNODE_OVER; + } + tstrncpy(pCfg->db, db->valuestring, TSDB_FULL_DB_NAME_LEN); } + *ppCfgs = pCfgs; + *numOfVnodes = vnodesNum; code = 0; dInfo("succcessed to read file %s", file); @@ -313,30 +353,35 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); FILE *fp = fopen(file, "w"); - if (fp != NULL) { + if (fp == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); return -1; } - - int32_t len = 0; - int32_t maxLen = 30000; - char * content = calloc(1, maxLen + 1); int32_t numOfVnodes = 0; SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); + int32_t len = 0; + int32_t maxLen = 65536; + char *content = calloc(1, maxLen + 1); + len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n"); + len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); for (int32_t i = 0; i < numOfVnodes; ++i) { SVnodeObj *pVnode = pVnodes[i]; - len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped); + len += snprintf(content + len, maxLen - len, " {\n"); + len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped); + len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d,\n", pVnode->vgVersion); + len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid); + len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db); if (i < numOfVnodes - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); + len += snprintf(content + len, maxLen - len, " },\n"); } else { - len += snprintf(content + len, maxLen - len, " }]\n"); + len += snprintf(content + len, maxLen - len, " }\n"); } } + len += snprintf(content + len, maxLen - len, " ]\n"); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -358,74 +403,29 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { return taosRenameFile(file, realfile); } -static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) { - char path[PATH_MAX + 20] = {0}; - snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId); - // SVnode *pImpl = vnodeCreate(vgId, path, pCfg); - - SVnode *pImpl = vnodeOpen(path, NULL); - if (pImpl == NULL) { - return -1; - } - - int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl); - if (code != 0) { - vnodeClose(pImpl); - vnodeDestroy(path); - terrno = code; - return code; - } - - code = dndWriteVnodesToFile(pDnode); - if (code != 0) { - vnodeClose(pImpl); - vnodeDestroy(path); - terrno = code; - return code; - } - - return 0; -} - -static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) { - pVnode->dropped = 1; - if (dndWriteVnodesToFile(pDnode) != 0) { - pVnode->dropped = 0; - return -1; - } - - dndDropVnodeWrapper(pDnode, pVnode); - vnodeClose(pVnode->pImpl); - vnodeDestroy(pVnode->path); - dndWriteVnodesToFile(pDnode); - return 0; -} - static void *dnodeOpenVnodeFunc(void *param) { SVnodeThread *pThread = param; - SDnode * pDnode = pThread->pDnode; - SVnodesMgmt * pMgmt = &pDnode->vmgmt; + SDnode *pDnode = pThread->pDnode; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); for (int32_t v = 0; v < pThread->vnodeNum; ++v) { - SVnodeObj *pVnode = &pThread->pVnodes[v]; + SWrapperCfg *pCfg = &pThread->pCfgs[v]; char stepDesc[TSDB_STEP_DESC_LEN] = {0}; - snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId, + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId, pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - char path[PATH_MAX + 20] = {0}; - snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, pVnode->vgId); - SVnode *pImpl = vnodeOpen(path, NULL); + SVnode *pImpl = vnodeOpen(pCfg->path, NULL); if (pImpl == NULL) { - dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); + dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; } else { - dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl); - dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); + dndOpenVnode(pDnode, pCfg, pImpl); + dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } @@ -448,9 +448,9 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { return -1; } - SVnodeObj *pVnodes = NULL; - int32_t numOfVnodes = 0; - if (dndGetVnodesFromFile(pDnode, &pVnodes, &numOfVnodes) != 0) { + SWrapperCfg *pCfgs = NULL; + int32_t numOfVnodes = 0; + if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) { dInfo("failed to get vnode list from disk since %s", terrstr()); return -1; } @@ -463,13 +463,14 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); for (int32_t t = 0; t < threadNum; ++t) { threads[t].threadIndex = t; - threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj)); + threads[t].pDnode = pDnode; + threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg)); } for (int32_t v = 0; v < numOfVnodes; ++v) { int32_t t = v % threadNum; SVnodeThread *pThread = &threads[t]; - pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v]; + pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v]; } dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes); @@ -478,19 +479,25 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { SVnodeThread *pThread = &threads[t]; if (pThread->vnodeNum == 0) continue; - pThread->pThreadId = taosCreateThread(dnodeOpenVnodeFunc, pThread); - if (pThread->pThreadId == NULL) { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread) != 0) { dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); } + + pthread_attr_destroy(&thAttr); } for (int32_t t = 0; t < threadNum; ++t) { SVnodeThread *pThread = &threads[t]; - taosDestoryThread(pThread->pThreadId); - pThread->pThreadId = NULL; - free(pThread->pVnodes); + if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { + pthread_join(pThread->thread, NULL); + } + free(pThread->pCfgs); } free(threads); + free(pCfgs); if (pMgmt->openVnodes != pMgmt->totalVnodes) { dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes); @@ -508,7 +515,8 @@ static void dndCloseVnodes(SDnode *pDnode) { SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - dndDropVnodeWrapper(pDnode, pVnodes[i]); + dndReleaseVnode(pDnode, pVnodes[i]); + dndCloseVnode(pDnode, pVnodes[i]); } if (pVnodes != NULL) { @@ -523,11 +531,12 @@ static void dndCloseVnodes(SDnode *pDnode) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { +static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { SCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->vgId = htonl(pCreate->vgId); pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dbUid = htobe64(pCreate->dbUid); + pCreate->vgVersion = htonl(pCreate->vgVersion); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->totalBlocks = htonl(pCreate->totalBlocks); pCreate->daysPerFile = htonl(pCreate->daysPerFile); @@ -544,9 +553,10 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg pReplica->port = htons(pReplica->port); } - *vgId = pCreate->vgId; + return pCreate; +} -#if 0 +static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { pCfg->wsize = pCreate->cacheBlockSize; pCfg->ssize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize; @@ -567,8 +577,15 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg pCfg->walCfg.rollPeriod = 128; pCfg->walCfg.segSize = 128; pCfg->walCfg.vgId = pCreate->vgId; -#endif - return 0; +} + +static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) { + memcpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN); + pCfg->dbUid = pCreate->dbUid; + pCfg->dropped = 0; + snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId); + pCfg->vgId = pCreate->vgId; + pCfg->vgVersion = pCreate->vgVersion; } static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { @@ -584,48 +601,83 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { } static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { + SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg); + dDebug("vgId:%d, create vnode req is received", pCreate->vgId); + SVnodeCfg vnodeCfg = {0}; - int32_t vgId = 0; + dndGenerateVnodeCfg(pCreate, &vnodeCfg); - dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); - dDebug("vgId:%d, create vnode req is received", vgId); + SWrapperCfg wrapperCfg = {0}; + dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg); - SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId); if (pVnode != NULL) { - dDebug("vgId:%d, already exist, return success", vgId); + dDebug("vgId:%d, already exist, return success", pCreate->vgId); dndReleaseVnode(pDnode, pVnode); return 0; } - if (dndCreateVnode(pDnode, vgId, &vnodeCfg) != 0) { - dError("vgId:%d, failed to create vnode since %s", vgId, terrstr()); - return terrno; + SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/); + if (pImpl == NULL) { + return -1; + } + + int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl); + if (code != 0) { + vnodeClose(pImpl); + vnodeDestroy(wrapperCfg.path); + terrno = code; + return code; + } + + code = dndWriteVnodesToFile(pDnode); + if (code != 0) { + vnodeClose(pImpl); + vnodeDestroy(wrapperCfg.path); + terrno = code; + return code; } return 0; } static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { + SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg); + dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); + SVnodeCfg vnodeCfg = {0}; - int32_t vgId = 0; + dndGenerateVnodeCfg(pAlter, &vnodeCfg); - dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); - dDebug("vgId:%d, alter vnode req is received", vgId); + SWrapperCfg wrapperCfg = {0}; + dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg); - SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); if (pVnode == NULL) { - dDebug("vgId:%d, failed to alter vnode since %s", vgId, terrstr()); + dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); return terrno; } + if (wrapperCfg.vgVersion == pVnode->vgVersion) { + dndReleaseVnode(pDnode, pVnode); + dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId); + return 0; + } + if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { - dError("vgId:%d, failed to alter vnode since %s", vgId, terrstr()); + dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); return terrno; } + int32_t oldVersion = pVnode->vgVersion; + pVnode->vgVersion = wrapperCfg.vgVersion; + int32_t code = dndWriteVnodesToFile(pDnode); + if (code != 0) { + pVnode->vgVersion = oldVersion; + } + dndReleaseVnode(pDnode, pVnode); - return 0; + return code; } static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { @@ -637,15 +689,21 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to drop since %s", vgId, terrstr()); - return terrno; + return 0; } - if (dndDropVnode(pDnode, pVnode) != 0) { - dError("vgId:%d, failed to drop vnode since %s", vgId, terrstr()); - dndReleaseVnode(pDnode, pVnode); + pVnode->dropped = 1; + if (dndWriteVnodesToFile(pDnode) != 0) { + pVnode->dropped = 0; return terrno; } + dndReleaseVnode(pDnode, pVnode); + dndCloseVnode(pDnode, pVnode); + vnodeClose(pVnode->pImpl); + vnodeDestroy(pVnode->path); + dndWriteVnodesToFile(pDnode); + return 0; } @@ -738,12 +796,10 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { break; } - if (code != 0) { - SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; - rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } + SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { @@ -756,7 +812,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); - SRpcMsg * pRpcMsg = NULL; + SRpcMsg *pRpcMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); @@ -1029,7 +1085,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { } static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt * pMgmt = &pDnode->vmgmt; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; SMWorkerPool *pPool = &pMgmt->writePool; pPool->name = "vnode-write"; pPool->max = pDnode->opt.numOfCores; @@ -1137,12 +1193,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { pLoads->num = taosHashGetSize(pMgmt->hash); int32_t v = 0; - void * pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; - SVnodeObj * pVnode = *ppVnode; + SVnodeObj *pVnode = *ppVnode; SVnodeLoad *pLoad = &pLoads->data[v++]; vnodeGetLoad(pVnode->pImpl, pLoad); diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 0dbee0d3379daa55e69ba6ed5e0b2beaa400dec7..a4996ecb3bfc0792add96700c51c4b83dcf6d80e 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -176,6 +176,12 @@ SDnode *dndInit(SDnodeOpt *pOption) { return NULL; } + if (vnodeInit(1) != 0) { + dError("failed to init vnode env"); + dndCleanup(pDnode); + return NULL; + } + if (dndInitDnode(pDnode) != 0) { dError("failed to init dnode"); dndCleanup(pDnode); @@ -222,8 +228,10 @@ void dndCleanup(SDnode *pDnode) { dndCleanupMnode(pDnode); dndCleanupVnodes(pDnode); dndCleanupDnode(pDnode); + vnodeClear(); walCleanUp(); rpcCleanup(); + dndCleanupEnv(pDnode); free(pDnode); dInfo("TDengine is cleaned up successfully"); diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index d465a62f2d5269632c4f5022e0bafca3a1e45670..204afa111fdd509aac44937774410a8165905314 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -210,8 +210,8 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { pReq->daysToKeep0 = htonl(3650); pReq->daysToKeep1 = htonl(3650); pReq->daysToKeep2 = htonl(3650); - pReq->minRowsPerFileBlock = htonl(100); - pReq->maxRowsPerFileBlock = htonl(4096); + pReq->minRows = htonl(100); + pReq->maxRows = htonl(4096); pReq->commitTime = htonl(3600); pReq->fsyncPeriod = htonl(3000); pReq->walLevel = 1; @@ -375,8 +375,8 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { pReq->daysToKeep0 = htonl(3650); pReq->daysToKeep1 = htonl(3650); pReq->daysToKeep2 = htonl(3650); - pReq->minRowsPerFileBlock = htonl(100); - pReq->maxRowsPerFileBlock = htonl(4096); + pReq->minRows = htonl(100); + pReq->maxRows = htonl(4096); pReq->commitTime = htonl(3600); pReq->fsyncPeriod = htonl(3000); pReq->walLevel = 1; diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mgmt/impl/test/stb/stb.cpp index ee8c86bcea3b5764ca3158deabbced027b70d4ee..c12e8eadf45111af660126837fa1157212f8bd57 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mgmt/impl/test/stb/stb.cpp @@ -187,8 +187,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { pReq->daysToKeep0 = htonl(3650); pReq->daysToKeep1 = htonl(3650); pReq->daysToKeep2 = htonl(3650); - pReq->minRowsPerFileBlock = htonl(100); - pReq->maxRowsPerFileBlock = htonl(4096); + pReq->minRows = htonl(100); + pReq->maxRows = htonl(4096); pReq->commitTime = htonl(3600); pReq->fsyncPeriod = htonl(3000); pReq->walLevel = 1; diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index f2010b581388f24359ff2c15f4517a3fded22047..7cf469f8e253f0354b74cbdda7d2e8cf6cc8a2c5 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -16,7 +16,7 @@ #include "deploy.h" void initLog(const char* path) { - dDebugFlag = 143; + dDebugFlag = 207; vDebugFlag = 0; mDebugFlag = 207; cDebugFlag = 0; diff --git a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp index 3f16cd87d889902b3e7d34fbbf6a2cb2d83e6771..9149be9cd838be6ddd708a1fa56e85f6dc5f2dae 100644 --- a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp +++ b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp @@ -176,49 +176,112 @@ SServer* DndTestVgroup::pServer; SClient* DndTestVgroup::pClient; int32_t DndTestVgroup::connId; - TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { { - SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); - pReq->vgId = htonl(2); - pReq->dnodeId = htonl(1); - strcpy(pReq->db, "1.d1"); - pReq->dbUid = htobe64(9527); - pReq->cacheBlockSize = htonl(16); - pReq->totalBlocks = htonl(10); - pReq->daysPerFile = htonl(10); - pReq->daysToKeep0 = htonl(3650); - pReq->daysToKeep1 = htonl(3650); - pReq->daysToKeep2 = htonl(3650); - pReq->minRows = htonl(100); - pReq->minRows = htonl(4096); - pReq->commitTime = htonl(3600); - pReq->fsyncPeriod = htonl(3000); - pReq->walLevel = 1; - pReq->precision = 0; - pReq->compression = 2; - pReq->replica = 1; - pReq->quorum = 1; - pReq->update = 0; - pReq->cacheLastRow = 0; - pReq->selfIndex = 0; - for (int r = 0; r < pReq->replica; ++r) { - SReplica* pReplica = &pReq->replicas[r]; - pReplica->id = htonl(1); - pReplica->port = htons(9150); + for (int i = 0; i < 3; ++i) { + SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + pReq->vgVersion = htonl(1); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRows = htonl(100); + pReq->minRows = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replica = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->selfIndex = 0; + for (int r = 0; r < pReq->replica; ++r) { + SReplica* pReplica = &pReq->replicas[r]; + pReplica->id = htonl(1); + pReplica->port = htons(9150); + } + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); } + } - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SCreateVnodeMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; - - sendMsg(pClient, &rpcMsg); - SRpcMsg* pMsg = pClient->pRsp; - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - taosMsleep(1000000); + { + for (int i = 0; i < 3; ++i) { + SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(sizeof(SAlterVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + pReq->vgVersion = htonl(2); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRows = htonl(100); + pReq->minRows = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replica = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->selfIndex = 0; + for (int r = 0; r < pReq->replica; ++r) { + SReplica* pReplica = &pReq->replicas[r]; + pReplica->id = htonl(1); + pReplica->port = htons(9150); + } + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SAlterVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } } + { + for (int i = 0; i < 3; ++i) { + SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(sizeof(SDropVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SDropVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } + } } - diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 36b6725737bb3b4cd6a070da92fe01060f80d17e..da5492057458b09aed332168919ce1b38a46e959 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -61,15 +61,14 @@ typedef enum { } EAuthOp; typedef enum { - TRN_STAGE_PREPARE = 1, - TRN_STAGE_EXECUTE = 2, + TRN_STAGE_PREPARE = 0, + TRN_STAGE_EXECUTE = 1, + TRN_STAGE_ROLLBACK = 2, TRN_STAGE_COMMIT = 3, - TRN_STAGE_ROLLBACK = 4, - TRN_STAGE_RETRY = 5, - TRN_STAGE_OVER = 6, + TRN_STAGE_OVER = 4, } ETrnStage; -typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy; +typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum { DND_STATUS_OFFLINE = 0, diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index e9913803bd7f2fe88db87c2ea2b44aafb062f554..ba72d9553721b9220aa520b8bb11b46c1b7baeb5 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -70,6 +70,7 @@ typedef struct SMnode { tmr_h timer; char *path; SMnodeCfg cfg; + int64_t checkTime; SSdb *pSdb; SDnode *pDnode; SArray *pSteps; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 5c15e2f987d89ad00c26201bfcdc85352d2a71b0..a8d37ba655ef52c6f4fdf4b77938cab4bb0a6046 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -22,6 +22,16 @@ extern "C" { #endif +typedef struct { + SEpSet epSet; + int8_t msgType; + int8_t msgSent; + int8_t msgReceived; + int32_t errCode; + int32_t contLen; + void *pCont; +} STransAction; + int32_t mndInitTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode); @@ -30,12 +40,15 @@ void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); +int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); +int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); + int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); -char *mndTransStageStr(ETrnStage stage); -char *mndTransPolicyStr(ETrnPolicy policy); +void mndTransHandleActionRsp(SMnodeMsg *pMsg); + +char *mndTransStageStr(ETrnStage stage); +char *mndTransPolicyStr(ETrnPolicy policy); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e85edf66da85011fe9b1c32e18a3568fb3ecc477..108896c121eeb1d3d93ffd3a1a9c93cc6f413235 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -16,13 +16,12 @@ #define _DEFAULT_SOURCE #include "mndDb.h" #include "mndDnode.h" -#include "mndMnode.h" #include "mndShow.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" -#define TSDB_DB_VER_NUM 1 +#define TSDB_DB_VER_NUMBER 1 #define TSDB_DB_RESERVE_SIZE 64 static SSdbRaw *mndDbActionEncode(SDbObj *pDb); @@ -66,7 +65,7 @@ int32_t mndInitDb(SMnode *pMnode) { void mndCleanupDb(SMnode *pMnode) {} static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_DB_VER_NUM, sizeof(SDbObj) + TSDB_DB_RESERVE_SIZE); + SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_DB_VER_NUMBER, sizeof(SDbObj) + TSDB_DB_RESERVE_SIZE); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -106,7 +105,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - if (sver != TSDB_DB_VER_NUM) { + if (sver != TSDB_DB_VER_NUMBER) { mError("failed to decode db since %s", terrstr()); terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; @@ -160,7 +159,7 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { mTrace("db:%s, perform update action", pOldDb->name); - pOldDb->updateTime = pNewDb->createdTime; + pOldDb->updateTime = pNewDb->updateTime; pOldDb->cfgVersion = pNewDb->cfgVersion; pOldDb->vgVersion = pNewDb->vgVersion; memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg)); @@ -168,8 +167,12 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { } SDbObj *mndAcquireDb(SMnode *pMnode, char *db) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_DB, db); + SSdb *pSdb = pMnode->pSdb; + SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + } + return pDb; } void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { @@ -242,68 +245,74 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; } -static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); - if (pDbRaw == NULL || mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1; - sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING); + if (pDbRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); - if (pVgRaw == NULL || mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; - sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; + if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1; } return 0; } -static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetCreateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); - if (pDbRaw == NULL || mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1; - sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED); + if (pDbRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED) != 0) return -1; - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); - if (pVgRaw == NULL || mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1; - sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED); + if (pVgRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1; + if (sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED) != 0) return -1; } return 0; } -static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetCreateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); - if (pDbRaw == NULL || mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1; - sdbSetRawStatus(pDbRaw, SDB_STATUS_READY); + if (pDbRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_READY) != 0) return -1; - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); - if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1; - sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + if (pVgRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1; + if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1; } return 0; } -static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { - SVgObj *pVgroup = pVgroups + v; +static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) { + SVgObj *pVgroup = pVgroups + vg; for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { - SVnodeGid *pVgid = pVgroup->vnodeGid + vn; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode == NULL) { - return -1; - } + STransAction action = {0}; + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; - SEpSet epset = mndGetDnodeEpset(pDnode); + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); - if (pMsg == NULL) { - return -1; - } + if (pMsg == NULL) return -1; - if (mndTransAppendRedoAction(pTrans, &epset, TSDB_MSG_TYPE_ALTER_VNODE_IN, sizeof(SCreateVnodeMsg), pMsg) != 0) { + action.pCont = pMsg; + action.contLen = sizeof(SCreateVnodeMsg); + action.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); return -1; } @@ -313,26 +322,26 @@ static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV return 0; } -static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { - for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { - SVgObj *pVgroup = pVgroups + v; +static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) { + SVgObj *pVgroup = pVgroups + vg; for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { - SVnodeGid *pVgid = pVgroup->vnodeGid + vn; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode == NULL) { - return -1; - } + STransAction action = {0}; + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; - SEpSet epset = mndGetDnodeEpset(pDnode); + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); - if (pMsg == NULL) { - return -1; - } + if (pMsg == NULL) return -1; - if (mndTransAppendUndoAction(pTrans, &epset, TSDB_MSG_TYPE_DROP_VNODE_IN, sizeof(SDropVnodeMsg), pMsg) != 0) { + action.pCont = pMsg; + action.contLen = sizeof(SDropVnodeMsg); + action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN; + if (mndTransAppendUndoAction(pTrans, &action) != 0) { free(pMsg); return -1; } @@ -344,14 +353,14 @@ static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) { SDbObj dbObj = {0}; - tstrncpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN); - tstrncpy(dbObj.acct, pUser->acct, TSDB_USER_LEN); + memcpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN); + memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN); dbObj.createdTime = taosGetTimestampMs(); dbObj.updateTime = dbObj.createdTime; dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN); - dbObj.hashMethod = 1; dbObj.cfgVersion = 1; dbObj.vgVersion = 1; + dbObj.hashMethod = 1; dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups, .cacheBlockSize = pCreate->cacheBlockSize, .totalBlocks = pCreate->totalBlocks, @@ -359,8 +368,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat .daysToKeep0 = pCreate->daysToKeep0, .daysToKeep1 = pCreate->daysToKeep1, .daysToKeep2 = pCreate->daysToKeep2, - .minRows = pCreate->minRowsPerFileBlock, - .maxRows = pCreate->maxRowsPerFileBlock, + .minRows = pCreate->minRows, + .maxRows = pCreate->maxRows, .fsyncPeriod = pCreate->fsyncPeriod, .commitTime = pCreate->commitTime, .precision = pCreate->precision, @@ -396,29 +405,30 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat mError("db:%s, failed to create since %s", pCreate->db, terrstr()); goto CREATE_DB_OVER; } + mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); - if (mndSetRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } - if (mndSetUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { + if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER; } @@ -447,8 +457,8 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) { pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0); pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1); pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2); - pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); - pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); + pCreate->minRows = htonl(pCreate->minRows); + pCreate->maxRows = htonl(pCreate->maxRows); pCreate->commitTime = htonl(pCreate->commitTime); pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod); @@ -456,7 +466,7 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) { SDbObj *pDb = mndAcquireDb(pMnode, pCreate->db); if (pDb != NULL) { - sdbRelease(pMnode->pSdb, pDb); + mndReleaseDb(pMnode, pDb); if (pCreate->ignoreExist) { mDebug("db:%s, already exist, ignore exist is set", pCreate->db); return 0; @@ -485,57 +495,77 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) { } static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) { - bool changed = false; + terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED; if (pAlter->totalBlocks >= 0 && pAlter->totalBlocks != pDb->cfg.totalBlocks) { pDb->cfg.totalBlocks = pAlter->totalBlocks; - changed = true; + terrno = 0; } if (pAlter->daysToKeep0 >= 0 && pAlter->daysToKeep0 != pDb->cfg.daysToKeep0) { pDb->cfg.daysToKeep0 = pAlter->daysToKeep0; - changed = true; + terrno = 0; } if (pAlter->daysToKeep1 >= 0 && pAlter->daysToKeep1 != pDb->cfg.daysToKeep1) { pDb->cfg.daysToKeep1 = pAlter->daysToKeep1; - changed = true; + terrno = 0; } if (pAlter->daysToKeep2 >= 0 && pAlter->daysToKeep2 != pDb->cfg.daysToKeep2) { pDb->cfg.daysToKeep2 = pAlter->daysToKeep2; - changed = true; + terrno = 0; } if (pAlter->fsyncPeriod >= 0 && pAlter->fsyncPeriod != pDb->cfg.fsyncPeriod) { pDb->cfg.fsyncPeriod = pAlter->fsyncPeriod; - changed = true; + terrno = 0; } if (pAlter->walLevel >= 0 && pAlter->walLevel != pDb->cfg.walLevel) { pDb->cfg.walLevel = pAlter->walLevel; - changed = true; + terrno = 0; } if (pAlter->quorum >= 0 && pAlter->quorum != pDb->cfg.quorum) { pDb->cfg.quorum = pAlter->quorum; - changed = true; + terrno = 0; } if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) { pDb->cfg.cacheLastRow = pAlter->cacheLastRow; - changed = true; + terrno = 0; } - if (!changed) { - terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED; - return -1; - } + return terrno; +} + +static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1; return 0; } +static int32_t mndSetUpdateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + +static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + +static int32_t mndSetUpdateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); @@ -544,30 +574,41 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO mDebug("trans:%d, used to update db:%s", pTrans->id, pOldDb->name); - SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); - SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetUpdateDbUndoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; + } + + if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; + } + + if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; + } + + if (mndSetUpdateDbUndoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto UPDATE_DB_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto UPDATE_DB_OVER; } + code = 0; + +UPDATE_DB_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) { @@ -610,46 +651,82 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pDb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pUndoRaw = mndDbActionEncode(pDb); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pCommitRaw = mndDbActionEncode(pDb); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } + +static int32_t mndSetDropDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } + static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to drop since %s", pDb->name, terrstr()); return -1; } + mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); - SSdbRaw *pRedoRaw = mndDbActionEncode(pDb); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); - SSdbRaw *pUndoRaw = mndDbActionEncode(pDb); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropDbUndoLogs(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); - SSdbRaw *pCommitRaw = mndDbActionEncode(pDb); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; + } + + if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; + } + + if (mndSetDropDbUndoActions(pMnode, pTrans, pDb) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_DB_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto DROP_DB_OVER; } + code = 0; + +DROP_DB_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) { @@ -751,29 +828,27 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; - - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); if (pDb == NULL) { mError("db:%s, failed to process sync db msg since %s", pMsg->db, terrstr()); return -1; - } else { - mndReleaseDb(pMnode, pDb); - return 0; } + + mndReleaseDb(pMnode, pDb); + return 0; } static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; - - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); if (pDb == NULL) { mError("db:%s, failed to process compact db msg since %s", pMsg->db, terrstr()); return -1; - } else { - mndReleaseDb(pMnode, pDb); - return 0; } + + mndReleaseDb(pMnode, pDb); + return 0; } static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 493f20bc9af3319521fd602e2ed8e6648305ca22..9751771773c53fa55ec965a1fed8be02013f7588 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -18,10 +18,10 @@ #include "mndMnode.h" #include "mndShow.h" #include "mndTrans.h" -#include "ttime.h" #include "tep.h" +#include "ttime.h" -#define TSDB_DNODE_VER 1 +#define TSDB_DNODE_VER_NUMBER 1 #define TSDB_DNODE_RESERVE_SIZE 64 #define TSDB_CONFIG_OPTION_LEN 16 #define TSDB_CONIIG_VALUE_LEN 48 @@ -101,14 +101,14 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj); if (pRaw == NULL) return -1; - sdbSetRawStatus(pRaw, SDB_STATUS_READY); + if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1; mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id); return sdbWrite(pMnode->pSdb, pRaw); } static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE); + SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -127,7 +127,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - if (sver != TSDB_DNODE_VER) { + if (sver != TSDB_DNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; mError("failed to decode dnode since %s", terrstr()); return NULL; @@ -150,21 +150,8 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) { mTrace("dnode:%d, perform insert action", pDnode->id); - - pDnode->rebootTime = 0; - pDnode->lastAccessTime = 0; - pDnode->accessTimes = 0; - pDnode->numOfMnodes = 0; - pDnode->numOfVnodes = 0; - pDnode->numOfQnodes = 0; - pDnode->numOfSupportMnodes = 0; - pDnode->numOfSupportVnodes = 0; - pDnode->numOfSupportQnodes = 0; - pDnode->numOfCores = 0; - pDnode->status = DND_STATUS_OFFLINE; pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED; snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port); - return 0; } @@ -225,7 +212,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) { int64_t ms = taosGetTimestampMs(); int64_t interval = ABS(pDnode->lastAccessTime - ms); - if (interval > 3000 * pMnode->cfg.statusInterval) { + if (interval > 3500 * pMnode->cfg.statusInterval) { return false; } return true; @@ -267,12 +254,9 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { return DND_REASON_STATUS_INTERVAL_NOT_MATCH; } - int64_t checkTime = 0; - char timestr[32] = "1970-01-01 00:00:00.00"; - (void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - if ((0 != strcasecmp(pCfg->timezone, pMnode->cfg.timezone)) && (checkTime != pCfg->checkTime)) { + if ((0 != strcasecmp(pCfg->timezone, pMnode->cfg.timezone)) && (pMnode->checkTime != pCfg->checkTime)) { mError("timezone [%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, pMnode->cfg.timezone, - pCfg->checkTime, checkTime); + pCfg->checkTime, pMnode->checkTime); return DND_REASON_TIME_ZONE_NOT_MATCH; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5caec6c78dc59bd061e96770c7817510c43a0f6f..847f6bcd8bf55b99416febeac8abc651ed66a16d 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -21,13 +21,6 @@ #define TSDB_TRN_ARRAY_SIZE 8 #define TSDB_TRN_RESERVE_SIZE 64 -typedef struct { - SEpSet epSet; - int8_t msgType; - int32_t contLen; - void *pCont; -} STransAction; - static SSdbRaw *mndTransActionEncode(STrans *pTrans); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); @@ -37,11 +30,11 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); -static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); +static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); -static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray); +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); @@ -343,10 +336,8 @@ char *mndTransStageStr(ETrnStage stage) { return "commit"; case TRN_STAGE_ROLLBACK: return "rollback"; - case TRN_STAGE_RETRY: - return "retry"; case TRN_STAGE_OVER: - return "stop"; + return "over"; default: return "undefined"; } @@ -388,7 +379,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return NULL; } - mDebug("trans:%d, data:%p is created", pTrans->id, pTrans); + mDebug("trans:%d, is created", pTrans->id); return pTrans; } @@ -417,7 +408,7 @@ void mndTransDrop(STrans *pTrans) { mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); - mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); + // mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); tfree(pTrans); } @@ -459,10 +450,8 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return code; } -static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { - STransAction action = {.epSet = *pEpSet, .msgType = msgType, .contLen = contLen, .pCont = pCont}; - - void *ptr = taosArrayPush(pArray, &action); +static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { + void *ptr = taosArrayPush(pArray, pAction); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -471,16 +460,12 @@ static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgTy return 0; } -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { - int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, msgType, contLen, pCont); - mTrace("trans:%d, msg:%s len:%d append to redo actions", pTrans->id, taosMsg[msgType], contLen); - return code; +int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { + return mndTransAppendAction(pTrans->redoActions, pAction); } -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { - int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, msgType, contLen, pCont); - mTrace("trans:%d, msg:%s len:%d append to undo actions", pTrans->id, taosMsg[msgType], contLen); - return code; +int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { + return mndTransAppendAction(pTrans->undoActions, pAction); } int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { @@ -493,7 +478,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mTrace("trans:%d, start sync", pTrans->id); + mTrace("trans:%d, sync to other nodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -533,7 +518,7 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); if (taosArrayGetSize(pTrans->commitLogs) != 0) { - mTrace("trans:%d, start sync", pTrans->id); + mTrace("trans:%d, sync to other nodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -563,7 +548,7 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - mTrace("trans:%d, start sync", pTrans->id); + mTrace("trans:%d, sync to other nodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -596,6 +581,50 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) // todo } +void mndTransHandleActionRsp(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + int64_t sig = (int64_t)(pMsg->rpcMsg.ahandle); + int32_t transId = (int32_t)(sig >> 32); + int32_t action = (int32_t)((sig << 32) >> 32); + + STrans *pTrans = mndAcquireTrans(pMnode, transId); + if (pTrans == NULL) { + mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr()); + goto HANDLE_ACTION_RSP_OVER; + } + + SArray *pArray = NULL; + if (pTrans->stage == TRN_STAGE_EXECUTE) { + pArray = pTrans->redoActions; + } else if (pTrans->stage == TRN_STAGE_ROLLBACK) { + pArray = pTrans->undoActions; + } else { + } + + if (pArray == NULL) { + mError("trans:%d, invalid trans stage:%s", transId, mndTransStageStr(pTrans->stage)); + goto HANDLE_ACTION_RSP_OVER; + } + + int32_t actionNum = taosArrayGetSize(pTrans->redoActions); + if (action < 0 || action > actionNum) { + mError("trans:%d, invalid action:%d", transId, action); + goto HANDLE_ACTION_RSP_OVER; + } + + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction != NULL) { + pAction->msgReceived = 1; + pAction->errCode = pMsg->code; + } + + mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); + mndTransExecute(pMnode, pTrans); + +HANDLE_ACTION_RSP_OVER: + mndReleaseTrans(pMnode, pTrans); +} + static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { SSdb *pSdb = pMnode->pSdb; int32_t arraySize = taosArrayGetSize(pArray); @@ -618,7 +647,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { if (code != 0) { mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr()) } else { - mTrace("trans:%d, execute redo logs finished", pTrans->id) + mDebug("trans:%d, execute redo logs finished", pTrans->id) } } @@ -632,7 +661,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { if (code != 0) { mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr()) } else { - mTrace("trans:%d, execute undo logs finished", pTrans->id) + mDebug("trans:%d, execute undo logs finished", pTrans->id) } } @@ -646,47 +675,70 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { if (code != 0) { mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr()) } else { - mTrace("trans:%d, execute commit logs finished", pTrans->id) + mDebug("trans:%d, execute commit logs finished", pTrans->id) } } return code; } -static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) { -#if 0 - int32_t arraySize = taosArrayGetSize(pArray); - for (int32_t i = 0; i < arraySize; ++i) { - STransAction *pAction = taosArrayGet(pArray, i); +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { + int32_t numOfActions = taosArrayGetSize(pArray); + if (numOfActions == 0) return 0; + + for (int32_t action = 0; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction == NULL) continue; + if (pAction->msgSent) continue; + + int64_t signature = pTrans->id; + signature = (signature << 32); + signature += action; - SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen}; + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .ahandle = (void *)signature}; rpcMsg.pCont = rpcMallocCont(pAction->contLen); if (rpcMsg.pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + + pAction->msgSent = 1; + pAction->msgReceived = 0; + pAction->errCode = 0; + + mDebug("trans:%d, action:%d is sent", pTrans->id, action); mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -#else - return 0; -#endif + int32_t numOfReceivedMsgs = 0; + int32_t errorCode = 0; + for (int32_t action = 0; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction == NULL) continue; + if (pAction->msgSent && pAction->msgReceived) { + numOfReceivedMsgs++; + if (pAction->errCode != 0) { + errorCode = pAction->errCode; + } + } + } + + if (numOfReceivedMsgs == numOfActions) { + mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errorCode); + terrno = errorCode; + return errorCode; + } else { + return TSDB_CODE_MND_ACTION_IN_PROGRESS; + } } static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->redoActions) <= 0) return 0; - - mTrace("trans:%d, start to execute redo actions", pTrans->id); - return mndTransExecuteActions(pMnode, pTrans->redoActions); + return mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); } static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->undoActions) <= 0) return 0; - - mTrace("trans:%d, start to execute undo actions", pTrans->id); - return mndTransExecuteActions(pMnode, pTrans->undoActions); + return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); } static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { @@ -694,7 +746,7 @@ static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { if (code == 0) { pTrans->stage = TRN_STAGE_EXECUTE; - mTrace("trans:%d, stage from prepare to execute", pTrans->id); + mDebug("trans:%d, stage from prepare to execute", pTrans->id); } else { pTrans->stage = TRN_STAGE_ROLLBACK; mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr()); @@ -708,17 +760,17 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { if (code == 0) { pTrans->stage = TRN_STAGE_COMMIT; - mTrace("trans:%d, stage from execute to commit", pTrans->id); + mDebug("trans:%d, stage from execute to commit", pTrans->id); } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code)); + mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code)); return code; } else { if (pTrans->policy == TRN_POLICY_ROLLBACK) { pTrans->stage = TRN_STAGE_ROLLBACK; mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr()); } else { - pTrans->stage = TRN_STAGE_RETRY; - mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr()); + pTrans->stage = TRN_STAGE_EXECUTE; + mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr()); } } @@ -726,29 +778,16 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); - - if (code == 0) { - pTrans->stage = TRN_STAGE_OVER; - mTrace("trans:%d, commit stage finished", pTrans->id); - } else { - if (pTrans->policy == TRN_POLICY_ROLLBACK) { - pTrans->stage = TRN_STAGE_ROLLBACK; - mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr()); - } else { - pTrans->stage = TRN_STAGE_RETRY; - mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr()); - } - } - - return code; + mndTransExecuteCommitLogs(pMnode, pTrans); + pTrans->stage = TRN_STAGE_OVER; + return 0; } static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); if (code == 0) { - mTrace("trans:%d, rollbacked", pTrans->id); + mDebug("trans:%d, rollbacked", pTrans->id); } else { pTrans->stage = TRN_STAGE_ROLLBACK; mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); @@ -757,20 +796,6 @@ static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { return code; } -static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); - - if (code == 0) { - pTrans->stage = TRN_STAGE_COMMIT; - mTrace("trans:%d, stage from retry to commit", pTrans->id); - } else { - pTrans->stage = TRN_STAGE_RETRY; - mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr()); - } - - return code; -} - static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; @@ -785,7 +810,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { case TRN_STAGE_COMMIT: code = mndTransCommit(pMnode, pTrans); if (code == 0) { - code = mndTransPerformCommitStage(pMnode, pTrans); + mndTransPerformCommitStage(pMnode, pTrans); } break; case TRN_STAGE_ROLLBACK: @@ -794,9 +819,6 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { code = mndTransRollback(pMnode, pTrans); } break; - case TRN_STAGE_RETRY: - code = mndTransPerformRetryStage(pMnode, pTrans); - break; default: mndTransSendRpcRsp(pTrans, 0); return; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e99fea200b944639aca3fb5bcb33623033c7915d..eeca8c9546e22f8f83962e833487e238385b06eb 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -21,7 +21,7 @@ #include "mndShow.h" #include "mndTrans.h" -#define TSDB_VGROUP_VER_NUM 1 +#define TSDB_VGROUP_VER_NUMBER 1 #define TSDB_VGROUP_RESERVE_SIZE 64 static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); @@ -70,7 +70,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { void mndCleanupVgroup(SMnode *pMnode) {} SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUM, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE); + SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -98,7 +98,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - if (sver != TSDB_VGROUP_VER_NUM) { + if (sver != TSDB_VGROUP_VER_NUMBER) { mError("failed to decode vgroup since %s", terrstr()); terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; @@ -142,14 +142,20 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNe mTrace("vgId:%d, perform update action", pOldVgroup->vgId); pOldVgroup->updateTime = pNewVgroup->updateTime; pOldVgroup->version = pNewVgroup->version; + pOldVgroup->hashBegin = pNewVgroup->hashBegin; + pOldVgroup->hashEnd = pNewVgroup->hashEnd; pOldVgroup->replica = pNewVgroup->replica; memcpy(pOldVgroup->vnodeGid, pNewVgroup->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid)); return 0; } SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_VGROUP, &vgId); + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = sdbAcquire(pSdb, SDB_VGROUP, &vgId); + if (pVgroup == NULL) { + terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; + } + return pVgroup; } void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { @@ -158,16 +164,17 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { } SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SCreateVnodeMsg *pCreate = malloc(sizeof(SCreateVnodeMsg)); + SCreateVnodeMsg *pCreate = calloc(1, sizeof(SCreateVnodeMsg)); if (pCreate == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pCreate->dnodeId = htonl(pDnode->id); pCreate->vgId = htonl(pVgroup->vgId); + pCreate->dnodeId = htonl(pDnode->id); memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN); pCreate->dbUid = htobe64(pDb->uid); + pCreate->vgVersion = htonl(pVgroup->version); pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks); pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile); @@ -193,7 +200,6 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pVgidDnode == NULL) { free(pCreate); - terrno = TSDB_CODE_MND_APP_ERROR; return NULL; } @@ -217,7 +223,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb } SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SDropVnodeMsg *pDrop = malloc(sizeof(SDropVnodeMsg)); + SDropVnodeMsg *pDrop = calloc(1, sizeof(SDropVnodeMsg)); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -269,7 +275,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { return -1; } - int32_t alloceVgroups = 0; + int32_t allocedVgroups = 0; int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); uint32_t hashMin = 0; uint32_t hashMax = UINT32_MAX; @@ -281,7 +287,6 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { pVgroup->createdTime = taosGetTimestampMs(); pVgroup->updateTime = pVgroups->createdTime; pVgroup->version = 1; - pVgroup->dbUid = pDb->uid; pVgroup->hashBegin = hashMin + hashInterval * v; if (v == pDb->cfg.numOfVgroups - 1) { pVgroup->hashEnd = hashMax; @@ -290,6 +295,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { } memcpy(pVgroup->dbName, pDb->name, TSDB_FULL_DB_NAME_LEN); + pVgroup->dbUid = pDb->uid; pVgroup->replica = pDb->cfg.replications; if (mndGetAvailableDnode(pMnode, pVgroup) != 0) { @@ -298,22 +304,25 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { return -1; } - alloceVgroups++; + allocedVgroups++; } *ppVgroups = pVgroups; return 0; } -static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) { - SSdb *pSdb = pMnode->pSdb; - + SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -329,7 +338,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (strcmp(pVgroup->dbName, dbName) == 0) { + if (pVgroup->dbUid == pDb->uid) { replica = MAX(replica, pVgroup->replica); numOfVgroups++; } @@ -441,11 +450,25 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) { } static int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { - if (dnodeId == 0) { - return 0; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfVnodes = 0; + void *pIter = NULL; + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + if (pVgroup->vnodeGid[v].dnodeId == dnodeId) { + numOfVnodes++; + } + } + + sdbRelease(pSdb, pVgroup); } - return 0; + return numOfVnodes; } static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 24155a4e5329f73fc950e25ef23b942b55a11bf0..fb0b95dc4a696f9a7b2751f3735d73d059838c60 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -225,7 +225,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { } return 0; -} +} SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { mDebug("start to open mnode in %s", path); @@ -237,6 +237,9 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } + char timestr[24] = "1970-01-01 00:00:00.00"; + (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep)); if (pMnode->pSteps == NULL) { free(pMnode); diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index e37559808e2b5242354be6187c07fc49a732a940..5a0020199f0652376de73ec5c243301b4ac2371f 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -27,12 +27,12 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { pRaw->sver = sver; pRaw->dataLen = dataLen; - mTrace("raw:%p, is created, len:%d", pRaw, dataLen); + // mTrace("raw:%p, is created, len:%d", pRaw, dataLen); return pRaw; } void sdbFreeRaw(SSdbRaw *pRaw) { - mTrace("raw:%p, is freed", pRaw); + // mTrace("raw:%p, is freed", pRaw); free(pRaw); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index e14c58d4122f1f830bcac17c2c358ef6632ef15c..29c80ae9ec05453dd23605c39d43fa562c2aa2d9 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -61,8 +61,8 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { pReq->daysToKeep0 = htonl(3650); pReq->daysToKeep1 = htonl(3650); pReq->daysToKeep2 = htonl(3650); - pReq->minRowsPerFileBlock = htonl(100); - pReq->maxRowsPerFileBlock = htonl(4096); + pReq->minRows = htonl(100); + pReq->maxRows = htonl(4096); pReq->commitTime = htonl(3600); pReq->fsyncPeriod = htonl(3000); pReq->walLevel = 1; diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 3f9d86737fb1fb76e2b2869bcb4cb9cf521e5c96..6a752cc195462479f6d480d3f5856bb4e9ff7854 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -134,8 +134,8 @@ static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) { pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks); pMsg->daysPerFile = htonl(pCreateDb->daysPerFile); pMsg->commitTime = htonl((int32_t)pCreateDb->commitTime); - pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock); - pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock); + pMsg->minRows = htonl(pCreateDb->minRowsPerBlock); + pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock); pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod); pMsg->compression = pCreateDb->compressionLevel; pMsg->walLevel = (char)pCreateDb->walLevel; diff --git a/source/util/src/tthread.c b/source/util/src/tthread.c index 5ed7fb5aa00d1a5c5c3fffae2de46dbea57e011f..44fce1c882c4f0d7d66f36d378db1594e59f19ea 100644 --- a/source/util/src/tthread.c +++ b/source/util/src/tthread.c @@ -13,16 +13,16 @@ * along with this program. If not, see . */ -#include "os.h" #include "tthread.h" +#include "os.h" +#include "taoserror.h" #include "tdef.h" #include "tutil.h" #include "ulog.h" -#include "taoserror.h" // create new thread -pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) { - pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t)); +pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param) { + pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t)); pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -36,26 +36,24 @@ pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) { return pthread; } -// destory thread +// destory thread bool taosDestoryThread(pthread_t* pthread) { - if(pthread == NULL) return false; - if(taosThreadRunning(pthread)) { + if (pthread == NULL) return false; + if (taosThreadRunning(pthread)) { pthread_cancel(*pthread); pthread_join(*pthread, NULL); } - + free(pthread); return true; } // thread running return true bool taosThreadRunning(pthread_t* pthread) { - if(pthread == NULL) return false; + if (pthread == NULL) return false; int ret = pthread_kill(*pthread, 0); - if(ret == ESRCH) - return false; - if(ret == EINVAL) - return false; + if (ret == ESRCH) return false; + if (ret == EINVAL) return false; // alive return true; }