未验证 提交 cf98f82a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4102 from taosdata/feature/wal

TD-1918
...@@ -20,9 +20,12 @@ ...@@ -20,9 +20,12 @@
extern "C" { extern "C" {
#endif #endif
int32_t dnodeInitVnodeWrite(); int32_t dnodeInitVWrite();
void dnodeCleanupVnodeWrite(); void dnodeCleanupVWrite();
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg); void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -62,7 +62,7 @@ static const SDnodeComponent tsDnodeComponents[] = { ...@@ -62,7 +62,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"wal", walInit, walCleanUp}, {"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer}, {"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
......
...@@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL; ...@@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL; static void *tsDnodeClientRpc = NULL;
int32_t dnodeInitServer() { int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
......
...@@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0; ...@@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
......
...@@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
} }
void *dnodeAllocateVnodeRqueue(void *pVnode) { void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex); pthread_mutex_lock(&readPool.mutex);
taos_queue queue = taosOpenQueue(); taos_queue queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
...@@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) { ...@@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
return queue; return queue;
} }
void dnodeFreeVnodeRqueue(void *rqueue) { void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue); taosCloseQueue(rqueue);
// dynamically adjust the number of threads // dynamically adjust the number of threads
......
...@@ -15,74 +15,65 @@ ...@@ -15,74 +15,65 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "tglobal.h"
#include "taoserror.h"
#include "tutil.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tsync.h" #include "tsync.h"
#include "vnode.h" #include "vnode.h"
#include "dnodeInt.h"
#include "syncInt.h" #include "syncInt.h"
#include "dnodeVWrite.h" #include "dnodeInt.h"
#include "dnodeMgmt.h"
typedef struct { typedef struct {
taos_qall qall; taos_qall qall;
taos_qset qset; // queue set taos_qset qset; // queue set
pthread_t thread; // thread int32_t workerId; // worker ID
int32_t workerId; // worker ID pthread_t thread; // thread
} SWriteWorker; } SWriteWorker;
typedef struct { typedef struct {
SRspRet rspRet; SRspRet rspRet;
int32_t processedCount; SRpcMsg rpcMsg;
int32_t code; int32_t processedCount;
void *pCont; int32_t code;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; void * pCont;
} SWriteMsg; } SWriteMsg;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker; SWriteWorker *worker;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWriteWorkerPool; } SWriteWorkerPool;
static SWriteWorkerPool tsVWriteWP;
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
SWriteWorkerPool wWorkerPool; int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL);
int32_t dnodeInitVnodeWrite() { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
wWorkerPool.max = tsNumOfCores; tsVWriteWP.worker[i].workerId = i;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1;
pthread_mutex_init(&wWorkerPool.mutex, NULL);
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
wWorkerPool.writeWorker[i].workerId = i;
} }
dInfo("dnode write is initialized, max worker %d", wWorkerPool.max); dInfo("dnode vwrite is initialized, max worker %d", tsVWriteWP.max);
return 0; return 0;
} }
void dnodeCleanupVnodeWrite() { void dnodeCleanupVWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset); taosQsetThreadResume(pWorker->qset);
} }
} }
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
...@@ -90,13 +81,13 @@ void dnodeCleanupVnodeWrite() { ...@@ -90,13 +81,13 @@ void dnodeCleanupVnodeWrite() {
} }
} }
pthread_mutex_destroy(&wWorkerPool.mutex); pthread_mutex_destroy(&tsVWriteWP.mutex);
free(wWorkerPool.writeWorker); tfree(tsVWriteWP.worker);
dInfo("dnode write is closed"); dInfo("dnode vwrite is closed");
} }
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) {
char *pCont = (char *)pMsg->pCont; char *pCont = pMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
SMsgDesc *pDesc = (SMsgDesc *)pCont; SMsgDesc *pDesc = (SMsgDesc *)pCont;
...@@ -111,7 +102,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { ...@@ -111,7 +102,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
taos_queue queue = vnodeAcquireWqueue(pHead->vgId); taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) { if (queue) {
// put message into queue // put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg; pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont; pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen; pWrite->contLen = pHead->contLen;
...@@ -130,12 +121,12 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { ...@@ -130,12 +121,12 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
} }
} }
void *dnodeAllocateVnodeWqueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_mutex_lock(&wWorkerPool.mutex); pthread_mutex_lock(&tsVWriteWP.mutex);
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
void *queue = taosOpenQueue(); void *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&wWorkerPool.mutex); pthread_mutex_unlock(&tsVWriteWP.mutex);
return NULL; return NULL;
} }
...@@ -143,7 +134,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { ...@@ -143,7 +134,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
taosCloseQueue(queue); taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex); pthread_mutex_unlock(&tsVWriteWP.mutex);
return NULL; return NULL;
} }
...@@ -152,7 +143,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { ...@@ -152,7 +143,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if (pWorker->qall == NULL) { if (pWorker->qall == NULL) {
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex); pthread_mutex_unlock(&tsVWriteWP.mutex);
return NULL; return NULL;
} }
pthread_attr_t thAttr; pthread_attr_t thAttr;
...@@ -160,37 +151,35 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { ...@@ -160,37 +151,35 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno)); dError("failed to create thread to process vwrite queue since %s", strerror(errno));
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
queue = NULL; queue = NULL;
} else { } else {
dDebug("write worker:%d is launched", pWorker->workerId); dDebug("dnode vwrite worker:%d is launched", pWorker->workerId);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
} else { } else {
taosAddIntoQset(pWorker->qset, queue, pVnode); taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
} }
pthread_mutex_unlock(&wWorkerPool.mutex); pthread_mutex_unlock(&tsVWriteWP.mutex);
dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue); dDebug("pVnode:%p, dnode vwrite queue:%p is allocated", pVnode, queue);
return queue; return queue;
} }
void dnodeFreeVnodeWqueue(void *wqueue) { void dnodeFreeVWriteQueue(void *wqueue) {
taosCloseQueue(wqueue); taosCloseQueue(wqueue);
// dynamically adjust the number of threads
} }
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) { void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
SWriteMsg *pWrite = (SWriteMsg *)param; if (param == NULL) return;
if (pWrite == NULL) return; SWriteMsg *pWrite = param;
if (code < 0) pWrite->code = code; if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
...@@ -215,44 +204,45 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -215,44 +204,45 @@ static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param; SWriteWorker *pWorker = (SWriteWorker *)param;
SWriteMsg * pWrite; SWriteMsg * pWrite;
SWalHead * pHead; SWalHead * pHead;
int32_t numOfMsgs;
int type;
void * pVnode, *item;
SRspRet * pRspRet; SRspRet * pRspRet;
void * pVnode;
void * pItem;
int32_t numOfMsgs;
int32_t qtype;
dDebug("write worker:%d is running", pWorker->workerId); dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset); dDebug("qset:%p, dnode vwrite got no message from qset, exiting", pWorker->qset);
break; break;
} }
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL; pWrite = NULL;
pRspRet = NULL; pRspRet = NULL;
taosGetQitem(pWorker->qall, &type, &item); taosGetQitem(pWorker->qall, &qtype, &pItem);
if (type == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = pItem;
pRspRet = &pWrite->rspRet; pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pWrite->contLen; pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]); taosMsg[pWrite->rpcMsg.msgType]);
} else if (type == TAOS_QTYPE_CQ) { } else if (qtype == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char*)item + sizeof(SSyncHead)); pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version); pHead->version);
} else { } else {
pHead = (SWalHead *)item; pHead = pItem;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version); pHead->version);
} }
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet);
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType], dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
pHead->version, tstrerror(code)); pHead->version, tstrerror(code));
...@@ -267,17 +257,17 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -267,17 +257,17 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(pWorker->qall); taosResetQitems(pWorker->qall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &type, &item); taosGetQitem(pWorker->qall, &qtype, &pItem);
if (type == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = pItem;
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
} else if (type == TAOS_QTYPE_FWD) { } else if (qtype == TAOS_QTYPE_FWD) {
pHead = (SWalHead *)item; pHead = pItem;
vnodeConfirmForward(pVnode, pHead->version, 0); vnodeConfirmForward(pVnode, pHead->version, 0);
taosFreeQitem(item); taosFreeQitem(pItem);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} else { } else {
taosFreeQitem(item); taosFreeQitem(pItem);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
} }
...@@ -285,19 +275,3 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -285,19 +275,3 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset);
if (num > 0) {
usleep(30000);
sched_yield();
} else {
taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset);
pWorker->qset = NULL;
dDebug("write worker:%d is released", pWorker->workerId);
pthread_exit(NULL);
}
}
...@@ -53,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); ...@@ -53,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocateVnodeWqueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue); void dnodeFreeVWriteQueue(void *wqueue);
void *dnodeAllocateVnodeRqueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue); void dnodeFreeVReadQueue(void *rqueue);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue(); void dnodeFreeMnodePqueue();
......
...@@ -22,14 +22,14 @@ extern "C" { ...@@ -22,14 +22,14 @@ extern "C" {
#ifndef TAOS_OS_FUNC_ALLOC #ifndef TAOS_OS_FUNC_ALLOC
#define tmalloc(size) malloc(size) #define tmalloc(size) malloc(size)
#define tcalloc(size) calloc(1, size) #define tcalloc(nmemb, size) calloc(nmemb, size)
#define trealloc(p, size) realloc(p, size) #define trealloc(p, size) realloc(p, size)
#define tmemalign(alignment, size) malloc(size) #define tmemalign(alignment, size) malloc(size)
#define tfree(p) free(p) #define tfree(p) free(p)
#define tmemzero(p, size) memset(p, 0, size) #define tmemzero(p, size) memset(p, 0, size)
#else #else
void *tmalloc(int32_t size); void *tmalloc(int32_t size);
void *tcalloc(int32_t size); void *tcalloc(int32_t nmemb, int32_t size);
void *trealloc(void *p, int32_t size); void *trealloc(void *p, int32_t size);
void *tmemalign(int32_t alignment, int32_t size); void *tmemalign(int32_t alignment, int32_t size);
void tfree(void *p); void tfree(void *p);
......
...@@ -32,11 +32,11 @@ void *tmalloc(int32_t size) { ...@@ -32,11 +32,11 @@ void *tmalloc(int32_t size) {
return p; return p;
} }
void *tcalloc(int32_t size) { void *tcalloc(int32_t nmemb, int32_t size) {
void *p = calloc(1, size); void *p = calloc(nmemb, size);
if (p == NULL) { if (p == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno)); uError("failed to calloc memory, nmemb:%d size:%d reason:%s", nmemb, size, strerror(errno));
} }
return p; return p;
......
...@@ -254,8 +254,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -254,8 +254,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->fversion = pVnode->version; pVnode->fversion = pVnode->version;
pVnode->wqueue = dnodeAllocateVnodeWqueue(pVnode); pVnode->wqueue = dnodeAllocVWriteQueue(pVnode);
pVnode->rqueue = dnodeAllocateVnodeRqueue(pVnode); pVnode->rqueue = dnodeAllocVReadQueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
...@@ -321,7 +321,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -321,7 +321,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp; syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced; syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
...@@ -408,12 +408,12 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -408,12 +408,12 @@ void vnodeRelease(void *pVnodeRaw) {
} }
if (pVnode->wqueue) { if (pVnode->wqueue) {
dnodeFreeVnodeWqueue(pVnode->wqueue); dnodeFreeVWriteQueue(pVnode->wqueue);
pVnode->wqueue = NULL; pVnode->wqueue = NULL;
} }
if (pVnode->rqueue) { if (pVnode->rqueue) {
dnodeFreeVnodeRqueue(pVnode->rqueue); dnodeFreeVReadQueue(pVnode->rqueue);
pVnode->rqueue = NULL; pVnode->rqueue = NULL;
} }
......
...@@ -55,7 +55,7 @@ void walCleanUp() { ...@@ -55,7 +55,7 @@ void walCleanUp() {
} }
void *walOpen(char *path, SWalCfg *pCfg) { void *walOpen(char *path, SWalCfg *pCfg) {
SWal *pWal = tcalloc(sizeof(SWal)); SWal *pWal = tcalloc(1, sizeof(SWal));
if (pWal == NULL) { if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册