提交 6ef580b4 编写于 作者: S Shengliang Guan

[TD-777] use multi-threads to open vnodes

上级 99bc6a5f
......@@ -39,6 +39,15 @@
#define MPEER_CONTENT_LEN 2000
typedef struct {
pthread_t thread;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
int32_t * vnodeList;
} SOpenVnodeThread;
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
......@@ -242,14 +251,33 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
return TSDB_CODE_SUCCESS;
}
static int32_t dnodeOpenVnodes() {
static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
char vnodeDir[TSDB_FILENAME_LEN * 3];
int32_t failed = 0;
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes;
int32_t status;
status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
if (vnodeOpen(vgId, vnodeDir) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
}
dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t dnodeOpenVnodes() {
int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
int32_t numOfVnodes;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed");
......@@ -257,13 +285,51 @@ static int32_t dnodeOpenVnodes() {
return status;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SOpenVnodeThread *pThread = &threads[t];
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
}
int32_t openVnodes = 0;
int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->thread) {
pthread_join(pThread->thread, NULL);
}
openVnodes += pThread->opened;
failedVnodes += pThread->failed;
free(pThread->vnodeList);
}
free(vnodeList);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册