未验证 提交 6bf2e4d0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4117 from taosdata/feature/table

Feature/table
...@@ -20,9 +20,11 @@ ...@@ -20,9 +20,11 @@
extern "C" { extern "C" {
#endif #endif
int32_t dnodeInitVnodeRead(); int32_t dnodeInitVRead();
void dnodeCleanupVnodeRead(); void dnodeCleanupVRead();
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg); void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -61,7 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = { ...@@ -61,7 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos}, {"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp}, {"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vread", dnodeInitVRead, dnodeCleanupVRead},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite}, {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
......
...@@ -39,8 +39,8 @@ static int32_t tsDnodeSubmitReqNum = 0; ...@@ -39,8 +39,8 @@ static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
......
...@@ -17,83 +17,78 @@ ...@@ -17,83 +17,78 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnodeInt.h" #include "tqueue.h"
#include "dnodeMgmt.h"
#include "dnodeVRead.h"
#include "vnode.h" #include "vnode.h"
#include "dnodeInt.h"
typedef struct { typedef struct {
pthread_t thread; // thread pthread_t thread; // thread
int32_t workerId; // worker ID int32_t workerId; // worker ID
} SReadWorker; } SVReadWorker;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t min; // min number of workers int32_t min; // min number of workers
int32_t num; // current number of workers int32_t num; // current number of workers
SReadWorker *readWorker; SVReadWorker * worker;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SReadWorkerPool; } SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *param);
static void dnodeHandleIdleReadWorker(SReadWorker *);
// module global variable // module global variable
static SReadWorkerPool readPool; static SVReadWorkerPool tsVReadWP;
static taos_qset readQset; static taos_qset tsVReadQset;
int32_t dnodeInitVnodeRead() { int32_t dnodeInitVRead() {
readQset = taosOpenQset(); tsVReadQset = taosOpenQset();
readPool.min = tsNumOfCores; tsVReadWP.min = tsNumOfCores;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore; tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min; if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max); tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&readPool.mutex, NULL); pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (readPool.readWorker == NULL) return -1; if (tsVReadWP.worker == NULL) return -1;
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
pWorker->workerId = i; pWorker->workerId = i;
} }
dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max); dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max);
return 0; return 0;
} }
void dnodeCleanupVnodeRead() { void dnodeCleanupVRead() {
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(readQset); taosQsetThreadResume(tsVReadQset);
} }
} }
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }
free(readPool.readWorker); free(tsVReadWP.worker);
taosCloseQset(readQset); taosCloseQset(tsVReadQset);
pthread_mutex_destroy(&readPool.mutex); pthread_mutex_destroy(&tsVReadWP.mutex);
dInfo("dnode read is closed"); dInfo("dnode vread is closed");
} }
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char * pCont = (char *)pMsg->pCont;
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *)pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
...@@ -106,7 +101,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -106,7 +101,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SVReadMsg *pRead = taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
pRead->pCont = pCont; pRead->pCont = pCont;
pRead->contLen = pHead->contLen; pRead->contLen = pHead->contLen;
...@@ -120,60 +115,52 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -120,60 +115,52 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
if (queuedMsgNum == 0) { if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_VND_INVALID_VGROUP_ID,
.msgType = 0
};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
} }
void *dnodeAllocVReadQueue(void *pVnode) { void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex); pthread_mutex_lock(&tsVReadWP.mutex);
taos_queue queue = taosOpenQueue(); taos_queue queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&readPool.mutex); pthread_mutex_unlock(&tsVReadWP.mutex);
return NULL; return NULL;
} }
taosAddIntoQset(readQset, queue, pVnode); taosAddIntoQset(tsVReadQset, queue, pVnode);
// spawn a thread to process queue // spawn a thread to process queue
if (readPool.num < readPool.max) { if (tsVReadWP.num < tsVReadWP.max) {
do { do {
SReadWorker *pWorker = readPool.readWorker + readPool.num; SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num;
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno)); dError("failed to create thread to process vread vqueue since %s", strerror(errno));
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
readPool.num++; tsVReadWP.num++;
dDebug("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num); dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num);
} while (readPool.num < readPool.min); } while (tsVReadWP.num < tsVReadWP.min);
} }
pthread_mutex_unlock(&readPool.mutex); pthread_mutex_unlock(&tsVReadWP.mutex);
dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue); dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue);
return queue; return queue;
} }
void dnodeFreeVReadQueue(void *rqueue) { void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue); taosCloseQueue(rqueue);
// dynamically adjust the number of threads
} }
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp, .pCont = pRead->rspRet.rsp,
...@@ -186,32 +173,32 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { ...@@ -186,32 +173,32 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont); rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
SReadMsg *pReadMsg; SVReadMsg *pReadMsg;
int type; int32_t qtype;
void *pVnode; void * pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pReadMsg, &pVnode) == 0) {
dDebug("qset:%p dnode read got no message from qset, exiting", readQset); dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset);
break; break;
} }
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle, dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type, pReadMsg); taosMsg[pReadMsg->rpcMsg.msgType], qtype, pReadMsg);
int32_t code = vnodeProcessRead(pVnode, pReadMsg); int32_t code = vnodeProcessRead(pVnode, pReadMsg);
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcVReadRsp(pVnode, pReadMsg, code);
} else { } else {
if (code == TSDB_CODE_QRY_HAS_RSP) { if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); dnodeSendRpcVReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
} else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5)); assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
...@@ -223,19 +210,3 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -223,19 +210,3 @@ static void *dnodeProcessReadQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
int32_t num = taosGetQueueNumber(readQset);
if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) {
readPool.num--;
dDebug("read worker:%d is released, total:%d", pWorker->workerId, readPool.num);
pthread_exit(NULL);
} else {
usleep(30000);
sched_yield();
}
}
...@@ -15,13 +15,12 @@ ...@@ -15,13 +15,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "tsync.h"
#include "vnode.h" #include "vnode.h"
#include "syncInt.h"
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct { typedef struct {
...@@ -29,22 +28,21 @@ typedef struct { ...@@ -29,22 +28,21 @@ typedef struct {
taos_qset qset; // queue set taos_qset qset; // queue set
int32_t workerId; // worker ID int32_t workerId; // worker ID
pthread_t thread; // thread pthread_t thread; // thread
} SWriteWorker; } SVWriteWorker;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *worker; SVWriteWorker * worker;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWriteWorkerPool; } SVWriteWorkerPool;
static SWriteWorkerPool tsVWriteWP; static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessVWriteQueue(void *param);
int32_t dnodeInitVWrite() { int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores; tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max); tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1; if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL); pthread_mutex_init(&tsVWriteWP.mutex, NULL);
...@@ -58,14 +56,14 @@ int32_t dnodeInitVWrite() { ...@@ -58,14 +56,14 @@ int32_t dnodeInitVWrite() {
void dnodeCleanupVWrite() { void dnodeCleanupVWrite() {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset); taosQsetThreadResume(pWorker->qset);
} }
} }
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
...@@ -100,7 +98,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { ...@@ -100,7 +98,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
pHead->msgType = pRpcMsg->msgType; pHead->msgType = pRpcMsg->msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pMsg->contLen; pHead->len = pMsg->contLen;
code = vnodeWriteToQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg); code = vnodeWriteToWQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -114,7 +112,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { ...@@ -114,7 +112,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
void *dnodeAllocVWriteQueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_mutex_lock(&tsVWriteWP.mutex); pthread_mutex_lock(&tsVWriteWP.mutex);
SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId; SVWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
void *queue = taosOpenQueue(); void *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&tsVWriteWP.mutex); pthread_mutex_unlock(&tsVWriteWP.mutex);
...@@ -141,7 +139,7 @@ void *dnodeAllocVWriteQueue(void *pVnode) { ...@@ -141,7 +139,7 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessVWriteQueue, pWorker) != 0) {
dError("failed to create thread to process vwrite queue since %s", strerror(errno)); dError("failed to create thread to process vwrite queue since %s", strerror(errno));
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
...@@ -190,8 +188,8 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -190,8 +188,8 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessVWriteQueue(void *param) {
SWriteWorker *pWorker = param; SVWriteWorker *pWorker = param;
SVWriteMsg * pWrite; SVWriteMsg * pWrite;
void * pVnode; void * pVnode;
int32_t numOfMsgs; int32_t numOfMsgs;
...@@ -206,18 +204,20 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -206,18 +204,20 @@ static void *dnodeProcessWriteQueue(void *param) {
break; break;
} }
bool forceFsync = false;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite, dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite,
taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version); taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version);
pWrite->code = vnodeProcessWrite(pVnode, qtype, pWrite); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1; if (pWrite->code <= 0) pWrite->processedCount = 1;
if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
} }
walFsync(vnodeGetWal(pVnode)); walFsync(vnodeGetWal(pVnode), forceFsync);
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(pWorker->qall); taosResetQitems(pWorker->qall);
......
...@@ -55,9 +55,9 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); ...@@ -55,9 +55,9 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *rqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue(); void dnodeFreeMnodePqueue();
......
...@@ -55,7 +55,7 @@ void walStop(twalh); ...@@ -55,7 +55,7 @@ void walStop(twalh);
void walClose(twalh); void walClose(twalh);
int32_t walRenew(twalh); int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *); int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh); void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
int64_t walGetVersion(twalh); int64_t walGetVersion(twalh);
......
...@@ -38,10 +38,10 @@ typedef struct { ...@@ -38,10 +38,10 @@ typedef struct {
typedef struct { typedef struct {
SRspRet rspRet; SRspRet rspRet;
void *pCont; void * pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
} SReadMsg; } SVReadMsg;
typedef struct { typedef struct {
int32_t code; int32_t code;
...@@ -66,8 +66,8 @@ void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue ...@@ -66,8 +66,8 @@ void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg); int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *param);
...@@ -77,7 +77,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); ...@@ -77,7 +77,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); int32_t vnodeInitResources();
void vnodeCleanupResources(); void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pReadMsg);
int32_t vnodeCheckRead(void *pVnode); int32_t vnodeCheckRead(void *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1090,7 +1090,7 @@ static void *sdbWorkerFp(void *param) {
} }
} }
walFsync(tsSdbObj.wal); walFsync(tsSdbObj.wal, true);
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(tsSdbWriteQall); taosResetQitems(tsSdbWriteQall);
......
...@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue; cqCfg.cqWrite = vnodeWriteToWQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) { if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
...@@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -305,7 +305,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return terrno; return terrno;
} }
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); walRestore(pVnode->wal, pVnode, vnodeProcessWrite);
if (pVnode->version == 0) { if (pVnode->version == 0) {
pVnode->version = walGetVersion(pVnode->wal); pVnode->version = walGetVersion(pVnode->wal);
} }
...@@ -320,7 +320,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -320,7 +320,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.ahandle = pVnode; syncInfo.ahandle = pVnode;
syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToWQueue;
syncInfo.confirmForward = dnodeSendRpcVWriteRsp; syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
......
...@@ -29,9 +29,9 @@ ...@@ -29,9 +29,9 @@
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tqueue.h" #include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
void vnodeInitReadFp(void) { void vnodeInitReadFp(void) {
...@@ -44,7 +44,7 @@ void vnodeInitReadFp(void) { ...@@ -44,7 +44,7 @@ void vnodeInitReadFp(void) {
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the // still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
// request enters the queue // request enters the queue
// //
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType; int msgType = pReadMsg->rpcMsg.msgType;
...@@ -82,7 +82,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void ...@@ -82,7 +82,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void
int32_t code = vnodeCheckRead(pVnode); int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SVReadMsg *pRead = (SVReadMsg *)taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
...@@ -146,7 +146,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { ...@@ -146,7 +146,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp->completed = true; pRsp->completed = true;
} }
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen; int32_t contLen = pReadMsg->contLen;
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
...@@ -274,7 +274,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -274,7 +274,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
return code; return code;
} }
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
......
...@@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) { ...@@ -46,10 +46,11 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
} }
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = param; SVnodeObj * pVnode = vparam;
SWalHead * pHead = pWrite->pHead; SWalHead * pHead = wparam;
SRspRet * pRspRet = rparam;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
...@@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { ...@@ -80,7 +81,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) {
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, &pWrite->rspRet, qtype); syncCode = syncForwardToPeer(pVnode->sync, pHead, pRspRet, qtype);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write into WAL // write into WAL
...@@ -90,7 +91,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) { ...@@ -90,7 +91,7 @@ int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite) {
pVnode->version = pHead->version; pVnode->version = pHead->version;
// write data locally // write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, &pWrite->rspRet); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
if (code < 0) return code; if (code < 0) return code;
return syncCode; return syncCode;
...@@ -204,7 +205,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -204,7 +205,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) { int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam; SWalHead * pHead = wparam;
...@@ -219,8 +220,8 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) ...@@ -219,8 +220,8 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg)
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
if (pMsg != NULL) { if (rparam != NULL) {
SRpcMsg *pRpcMsg = pMsg; SRpcMsg *pRpcMsg = rparam;
pWrite->rpcHandle = pRpcMsg->handle; pWrite->rpcHandle = pRpcMsg->handle;
pWrite->rpcAhandle = pRpcMsg->ahandle; pWrite->rpcAhandle = pRpcMsg->ahandle;
} }
......
...@@ -70,7 +70,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { ...@@ -70,7 +70,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL); pthread_mutex_init(&pWal->mutex, NULL);
pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
if (walInitObj(pWal) != TSDB_CODE_SUCCESS) { if (walInitObj(pWal) != TSDB_CODE_SUCCESS) {
......
...@@ -111,11 +111,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -111,11 +111,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
return code; return code;
} }
void walFsync(void *handle) { void walFsync(void *handle, bool forceFsync) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return; if (pWal == NULL || pWal->fd < 0) return;
if (pWal->fsyncPeriod == 0) { if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, file:%s, do fsync", pWal->vgId, pWal->name);
if (fsync(pWal->fd) < 0) { if (fsync(pWal->fd) < 0) {
wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
} }
...@@ -143,12 +144,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { ...@@ -143,12 +144,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
continue; continue;
} }
if (!pWal->keep) {
wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName);
remove(walName);
} else {
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
}
count++; count++;
} }
...@@ -305,8 +301,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -305,8 +301,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId, wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId,
pWal->version, pHead->version, pHead->len); pWal->version, pHead->version, pHead->len);
if (pWal->keep) pWal->version = pHead->version; pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册