提交 ce79f915 编写于 作者: S Shengliang Guan

fix: restore vnodes in multi-threads

上级 3c65b2d8
...@@ -74,6 +74,7 @@ typedef struct { ...@@ -74,6 +74,7 @@ typedef struct {
TdThread thread; TdThread thread;
SVnodeMgmt *pMgmt; SVnodeMgmt *pMgmt;
SWrapperCfg *pCfgs; SWrapperCfg *pCfgs;
SVnodeObj **ppVnodes;
} SVnodeThread; } SVnodeThread;
// vmInt.c // vmInt.c
......
...@@ -218,14 +218,14 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { ...@@ -218,14 +218,14 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
dInfo("start to close all vnodes"); dInfo("start to close all vnodes");
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
vmCloseVnode(pMgmt, pVnodes[i]); vmCloseVnode(pMgmt, ppVnodes[i]);
} }
if (pVnodes != NULL) { if (ppVnodes != NULL) {
taosMemoryFree(pVnodes); taosMemoryFree(ppVnodes);
} }
if (pMgmt->hash != NULL) { if (pMgmt->hash != NULL) {
...@@ -331,22 +331,92 @@ static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) { ...@@ -331,22 +331,92 @@ static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
return 0; return 0;
} }
static int32_t vmStart(SVnodeMgmt *pMgmt) { static void *vmRestoreVnodeInThread(void *param) {
SVnodeThread *pThread = param;
SVnodeMgmt *pMgmt = pThread->pMgmt;
dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("restore-vnodes");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SVnodeObj *pVnode = pThread->ppVnodes[v];
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
tmsgReportStartup("vnode-restore", stepDesc);
int32_t code = vnodeStart(pVnode->pImpl);
if (code != 0) {
dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->opened++;
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
}
}
dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) { int32_t threadNum = tsNumOfCores / 2;
SVnodeObj *pVnode = pVnodes[i]; if (threadNum < 1) threadNum = 1;
vnodeStart(pVnode->pImpl); int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].pMgmt = pMgmt;
threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
} }
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SVnodeThread *pThread = &threads[t];
pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
}
pMgmt->state.openVnodes = 0;
dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);
for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
}
taosThreadAttrDestroy(&thAttr);
}
for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread);
}
taosMemoryFree(pThread->pCfgs);
}
taosMemoryFree(threads);
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i]; SVnodeObj *pVnode = ppVnodes[i];
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
} }
if (pVnodes != NULL) { if (ppVnodes != NULL) {
taosMemoryFree(pVnodes); taosMemoryFree(ppVnodes);
} }
return 0; return 0;
...@@ -360,7 +430,7 @@ SMgmtFunc vmGetMgmtFunc() { ...@@ -360,7 +430,7 @@ SMgmtFunc vmGetMgmtFunc() {
SMgmtFunc mgmtFunc = {0}; SMgmtFunc mgmtFunc = {0};
mgmtFunc.openFp = vmInit; mgmtFunc.openFp = vmInit;
mgmtFunc.closeFp = (NodeCloseFp)vmCleanup; mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
mgmtFunc.startFp = (NodeStartFp)vmStart; mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
mgmtFunc.stopFp = (NodeStopFp)vmStop; mgmtFunc.stopFp = (NodeStopFp)vmStop;
mgmtFunc.requiredFp = vmRequire; mgmtFunc.requiredFp = vmRequire;
mgmtFunc.getHandlesFp = vmGetMsgHandles; mgmtFunc.getHandlesFp = vmGetMsgHandles;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册