提交 88b2848d 编写于 作者: C cadem

multi-threading

上级 18b039fe
......@@ -40,6 +40,7 @@ typedef struct SVnodeMgmt {
STfs *pTfs;
TdThread thread;
bool stop;
TdThreadMutex cfgMutex;
} SVnodeMgmt;
typedef struct {
......@@ -77,6 +78,11 @@ typedef struct {
SVnodeObj **ppVnodes;
} SVnodeThread;
typedef struct {
SQueueInfo *pInfo;
SRpcMsg *pMsg;
} SVnodeMgmtThreadParam;
// vmInt.c
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
......
......@@ -194,6 +194,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
if (buffer == NULL) goto _OVER;
terrno = 0;
taosThreadMutexLock(&pMgmt->cfgMutex);
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
......@@ -203,6 +204,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
taosThreadMutexUnlock(&pMgmt->cfgMutex);
code = 0;
dInfo("succeed to write vnodes file:%s, vnodes:%d", realfile, numOfVnodes);
......
......@@ -422,6 +422,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL);
taosThreadMutexInit(&pMgmt->cfgMutex, NULL);
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
......
......@@ -27,7 +27,11 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
tmsgSendRsp(&rsp);
}
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void* vmMgmtThreadFun(void *param){
SVnodeMgmtThreadParam *threadParam = (SVnodeMgmtThreadParam*) param;
SQueueInfo *pInfo = threadParam->pInfo;
SRpcMsg *pMsg = threadParam->pMsg;
SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
const STraceId *trace = &pMsg->info.traceId;
......@@ -68,6 +72,27 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
taosMemoryFree(param);
return NULL;
}
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
TdThread thread1;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
SVnodeMgmtThreadParam *param = taosMemoryCalloc(1, sizeof(SVnodeMgmtThreadParam));
param->pInfo = pInfo;
param->pMsg = pMsg;
if (taosThreadCreate(&thread1, &thAttr, vmMgmtThreadFun, param) != 0) {
dError("failed to create vnode timer thread since %s", strerror(errno));
return;
}
taosThreadAttrDestroy(&thAttr);
return;
}
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册