未验证 提交 bddfff72 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9225 from taosdata/feature/dnode3

create vnode in dnode
......@@ -591,8 +591,8 @@ typedef struct {
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t minRows;
int32_t maxRows;
int32_t commitTime;
int32_t fsyncPeriod;
int8_t walLevel;
......@@ -706,7 +706,7 @@ typedef struct {
SVnodeLoad data[];
} SVnodeLoads;
typedef struct SStatusMsg {
typedef struct {
int32_t sver;
int32_t dnodeId;
int32_t clusterId;
......@@ -756,6 +756,7 @@ typedef struct {
int32_t dnodeId;
char db[TSDB_FULL_DB_NAME_LEN];
uint64_t dbUid;
int32_t vgVersion;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
......
......@@ -197,8 +197,8 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
// int32_t daysToKeep0;
// int32_t daysToKeep1;
// int32_t daysToKeep2;
// int32_t minRowsPerFileBlock;
// int32_t maxRowsPerFileBlock;
// int32_t minRows;
// int32_t maxRows;
// int8_t precision; // time resolution
// int8_t compression;
// int8_t cacheLastRow;
......
......@@ -24,8 +24,8 @@ extern "C" {
#include "tdef.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
// destory thread
pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param);
// destory thread
bool taosDestoryThread(pthread_t* pthread);
// thread running return true
bool taosThreadRunning(pthread_t* pthread);
......
......@@ -17,11 +17,23 @@
#include "dndVnodes.h"
#include "dndTransport.h"
typedef struct {
int32_t vgId;
int32_t vgVersion;
int8_t dropped;
uint64_t dbUid;
char db[TSDB_FULL_DB_NAME_LEN];
char path[PATH_MAX + 20];
} SWrapperCfg;
typedef struct {
int32_t vgId;
int32_t refCount;
int32_t vgVersion;
int8_t dropped;
int8_t accessState;
uint64_t dbUid;
char *db;
char *path;
SVnode *pImpl;
taos_queue pWriteQ;
......@@ -32,13 +44,13 @@ typedef struct {
} SVnodeObj;
typedef struct {
int32_t vnodeNum;
int32_t opened;
int32_t failed;
int32_t threadIndex;
pthread_t *pThreadId;
SVnodeObj *pVnodes;
SDnode * pDnode;
int32_t vnodeNum;
int32_t opened;
int32_t failed;
int32_t threadIndex;
pthread_t thread;
SDnode *pDnode;
SWrapperCfg *pCfgs;
} SVnodeThread;
static int32_t dndInitVnodeReadWorker(SDnode *pDnode);
......@@ -73,16 +85,14 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl);
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes);
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
static int32_t dndWriteVnodesToFile(SDnode *pDnode);
static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg);
static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndOpenVnodes(SDnode *pDnode);
static void dndCloseVnodes(SDnode *pDnode);
......@@ -126,22 +136,25 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) {
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj));
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pVnode->vgId = vgId;
pVnode->refCount = 0;
pVnode->vgId = pCfg->vgId;
pVnode->refCount = 1;
pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl;
pVnode->vgVersion = pCfg->vgVersion;
pVnode->dbUid = pCfg->dbUid;
pVnode->db = tstrdup(pCfg->db);
pVnode->path = tstrdup(pCfg->path);
pVnode->path = tstrdup(path);
if (pVnode->path == NULL) {
if (pVnode->path == NULL || pVnode->db == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -167,7 +180,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S
}
taosWLockLatch(&pMgmt->latch);
int32_t code = taosHashPut(pMgmt->hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&pMgmt->latch);
if (code != 0) {
......@@ -176,7 +189,7 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, S
return code;
}
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) {
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
taosWLockLatch(&pMgmt->latch);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
......@@ -195,6 +208,9 @@ static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode) {
dndFreeVnodeWriteQueue(pDnode, pVnode);
dndFreeVnodeApplyQueue(pDnode, pVnode);
dndFreeVnodeSyncQueue(pDnode, pVnode);
free(pVnode->path);
free(pVnode->db);
free(pVnode);
}
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
......@@ -208,16 +224,16 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
SVnodeObj * pVnode = *ppVnode;
if (pVnode) {
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes[num] = (*ppVnode);
num++;
if (num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes[num] = (*ppVnode);
}
pIter = taosHashIterate(pMgmt->hash, pIter);
} else {
taosHashCancelIterate(pMgmt->hash, pIter);
}
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
......@@ -226,15 +242,15 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
return pVnodes;
}
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes) {
int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
char file[PATH_MAX + 20] = {0};
SVnodeObj *pVnodes = NULL;
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
char file[PATH_MAX + 20] = {0};
SWrapperCfg *pCfgs = NULL;
snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);
......@@ -270,31 +286,55 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_
goto PRASE_VNODE_OVER;
}
pVnodes = calloc(vnodesNum, sizeof(SVnodeObj));
if (pVnodes == NULL) {
pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
if (pCfgs == NULL) {
dError("failed to read %s since out of memory", file);
goto PRASE_VNODE_OVER;
}
for (int32_t i = 0; i < vnodesNum; ++i) {
cJSON * vnode = cJSON_GetArrayItem(vnodes, i);
SVnodeObj *pVnode = &pVnodes[i];
cJSON *vnode = cJSON_GetArrayItem(vnodes, i);
SWrapperCfg *pCfg = &pCfgs[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
if (!vgId || vgId->type != cJSON_String) {
if (!vgId || vgId->type != cJSON_Number) {
dError("failed to read %s since vgId not found", file);
goto PRASE_VNODE_OVER;
}
pVnode->vgId = atoi(vgId->valuestring);
pCfg->vgId = vgId->valueint;
snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);
cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
if (!dropped || dropped->type != cJSON_String) {
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_VNODE_OVER;
}
pVnode->dropped = atoi(vnode->valuestring);
pCfg->dropped = dropped->valueint;
cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
if (!vgVersion || vgVersion->type != cJSON_Number) {
dError("failed to read %s since vgVersion not found", file);
goto PRASE_VNODE_OVER;
}
pCfg->vgVersion = vgVersion->valueint;
cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
if (!dbUid || dbUid->type != cJSON_String) {
dError("failed to read %s since dbUid not found", file);
goto PRASE_VNODE_OVER;
}
pCfg->dbUid = atoll(dbUid->valuestring);
cJSON *db = cJSON_GetObjectItem(vnode, "db");
if (!db || db->type != cJSON_String) {
dError("failed to read %s since db not found", file);
goto PRASE_VNODE_OVER;
}
tstrncpy(pCfg->db, db->valuestring, TSDB_FULL_DB_NAME_LEN);
}
*ppCfgs = pCfgs;
*numOfVnodes = vnodesNum;
code = 0;
dInfo("succcessed to read file %s", file);
......@@ -313,30 +353,35 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);
FILE *fp = fopen(file, "w");
if (fp != NULL) {
if (fp == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr());
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
int32_t len = 0;
int32_t maxLen = 65536;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n");
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped);
len += snprintf(content + len, maxLen - len, " {\n");
len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped);
len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d,\n", pVnode->vgVersion);
len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db);
if (i < numOfVnodes - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
len += snprintf(content + len, maxLen - len, " },\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
len += snprintf(content + len, maxLen - len, " }\n");
}
}
len += snprintf(content + len, maxLen - len, " ]\n");
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
......@@ -358,74 +403,29 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
return taosRenameFile(file, realfile);
}
static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) {
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId);
// SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
SVnode *pImpl = vnodeOpen(path, NULL);
if (pImpl == NULL) {
return -1;
}
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
return 0;
}
static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
pVnode->dropped = 1;
if (dndWriteVnodesToFile(pDnode) != 0) {
pVnode->dropped = 0;
return -1;
}
dndDropVnodeWrapper(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0;
}
static void *dnodeOpenVnodeFunc(void *param) {
SVnodeThread *pThread = param;
SDnode * pDnode = pThread->pDnode;
SVnodesMgmt * pMgmt = &pDnode->vmgmt;
SDnode *pDnode = pThread->pDnode;
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SVnodeObj *pVnode = &pThread->pVnodes[v];
SWrapperCfg *pCfg = &pThread->pCfgs[v];
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId,
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, pVnode->vgId);
SVnode *pImpl = vnodeOpen(path, NULL);
SVnode *pImpl = vnodeOpen(pCfg->path, NULL);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex);
dndOpenVnode(pDnode, pCfg, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++;
}
......@@ -448,9 +448,9 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
return -1;
}
SVnodeObj *pVnodes = NULL;
int32_t numOfVnodes = 0;
if (dndGetVnodesFromFile(pDnode, &pVnodes, &numOfVnodes) != 0) {
SWrapperCfg *pCfgs = NULL;
int32_t numOfVnodes = 0;
if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
dInfo("failed to get vnode list from disk since %s", terrstr());
return -1;
}
......@@ -463,13 +463,14 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj));
threads[t].pDnode = pDnode;
threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SVnodeThread *pThread = &threads[t];
pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v];
pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
}
dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);
......@@ -478,19 +479,25 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pThread->pThreadId = taosCreateThread(dnodeOpenVnodeFunc, pThread);
if (pThread->pThreadId == NULL) {
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
}
for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t];
taosDestoryThread(pThread->pThreadId);
pThread->pThreadId = NULL;
free(pThread->pVnodes);
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
pthread_join(pThread->thread, NULL);
}
free(pThread->pCfgs);
}
free(threads);
free(pCfgs);
if (pMgmt->openVnodes != pMgmt->totalVnodes) {
dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
......@@ -508,7 +515,8 @@ static void dndCloseVnodes(SDnode *pDnode) {
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) {
dndDropVnodeWrapper(pDnode, pVnodes[i]);
dndReleaseVnode(pDnode, pVnodes[i]);
dndCloseVnode(pDnode, pVnodes[i]);
}
if (pVnodes != NULL) {
......@@ -523,11 +531,12 @@ static void dndCloseVnodes(SDnode *pDnode) {
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) {
static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->vgId = htonl(pCreate->vgId);
pCreate->dnodeId = htonl(pCreate->dnodeId);
pCreate->dbUid = htobe64(pCreate->dbUid);
pCreate->vgVersion = htonl(pCreate->vgVersion);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks);
pCreate->daysPerFile = htonl(pCreate->daysPerFile);
......@@ -544,9 +553,10 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg
pReplica->port = htons(pReplica->port);
}
*vgId = pCreate->vgId;
return pCreate;
}
#if 0
static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) {
pCfg->wsize = pCreate->cacheBlockSize;
pCfg->ssize = pCreate->cacheBlockSize;
pCfg->wsize = pCreate->cacheBlockSize;
......@@ -567,8 +577,15 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg
pCfg->walCfg.rollPeriod = 128;
pCfg->walCfg.segSize = 128;
pCfg->walCfg.vgId = pCreate->vgId;
#endif
return 0;
}
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) {
memcpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
pCfg->dbUid = pCreate->dbUid;
pCfg->dropped = 0;
snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
pCfg->vgId = pCreate->vgId;
pCfg->vgVersion = pCreate->vgVersion;
}
static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) {
......@@ -584,48 +601,83 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) {
}
static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
dndGenerateVnodeCfg(pCreate, &vnodeCfg);
dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
dDebug("vgId:%d, create vnode req is received", vgId);
SWrapperCfg wrapperCfg = {0};
dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", vgId);
dDebug("vgId:%d, already exist, return success", pCreate->vgId);
dndReleaseVnode(pDnode, pVnode);
return 0;
}
if (dndCreateVnode(pDnode, vgId, &vnodeCfg) != 0) {
dError("vgId:%d, failed to create vnode since %s", vgId, terrstr());
return terrno;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/);
if (pImpl == NULL) {
return -1;
}
int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path);
terrno = code;
return code;
}
code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path);
terrno = code;
return code;
}
return 0;
}
static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
dndGenerateVnodeCfg(pAlter, &vnodeCfg);
dndParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
dDebug("vgId:%d, alter vnode req is received", vgId);
SWrapperCfg wrapperCfg = {0};
dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
if (pVnode == NULL) {
dDebug("vgId:%d, failed to alter vnode since %s", vgId, terrstr());
dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
return terrno;
}
if (wrapperCfg.vgVersion == pVnode->vgVersion) {
dndReleaseVnode(pDnode, pVnode);
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
return 0;
}
if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
dError("vgId:%d, failed to alter vnode since %s", vgId, terrstr());
dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
dndReleaseVnode(pDnode, pVnode);
return terrno;
}
int32_t oldVersion = pVnode->vgVersion;
pVnode->vgVersion = wrapperCfg.vgVersion;
int32_t code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
pVnode->vgVersion = oldVersion;
}
dndReleaseVnode(pDnode, pVnode);
return 0;
return code;
}
static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
......@@ -637,15 +689,21 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) {
dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
return terrno;
return 0;
}
if (dndDropVnode(pDnode, pVnode) != 0) {
dError("vgId:%d, failed to drop vnode since %s", vgId, terrstr());
dndReleaseVnode(pDnode, pVnode);
pVnode->dropped = 1;
if (dndWriteVnodesToFile(pDnode) != 0) {
pVnode->dropped = 0;
return terrno;
}
dndReleaseVnode(pDnode, pVnode);
dndCloseVnode(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0;
}
......@@ -738,12 +796,10 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
break;
}
if (code != 0) {
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
......@@ -756,7 +812,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs);
SRpcMsg * pRpcMsg = NULL;
SRpcMsg *pRpcMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
......@@ -1029,7 +1085,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) {
}
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt * pMgmt = &pDnode->vmgmt;
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->writePool;
pPool->name = "vnode-write";
pPool->max = pDnode->opt.numOfCores;
......@@ -1137,12 +1193,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
pLoads->num = taosHashGetSize(pMgmt->hash);
int32_t v = 0;
void * pIter = taosHashIterate(pMgmt->hash, NULL);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj * pVnode = *ppVnode;
SVnodeObj *pVnode = *ppVnode;
SVnodeLoad *pLoad = &pLoads->data[v++];
vnodeGetLoad(pVnode->pImpl, pLoad);
......
......@@ -176,6 +176,12 @@ SDnode *dndInit(SDnodeOpt *pOption) {
return NULL;
}
if (vnodeInit(1) != 0) {
dError("failed to init vnode env");
dndCleanup(pDnode);
return NULL;
}
if (dndInitDnode(pDnode) != 0) {
dError("failed to init dnode");
dndCleanup(pDnode);
......@@ -222,8 +228,10 @@ void dndCleanup(SDnode *pDnode) {
dndCleanupMnode(pDnode);
dndCleanupVnodes(pDnode);
dndCleanupDnode(pDnode);
vnodeClear();
walCleanUp();
rpcCleanup();
dndCleanupEnv(pDnode);
free(pDnode);
dInfo("TDengine is cleaned up successfully");
......
......@@ -210,8 +210,8 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRowsPerFileBlock = htonl(100);
pReq->maxRowsPerFileBlock = htonl(4096);
pReq->minRows = htonl(100);
pReq->maxRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
......@@ -375,8 +375,8 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRowsPerFileBlock = htonl(100);
pReq->maxRowsPerFileBlock = htonl(4096);
pReq->minRows = htonl(100);
pReq->maxRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
......
......@@ -187,8 +187,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRowsPerFileBlock = htonl(100);
pReq->maxRowsPerFileBlock = htonl(4096);
pReq->minRows = htonl(100);
pReq->maxRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
......
......@@ -16,7 +16,7 @@
#include "deploy.h"
void initLog(const char* path) {
dDebugFlag = 143;
dDebugFlag = 207;
vDebugFlag = 0;
mDebugFlag = 207;
cDebugFlag = 0;
......
......@@ -176,49 +176,112 @@ SServer* DndTestVgroup::pServer;
SClient* DndTestVgroup::pClient;
int32_t DndTestVgroup::connId;
TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
for (int i = 0; i < 3; ++i) {
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(1);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
taosMsleep(1000000);
{
for (int i = 0; i < 3; ++i) {
SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(sizeof(SAlterVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
}
{
for (int i = 0; i < 3; ++i) {
SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(sizeof(SDropVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
}
}
......@@ -61,15 +61,14 @@ typedef enum {
} EAuthOp;
typedef enum {
TRN_STAGE_PREPARE = 1,
TRN_STAGE_EXECUTE = 2,
TRN_STAGE_PREPARE = 0,
TRN_STAGE_EXECUTE = 1,
TRN_STAGE_ROLLBACK = 2,
TRN_STAGE_COMMIT = 3,
TRN_STAGE_ROLLBACK = 4,
TRN_STAGE_RETRY = 5,
TRN_STAGE_OVER = 6,
TRN_STAGE_OVER = 4,
} ETrnStage;
typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
typedef enum {
DND_STATUS_OFFLINE = 0,
......
......@@ -70,6 +70,7 @@ typedef struct SMnode {
tmr_h timer;
char *path;
SMnodeCfg cfg;
int64_t checkTime;
SSdb *pSdb;
SDnode *pDnode;
SArray *pSteps;
......
......@@ -22,6 +22,16 @@
extern "C" {
#endif
typedef struct {
SEpSet epSet;
int8_t msgType;
int8_t msgSent;
int8_t msgReceived;
int32_t errCode;
int32_t contLen;
void *pCont;
} STransAction;
int32_t mndInitTrans(SMnode *pMnode);
void mndCleanupTrans(SMnode *pMnode);
......@@ -30,12 +40,15 @@ void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
char *mndTransStageStr(ETrnStage stage);
char *mndTransPolicyStr(ETrnPolicy policy);
void mndTransHandleActionRsp(SMnodeMsg *pMsg);
char *mndTransStageStr(ETrnStage stage);
char *mndTransPolicyStr(ETrnPolicy policy);
#ifdef __cplusplus
}
......
......@@ -16,13 +16,12 @@
#define _DEFAULT_SOURCE
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#define TSDB_DB_VER_NUM 1
#define TSDB_DB_VER_NUMBER 1
#define TSDB_DB_RESERVE_SIZE 64
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
......@@ -66,7 +65,7 @@ int32_t mndInitDb(SMnode *pMnode) {
void mndCleanupDb(SMnode *pMnode) {}
static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_DB_VER_NUM, sizeof(SDbObj) + TSDB_DB_RESERVE_SIZE);
SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_DB_VER_NUMBER, sizeof(SDbObj) + TSDB_DB_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
......@@ -106,7 +105,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != TSDB_DB_VER_NUM) {
if (sver != TSDB_DB_VER_NUMBER) {
mError("failed to decode db since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
......@@ -160,7 +159,7 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
mTrace("db:%s, perform update action", pOldDb->name);
pOldDb->updateTime = pNewDb->createdTime;
pOldDb->updateTime = pNewDb->updateTime;
pOldDb->cfgVersion = pNewDb->cfgVersion;
pOldDb->vgVersion = pNewDb->vgVersion;
memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg));
......@@ -168,8 +167,12 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
}
SDbObj *mndAcquireDb(SMnode *pMnode, char *db) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_DB, db);
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
}
return pDb;
}
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
......@@ -242,68 +245,74 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
}
static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING);
if (pDbRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1;
}
return 0;
}
static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
static int32_t mndSetCreateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED);
if (pDbRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED) != 0) return -1;
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED);
if (pVgRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED) != 0) return -1;
}
return 0;
}
static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
static int32_t mndSetCreateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_READY);
if (pDbRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_READY) != 0) return -1;
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
}
return 0;
}
static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SVgObj *pVgroup = pVgroups + v;
static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
SVgObj *pVgroup = pVgroups + vg;
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) {
return -1;
}
STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SEpSet epset = mndGetDnodeEpset(pDnode);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) {
return -1;
}
if (pMsg == NULL) return -1;
if (mndTransAppendRedoAction(pTrans, &epset, TSDB_MSG_TYPE_ALTER_VNODE_IN, sizeof(SCreateVnodeMsg), pMsg) != 0) {
action.pCont = pMsg;
action.contLen = sizeof(SCreateVnodeMsg);
action.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
return -1;
}
......@@ -313,26 +322,26 @@ static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV
return 0;
}
static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SVgObj *pVgroup = pVgroups + v;
static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
SVgObj *pVgroup = pVgroups + vg;
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) {
return -1;
}
STransAction action = {0};
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SEpSet epset = mndGetDnodeEpset(pDnode);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) {
return -1;
}
if (pMsg == NULL) return -1;
if (mndTransAppendUndoAction(pTrans, &epset, TSDB_MSG_TYPE_DROP_VNODE_IN, sizeof(SDropVnodeMsg), pMsg) != 0) {
action.pCont = pMsg;
action.contLen = sizeof(SDropVnodeMsg);
action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
return -1;
}
......@@ -344,14 +353,14 @@ static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
SDbObj dbObj = {0};
tstrncpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN);
tstrncpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
memcpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN);
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
dbObj.createdTime = taosGetTimestampMs();
dbObj.updateTime = dbObj.createdTime;
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN);
dbObj.hashMethod = 1;
dbObj.cfgVersion = 1;
dbObj.vgVersion = 1;
dbObj.hashMethod = 1;
dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups,
.cacheBlockSize = pCreate->cacheBlockSize,
.totalBlocks = pCreate->totalBlocks,
......@@ -359,8 +368,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
.daysToKeep0 = pCreate->daysToKeep0,
.daysToKeep1 = pCreate->daysToKeep1,
.daysToKeep2 = pCreate->daysToKeep2,
.minRows = pCreate->minRowsPerFileBlock,
.maxRows = pCreate->maxRowsPerFileBlock,
.minRows = pCreate->minRows,
.maxRows = pCreate->maxRows,
.fsyncPeriod = pCreate->fsyncPeriod,
.commitTime = pCreate->commitTime,
.precision = pCreate->precision,
......@@ -396,29 +405,30 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
goto CREATE_DB_OVER;
}
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
if (mndSetRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
......@@ -447,8 +457,8 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0);
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
pCreate->minRows = htonl(pCreate->minRows);
pCreate->maxRows = htonl(pCreate->maxRows);
pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
......@@ -456,7 +466,7 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
SDbObj *pDb = mndAcquireDb(pMnode, pCreate->db);
if (pDb != NULL) {
sdbRelease(pMnode->pSdb, pDb);
mndReleaseDb(pMnode, pDb);
if (pCreate->ignoreExist) {
mDebug("db:%s, already exist, ignore exist is set", pCreate->db);
return 0;
......@@ -485,57 +495,77 @@ static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
}
static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
bool changed = false;
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
if (pAlter->totalBlocks >= 0 && pAlter->totalBlocks != pDb->cfg.totalBlocks) {
pDb->cfg.totalBlocks = pAlter->totalBlocks;
changed = true;
terrno = 0;
}
if (pAlter->daysToKeep0 >= 0 && pAlter->daysToKeep0 != pDb->cfg.daysToKeep0) {
pDb->cfg.daysToKeep0 = pAlter->daysToKeep0;
changed = true;
terrno = 0;
}
if (pAlter->daysToKeep1 >= 0 && pAlter->daysToKeep1 != pDb->cfg.daysToKeep1) {
pDb->cfg.daysToKeep1 = pAlter->daysToKeep1;
changed = true;
terrno = 0;
}
if (pAlter->daysToKeep2 >= 0 && pAlter->daysToKeep2 != pDb->cfg.daysToKeep2) {
pDb->cfg.daysToKeep2 = pAlter->daysToKeep2;
changed = true;
terrno = 0;
}
if (pAlter->fsyncPeriod >= 0 && pAlter->fsyncPeriod != pDb->cfg.fsyncPeriod) {
pDb->cfg.fsyncPeriod = pAlter->fsyncPeriod;
changed = true;
terrno = 0;
}
if (pAlter->walLevel >= 0 && pAlter->walLevel != pDb->cfg.walLevel) {
pDb->cfg.walLevel = pAlter->walLevel;
changed = true;
terrno = 0;
}
if (pAlter->quorum >= 0 && pAlter->quorum != pDb->cfg.quorum) {
pDb->cfg.quorum = pAlter->quorum;
changed = true;
terrno = 0;
}
if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) {
pDb->cfg.cacheLastRow = pAlter->cacheLastRow;
changed = true;
terrno = 0;
}
if (!changed) {
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
return -1;
}
return terrno;
}
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetUpdateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; }
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; }
static int32_t mndSetUpdateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; }
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
mError("db:%s, failed to update since %s", pOldDb->name, terrstr());
......@@ -544,30 +574,41 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
mDebug("trans:%d, used to update db:%s", pTrans->id, pOldDb->name);
SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
if (mndSetUpdateDbUndoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
if (mndSetUpdateDbUndoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
goto UPDATE_DB_OVER;
}
code = 0;
UPDATE_DB_OVER:
mndTransDrop(pTrans);
return 0;
return code;
}
static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) {
......@@ -610,46 +651,82 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
return 0;
}
static int32_t mndSetDropDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pUndoRaw = mndDbActionEncode(pDb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pDb);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; }
static int32_t mndSetDropDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; }
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
mError("db:%s, failed to drop since %s", pDb->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
SSdbRaw *pUndoRaw = mndDbActionEncode(pDb);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
if (mndSetDropDbUndoLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
SSdbRaw *pCommitRaw = mndDbActionEncode(pDb);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndSetDropDbUndoActions(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
goto DROP_DB_OVER;
}
code = 0;
DROP_DB_OVER:
mndTransDrop(pTrans);
return 0;
return code;
}
static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) {
......@@ -751,29 +828,27 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SSyncDbMsg *pSync = pMsg->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
if (pDb == NULL) {
mError("db:%s, failed to process sync db msg since %s", pMsg->db, terrstr());
return -1;
} else {
mndReleaseDb(pMnode, pDb);
return 0;
}
mndReleaseDb(pMnode, pDb);
return 0;
}
static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
if (pDb == NULL) {
mError("db:%s, failed to process compact db msg since %s", pMsg->db, terrstr());
return -1;
} else {
mndReleaseDb(pMnode, pDb);
return 0;
}
mndReleaseDb(pMnode, pDb);
return 0;
}
static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
......
......@@ -18,10 +18,10 @@
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "ttime.h"
#include "tep.h"
#include "ttime.h"
#define TSDB_DNODE_VER 1
#define TSDB_DNODE_VER_NUMBER 1
#define TSDB_DNODE_RESERVE_SIZE 64
#define TSDB_CONFIG_OPTION_LEN 16
#define TSDB_CONIIG_VALUE_LEN 48
......@@ -101,14 +101,14 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1;
mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
......@@ -127,7 +127,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != TSDB_DNODE_VER) {
if (sver != TSDB_DNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode dnode since %s", terrstr());
return NULL;
......@@ -150,21 +150,8 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
mTrace("dnode:%d, perform insert action", pDnode->id);
pDnode->rebootTime = 0;
pDnode->lastAccessTime = 0;
pDnode->accessTimes = 0;
pDnode->numOfMnodes = 0;
pDnode->numOfVnodes = 0;
pDnode->numOfQnodes = 0;
pDnode->numOfSupportMnodes = 0;
pDnode->numOfSupportVnodes = 0;
pDnode->numOfSupportQnodes = 0;
pDnode->numOfCores = 0;
pDnode->status = DND_STATUS_OFFLINE;
pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port);
return 0;
}
......@@ -225,7 +212,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) {
int64_t ms = taosGetTimestampMs();
int64_t interval = ABS(pDnode->lastAccessTime - ms);
if (interval > 3000 * pMnode->cfg.statusInterval) {
if (interval > 3500 * pMnode->cfg.statusInterval) {
return false;
}
return true;
......@@ -267,12 +254,9 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
}
int64_t checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
if ((0 != strcasecmp(pCfg->timezone, pMnode->cfg.timezone)) && (checkTime != pCfg->checkTime)) {
if ((0 != strcasecmp(pCfg->timezone, pMnode->cfg.timezone)) && (pMnode->checkTime != pCfg->checkTime)) {
mError("timezone [%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, pMnode->cfg.timezone,
pCfg->checkTime, checkTime);
pCfg->checkTime, pMnode->checkTime);
return DND_REASON_TIME_ZONE_NOT_MATCH;
}
......
......@@ -21,13 +21,6 @@
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
typedef struct {
SEpSet epSet;
int8_t msgType;
int32_t contLen;
void *pCont;
} STransAction;
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
......@@ -37,11 +30,11 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray);
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
......@@ -343,10 +336,8 @@ char *mndTransStageStr(ETrnStage stage) {
return "commit";
case TRN_STAGE_ROLLBACK:
return "rollback";
case TRN_STAGE_RETRY:
return "retry";
case TRN_STAGE_OVER:
return "stop";
return "over";
default:
return "undefined";
}
......@@ -388,7 +379,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return NULL;
}
mDebug("trans:%d, data:%p is created", pTrans->id, pTrans);
mDebug("trans:%d, is created", pTrans->id);
return pTrans;
}
......@@ -417,7 +408,7 @@ void mndTransDrop(STrans *pTrans) {
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
// mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
tfree(pTrans);
}
......@@ -459,10 +450,8 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
return code;
}
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
STransAction action = {.epSet = *pEpSet, .msgType = msgType, .contLen = contLen, .pCont = pCont};
void *ptr = taosArrayPush(pArray, &action);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
void *ptr = taosArrayPush(pArray, pAction);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -471,16 +460,12 @@ static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgTy
return 0;
}
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, msgType, contLen, pCont);
mTrace("trans:%d, msg:%s len:%d append to redo actions", pTrans->id, taosMsg[msgType], contLen);
return code;
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
return mndTransAppendAction(pTrans->redoActions, pAction);
}
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, msgType, contLen, pCont);
mTrace("trans:%d, msg:%s len:%d append to undo actions", pTrans->id, taosMsg[msgType], contLen);
return code;
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
return mndTransAppendAction(pTrans->undoActions, pAction);
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
......@@ -493,7 +478,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
}
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("trans:%d, start sync", pTrans->id);
mTrace("trans:%d, sync to other nodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......@@ -533,7 +518,7 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
if (taosArrayGetSize(pTrans->commitLogs) != 0) {
mTrace("trans:%d, start sync", pTrans->id);
mTrace("trans:%d, sync to other nodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......@@ -563,7 +548,7 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
}
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
mTrace("trans:%d, start sync", pTrans->id);
mTrace("trans:%d, sync to other nodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......@@ -596,6 +581,50 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code)
// todo
}
void mndTransHandleActionRsp(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
int64_t sig = (int64_t)(pMsg->rpcMsg.ahandle);
int32_t transId = (int32_t)(sig >> 32);
int32_t action = (int32_t)((sig << 32) >> 32);
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) {
mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr());
goto HANDLE_ACTION_RSP_OVER;
}
SArray *pArray = NULL;
if (pTrans->stage == TRN_STAGE_EXECUTE) {
pArray = pTrans->redoActions;
} else if (pTrans->stage == TRN_STAGE_ROLLBACK) {
pArray = pTrans->undoActions;
} else {
}
if (pArray == NULL) {
mError("trans:%d, invalid trans stage:%s", transId, mndTransStageStr(pTrans->stage));
goto HANDLE_ACTION_RSP_OVER;
}
int32_t actionNum = taosArrayGetSize(pTrans->redoActions);
if (action < 0 || action > actionNum) {
mError("trans:%d, invalid action:%d", transId, action);
goto HANDLE_ACTION_RSP_OVER;
}
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction != NULL) {
pAction->msgReceived = 1;
pAction->errCode = pMsg->code;
}
mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code);
mndTransExecute(pMnode, pTrans);
HANDLE_ACTION_RSP_OVER:
mndReleaseTrans(pMnode, pTrans);
}
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
int32_t arraySize = taosArrayGetSize(pArray);
......@@ -618,7 +647,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
if (code != 0) {
mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr())
} else {
mTrace("trans:%d, execute redo logs finished", pTrans->id)
mDebug("trans:%d, execute redo logs finished", pTrans->id)
}
}
......@@ -632,7 +661,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
if (code != 0) {
mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr())
} else {
mTrace("trans:%d, execute undo logs finished", pTrans->id)
mDebug("trans:%d, execute undo logs finished", pTrans->id)
}
}
......@@ -646,47 +675,70 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
if (code != 0) {
mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr())
} else {
mTrace("trans:%d, execute commit logs finished", pTrans->id)
mDebug("trans:%d, execute commit logs finished", pTrans->id)
}
}
return code;
}
static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) {
#if 0
int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
if (numOfActions == 0) return 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent) continue;
int64_t signature = pTrans->id;
signature = (signature << 32);
signature += action;
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen};
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
pAction->msgSent = 1;
pAction->msgReceived = 0;
pAction->errCode = 0;
mDebug("trans:%d, action:%d is sent", pTrans->id, action);
mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg);
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
#else
return 0;
#endif
int32_t numOfReceivedMsgs = 0;
int32_t errorCode = 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent && pAction->msgReceived) {
numOfReceivedMsgs++;
if (pAction->errCode != 0) {
errorCode = pAction->errCode;
}
}
}
if (numOfReceivedMsgs == numOfActions) {
mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errorCode);
terrno = errorCode;
return errorCode;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
if (taosArrayGetSize(pTrans->redoActions) <= 0) return 0;
mTrace("trans:%d, start to execute redo actions", pTrans->id);
return mndTransExecuteActions(pMnode, pTrans->redoActions);
return mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions);
}
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
if (taosArrayGetSize(pTrans->undoActions) <= 0) return 0;
mTrace("trans:%d, start to execute undo actions", pTrans->id);
return mndTransExecuteActions(pMnode, pTrans->undoActions);
return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
}
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
......@@ -694,7 +746,7 @@ static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
mTrace("trans:%d, stage from prepare to execute", pTrans->id);
mDebug("trans:%d, stage from prepare to execute", pTrans->id);
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
......@@ -708,17 +760,17 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->stage = TRN_STAGE_COMMIT;
mTrace("trans:%d, stage from execute to commit", pTrans->id);
mDebug("trans:%d, stage from execute to commit", pTrans->id);
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code));
mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code));
return code;
} else {
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
} else {
pTrans->stage = TRN_STAGE_RETRY;
mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr());
pTrans->stage = TRN_STAGE_EXECUTE;
mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr());
}
}
......@@ -726,29 +778,16 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_OVER;
mTrace("trans:%d, commit stage finished", pTrans->id);
} else {
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr());
} else {
pTrans->stage = TRN_STAGE_RETRY;
mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr());
}
}
return code;
mndTransExecuteCommitLogs(pMnode, pTrans);
pTrans->stage = TRN_STAGE_OVER;
return 0;
}
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
if (code == 0) {
mTrace("trans:%d, rollbacked", pTrans->id);
mDebug("trans:%d, rollbacked", pTrans->id);
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
......@@ -757,20 +796,6 @@ static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
return code;
}
static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_COMMIT;
mTrace("trans:%d, stage from retry to commit", pTrans->id);
} else {
pTrans->stage = TRN_STAGE_RETRY;
mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr());
}
return code;
}
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
......@@ -785,7 +810,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
case TRN_STAGE_COMMIT:
code = mndTransCommit(pMnode, pTrans);
if (code == 0) {
code = mndTransPerformCommitStage(pMnode, pTrans);
mndTransPerformCommitStage(pMnode, pTrans);
}
break;
case TRN_STAGE_ROLLBACK:
......@@ -794,9 +819,6 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
code = mndTransRollback(pMnode, pTrans);
}
break;
case TRN_STAGE_RETRY:
code = mndTransPerformRetryStage(pMnode, pTrans);
break;
default:
mndTransSendRpcRsp(pTrans, 0);
return;
......
......@@ -21,7 +21,7 @@
#include "mndShow.h"
#include "mndTrans.h"
#define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_VER_NUMBER 1
#define TSDB_VGROUP_RESERVE_SIZE 64
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
......@@ -70,7 +70,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
void mndCleanupVgroup(SMnode *pMnode) {}
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUM, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
......@@ -98,7 +98,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != TSDB_VGROUP_VER_NUM) {
if (sver != TSDB_VGROUP_VER_NUMBER) {
mError("failed to decode vgroup since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
......@@ -142,14 +142,20 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNe
mTrace("vgId:%d, perform update action", pOldVgroup->vgId);
pOldVgroup->updateTime = pNewVgroup->updateTime;
pOldVgroup->version = pNewVgroup->version;
pOldVgroup->hashBegin = pNewVgroup->hashBegin;
pOldVgroup->hashEnd = pNewVgroup->hashEnd;
pOldVgroup->replica = pNewVgroup->replica;
memcpy(pOldVgroup->vnodeGid, pNewVgroup->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
return 0;
}
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_VGROUP, &vgId);
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = sdbAcquire(pSdb, SDB_VGROUP, &vgId);
if (pVgroup == NULL) {
terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
}
return pVgroup;
}
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
......@@ -158,16 +164,17 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
}
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SCreateVnodeMsg *pCreate = malloc(sizeof(SCreateVnodeMsg));
SCreateVnodeMsg *pCreate = calloc(1, sizeof(SCreateVnodeMsg));
if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pCreate->dnodeId = htonl(pDnode->id);
pCreate->vgId = htonl(pVgroup->vgId);
pCreate->dnodeId = htonl(pDnode->id);
memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
pCreate->dbUid = htobe64(pDb->uid);
pCreate->vgVersion = htonl(pVgroup->version);
pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile);
......@@ -193,7 +200,6 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
free(pCreate);
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
......@@ -217,7 +223,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
}
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SDropVnodeMsg *pDrop = malloc(sizeof(SDropVnodeMsg));
SDropVnodeMsg *pDrop = calloc(1, sizeof(SDropVnodeMsg));
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -269,7 +275,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
return -1;
}
int32_t alloceVgroups = 0;
int32_t allocedVgroups = 0;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
uint32_t hashMin = 0;
uint32_t hashMax = UINT32_MAX;
......@@ -281,7 +287,6 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->updateTime = pVgroups->createdTime;
pVgroup->version = 1;
pVgroup->dbUid = pDb->uid;
pVgroup->hashBegin = hashMin + hashInterval * v;
if (v == pDb->cfg.numOfVgroups - 1) {
pVgroup->hashEnd = hashMax;
......@@ -290,6 +295,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
}
memcpy(pVgroup->dbName, pDb->name, TSDB_FULL_DB_NAME_LEN);
pVgroup->dbUid = pDb->uid;
pVgroup->replica = pDb->cfg.replications;
if (mndGetAvailableDnode(pMnode, pVgroup) != 0) {
......@@ -298,22 +304,25 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
return -1;
}
alloceVgroups++;
allocedVgroups++;
}
*ppVgroups = pVgroups;
return 0;
}
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
SSdb *pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
......@@ -329,7 +338,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (strcmp(pVgroup->dbName, dbName) == 0) {
if (pVgroup->dbUid == pDb->uid) {
replica = MAX(replica, pVgroup->replica);
numOfVgroups++;
}
......@@ -441,11 +450,25 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
}
static int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
if (dnodeId == 0) {
return 0;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfVnodes = 0;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
numOfVnodes++;
}
}
sdbRelease(pSdb, pVgroup);
}
return 0;
return numOfVnodes;
}
static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
......
......@@ -225,7 +225,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
}
return 0;
}
}
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
mDebug("start to open mnode in %s", path);
......@@ -237,6 +237,9 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL;
}
char timestr[24] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
if (pMnode->pSteps == NULL) {
free(pMnode);
......
......@@ -27,12 +27,12 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
pRaw->sver = sver;
pRaw->dataLen = dataLen;
mTrace("raw:%p, is created, len:%d", pRaw, dataLen);
// mTrace("raw:%p, is created, len:%d", pRaw, dataLen);
return pRaw;
}
void sdbFreeRaw(SSdbRaw *pRaw) {
mTrace("raw:%p, is freed", pRaw);
// mTrace("raw:%p, is freed", pRaw);
free(pRaw);
}
......
......@@ -61,8 +61,8 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRowsPerFileBlock = htonl(100);
pReq->maxRowsPerFileBlock = htonl(4096);
pReq->minRows = htonl(100);
pReq->maxRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
......
......@@ -134,8 +134,8 @@ static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks);
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);
pMsg->commitTime = htonl((int32_t)pCreateDb->commitTime);
pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock);
pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock);
pMsg->minRows = htonl(pCreateDb->minRowsPerBlock);
pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock);
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
pMsg->compression = pCreateDb->compressionLevel;
pMsg->walLevel = (char)pCreateDb->walLevel;
......
......@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tthread.h"
#include "os.h"
#include "taoserror.h"
#include "tdef.h"
#include "tutil.h"
#include "ulog.h"
#include "taoserror.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t));
pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param) {
pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t));
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
......@@ -36,26 +36,24 @@ pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
return pthread;
}
// destory thread
// destory thread
bool taosDestoryThread(pthread_t* pthread) {
if(pthread == NULL) return false;
if(taosThreadRunning(pthread)) {
if (pthread == NULL) return false;
if (taosThreadRunning(pthread)) {
pthread_cancel(*pthread);
pthread_join(*pthread, NULL);
}
free(pthread);
return true;
}
// thread running return true
bool taosThreadRunning(pthread_t* pthread) {
if(pthread == NULL) return false;
if (pthread == NULL) return false;
int ret = pthread_kill(*pthread, 0);
if(ret == ESRCH)
return false;
if(ret == EINVAL)
return false;
if (ret == ESRCH) return false;
if (ret == EINVAL) return false;
// alive
return true;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册