提交 a12a4f38 编写于 作者: S Shengliang Guan

add vnode file

上级 97f16ecb
...@@ -223,6 +223,7 @@ int32_t* taosGetErrno(); ...@@ -223,6 +223,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress") #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories") #define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting" #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting"
#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0409) //"Parse vnodes.json error")
// vnode // vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
......
...@@ -212,7 +212,7 @@ do { \ ...@@ -212,7 +212,7 @@ do { \
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES 64 #define TSDB_MIN_VNODES 64
#define TSDB_MAX_VNODES 2048 #define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 2 #define TSDB_MIN_VNODES_PER_DB 2
#define TSDB_MAX_VNODES_PER_DB 64 #define TSDB_MAX_VNODES_PER_DB 64
......
...@@ -50,6 +50,7 @@ void *taosAllocateQitem(int size); ...@@ -50,6 +50,7 @@ void *taosAllocateQitem(int size);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
int taosWriteQitem(taos_queue, void *pItem); int taosWriteQitem(taos_queue, void *pItem);
int taosReadQitem(taos_queue, void **pItem); int taosReadQitem(taos_queue, void **pItem);
bool taosQueueEmpty(taos_queue);
taos_qall taosAllocateQall(); taos_qall taosAllocateQall();
void taosFreeQall(taos_qall); void taosFreeQall(taos_qall);
......
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeVnodes.h" #include "dnodeVnodes.h"
#include "dnodeTransport.h" #include "dnodeTransport.h"
#include "cJSON.h"
#include "thash.h" #include "thash.h"
#include "tlockfree.h"
#include "tqueue.h" #include "tqueue.h"
#include "tstep.h" #include "tstep.h"
#include "tthread.h" #include "tthread.h"
...@@ -56,7 +58,7 @@ static struct { ...@@ -56,7 +58,7 @@ static struct {
SSteps *pSteps; SSteps *pSteps;
int32_t openVnodes; int32_t openVnodes;
int32_t totalVnodes; int32_t totalVnodes;
char file[PATH_MAX + 20]; SRWLatch latch;
} tsVnodes; } tsVnodes;
static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode); static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode);
...@@ -70,6 +72,28 @@ static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode); ...@@ -70,6 +72,28 @@ static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode); static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode); static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode);
static SVnodeObj *dnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&tsVnodes.latch);
taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
}
taosRUnLockLatch(&tsVnodes.latch);
dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount);
return pVnode;
}
static void dnodeReleaseVnode(SVnodeObj *pVnode) {
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) {
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) { if (pVnode == NULL) {
...@@ -107,13 +131,27 @@ static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { ...@@ -107,13 +131,27 @@ static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) {
return code; return code;
} }
return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWLockLatch(&tsVnodes.latch);
code = taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&tsVnodes.latch);
return code;
} }
static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) {
taosWLockLatch(&tsVnodes.latch);
taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t));
taosWUnLockLatch(&tsVnodes.latch);
// wait all queue empty
dnodeReleaseVnode(pVnode);
while (pVnode->refCount > 0) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
//todo wait all queue empty
dnodeFreeVnodeQueryQueue(pVnode); dnodeFreeVnodeQueryQueue(pVnode);
dnodeFreeVnodeFetchQueue(pVnode); dnodeFreeVnodeFetchQueue(pVnode);
dnodeFreeVnodeWriteQueue(pVnode); dnodeFreeVnodeWriteQueue(pVnode);
...@@ -121,35 +159,164 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { ...@@ -121,35 +159,164 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) {
dnodeFreeVnodeSyncQueue(pVnode); dnodeFreeVnodeSyncQueue(pVnode);
} }
static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { static SVnodeObj **dnodeGetVnodesFromHash(int32_t *numOfVnodes) {
taosRLockLatch(&tsVnodes.latch);
int32_t num = 0;
int32_t size = taosHashGetSize(tsVnodes.hash);
SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));
void *pIter = taosHashIterate(tsVnodes.hash, NULL); void *pIter = taosHashIterate(tsVnodes.hash, NULL);
while (pIter) { while (pIter) {
SVnodeObj **ppVnode = pIter; SVnodeObj **ppVnode = pIter;
if (*ppVnode) { SVnodeObj *pVnode = *ppVnode;
(*numOfVnodes)++; if (pVnode) {
if (*numOfVnodes >= TSDB_MAX_VNODES) { num++;
dError("vgId:%d, too many open vnodes, exist:%d max:%d", (*ppVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); if (num < size) {
continue; int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
} else { dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes[*numOfVnodes - 1] = (*ppVnode); pVnodes[num] = (*ppVnode);
} }
} }
pIter = taosHashIterate(tsVnodes.hash, pIter); pIter = taosHashIterate(tsVnodes.hash, pIter);
} }
return TSDB_CODE_SUCCESS; taosRUnLockLatch(&tsVnodes.latch);
*numOfVnodes = num;
return pVnodes;
} }
static int32_t dnodeGetVnodesFromFile(SVnodeObj *pVnodes, int32_t *numOfVnodes) { static int32_t dnodeGetVnodesFromFile(SVnodeObj **ppVnodes, int32_t *numOfVnodes) {
pVnodes[0].vgId = 2; int32_t code = TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR;
pVnodes[0].dropped = 0; int32_t len = 0;
pVnodes[0].vgId = 3; int32_t maxLen = 30000;
pVnodes[0].dropped = 0; char *content = calloc(1, maxLen + 1);
return 0; cJSON *root = NULL;
FILE *fp = NULL;
char file[PATH_MAX + 20] = {0};
SVnodeObj *pVnodes = NULL;
snprintf(file, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_VNODE_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", file);
goto PRASE_VNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", file);
goto PRASE_VNODE_OVER;
}
cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
if (!vnodes || vnodes->type != cJSON_Array) {
dError("failed to read %s since vnodes not found", file);
goto PRASE_VNODE_OVER;
}
int32_t vnodesNum = cJSON_GetArraySize(vnodes);
if (vnodesNum <= 0) {
dError("failed to read %s since vnodes size:%d invalid", file, vnodesNum);
goto PRASE_VNODE_OVER;
}
pVnodes = calloc(vnodesNum, sizeof(SVnodeObj));
if (pVnodes == NULL) {
dError("failed to read %s since out of memory", file);
goto PRASE_VNODE_OVER;
}
for (int32_t i = 0; i < vnodesNum; ++i) {
cJSON *vnode = cJSON_GetArrayItem(vnodes, i);
SVnodeObj *pVnode = &pVnodes[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
if (!vgId || vgId->type != cJSON_String) {
dError("failed to read %s since vgId not found", file);
goto PRASE_VNODE_OVER;
}
pVnode->vgId = atoi(vgId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", file);
goto PRASE_VNODE_OVER;
}
pVnode->dropped = atoi(vnode->valuestring);
}
code = 0;
dInfo("succcessed to read file %s", file);
PRASE_VNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
return code;
} }
static int32_t dnodeWriteVnodesToFile() { return 0; } static int32_t dnodeWriteVnodesToFile() {
char file[PATH_MAX + 20] = {0};
char realfile[PATH_MAX + 20] = {0};
snprintf(file, PATH_MAX + 20, "%s/vnodes.json.bak", tsVnodeDir);
snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s since %s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n");
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped);
if (i < numOfVnodes - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
dnodeReleaseVnode(pVnode);
}
if (pVnodes != NULL) {
free(pVnodes);
}
dInfo("successed to write %s", file);
return taosRenameFile(file, realfile);
}
static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) { static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) {
int32_t code = 0; int32_t code = 0;
...@@ -193,24 +360,6 @@ static int32_t dnodeDropVnode(SVnodeObj *pVnode) { ...@@ -193,24 +360,6 @@ static int32_t dnodeDropVnode(SVnodeObj *pVnode) {
return 0; return 0;
} }
static SVnodeObj *dnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = NULL;
taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
}
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount);
return pVnode;
}
static void dnodeReleaseVnode(SVnodeObj *pVnode) {
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
static void *dnodeOpenVnodeFunc(void *param) { static void *dnodeOpenVnodeFunc(void *param) {
SVThread *pThread = param; SVThread *pThread = param;
...@@ -246,15 +395,17 @@ static void *dnodeOpenVnodeFunc(void *param) { ...@@ -246,15 +395,17 @@ static void *dnodeOpenVnodeFunc(void *param) {
} }
static int32_t dnodeOpenVnodes() { static int32_t dnodeOpenVnodes() {
taosInitRWLatch(&tsVnodes.latch);
tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodes.hash == NULL) { if (tsVnodes.hash == NULL) {
dError("failed to init vnode hash"); dError("failed to init vnode hash");
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
SVnodeObj pVnodes[TSDB_MAX_VNODES] = {0}; SVnodeObj *pVnodes = NULL;
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
int32_t code = dnodeGetVnodesFromFile(pVnodes, &numOfVnodes); int32_t code = dnodeGetVnodesFromFile(&pVnodes, &numOfVnodes);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
dInfo("failed to get vnode list from disk since %s", tstrerror(code)); dInfo("failed to get vnode list from disk since %s", tstrerror(code));
return code; return code;
...@@ -308,17 +459,14 @@ static int32_t dnodeOpenVnodes() { ...@@ -308,17 +459,14 @@ static int32_t dnodeOpenVnodes() {
} }
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
SVnodeObj *pVnodes[TSDB_MAX_VNODES] = {0}; int32_t numOfVnodes = 0;
int32_t numOfVnodes = 0; SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes);
int32_t code = dnodeGetVnodesFromHash(pVnodes, &numOfVnodes);
if (code != TSDB_CODE_SUCCESS) {
dInfo("failed to get dnode list since code %d", code);
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeClose(pVnodes[i]->pImpl); dnodeDropVnodeWrapper(pVnodes[i]);
}
if (pVnodes != NULL) {
free(pVnodes);
} }
if (tsVnodes.hash != NULL) { if (tsVnodes.hash != NULL) {
...@@ -431,12 +579,12 @@ static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) { ...@@ -431,12 +579,12 @@ static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) {
return code; return code;
} }
code = vnodeDrop(pVnode->pImpl); code = dnodeDropVnode(pVnode);
if (code != 0) { if (code != 0) {
dnodeReleaseVnode(pVnode);
dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code)); dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code));
} }
dnodeReleaseVnode(pVnode);
return code; return code;
} }
......
...@@ -235,6 +235,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message lengt ...@@ -235,6 +235,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message lengt
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories") TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR, "Parse vnodes.json error")
// vnode // vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress")
......
...@@ -98,6 +98,20 @@ void taosCloseQueue(taos_queue param) { ...@@ -98,6 +98,20 @@ void taosCloseQueue(taos_queue param) {
uTrace("queue:%p is closed", queue); uTrace("queue:%p is closed", queue);
} }
bool taosQueueEmpty(taos_queue param) {
if (param == NULL) return true;
STaosQueue *queue = (STaosQueue *)param;
bool empty = false;
pthread_mutex_lock(&queue->mutex);
if (queue->head == NULL && queue->tail == NULL) {
empty = true;
}
pthread_mutex_destroy(&queue->mutex);
return empty;
}
void *taosAllocateQitem(int size) { void *taosAllocateQitem(int size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册