提交 b89877b4 编写于 作者: S Shengliang Guan

refact: add vnode timer

上级 e7646660
......@@ -38,6 +38,8 @@ typedef struct SVnodeMgmt {
TdThreadRwlock lock;
SVnodesStat state;
STfs *pTfs;
TdThread thread;
bool stop;
} SVnodeMgmt;
typedef struct {
......
......@@ -334,6 +334,49 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
taosMemoryFree(pMgmt);
}
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {}
static void *vmThreadFp(void *param) {
SVnodeMgmt *pMgmt = param;
int64_t lastTime = 0;
setThreadName("vnode-timer");
while (1) {
lastTime++;
taosMsleep(100);
if (pMgmt->stop) break;
if (lastTime % 10 != 0) continue;
int64_t sec = lastTime / 10;
if (sec % (tsStatusInterval * 5) == 0) {
vmCheckSyncTimeout(pMgmt);
}
}
return NULL;
}
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
dError("failed to create vnode timer thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
return 0;
}
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
pMgmt->stop = true;
if (taosCheckPthreadValid(pMgmt->thread)) {
taosThreadJoin(pMgmt->thread, NULL);
taosThreadClear(&pMgmt->thread);
}
}
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
int32_t code = -1;
......@@ -510,12 +553,10 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
taosMemoryFree(ppVnodes);
}
return 0;
return vmInitTimer(pMgmt);
}
static void vmStop(SVnodeMgmt *pMgmt) {
// process inside the vnode
}
static void vmStop(SVnodeMgmt *pMgmt) { vmCleanupTimer(pMgmt); }
SMgmtFunc vmGetMgmtFunc() {
SMgmtFunc mgmtFunc = {0};
......
......@@ -26,21 +26,6 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
tsem_wait(&pVnode->syncSem);
}
static inline void vnodeWaitBlockMsgOld(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId;
taosThreadMutexLock(&pVnode->lock);
if (!pVnode->blocked) {
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
pVnode->blocked = true;
taosThreadMutexUnlock(&pVnode->lock);
tsem_wait(&pVnode->syncSem);
} else {
taosThreadMutexUnlock(&pVnode->lock);
}
}
}
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册