未验证 提交 ac2af4f7 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #11560 from taosdata/feature/vnode_refact1

refactor: vnode
...@@ -83,9 +83,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { ...@@ -83,9 +83,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->vgId = pCreate->vgId; pCfg->vgId = pCreate->vgId;
pCfg->wsize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024;
pCfg->ssize = pCreate->cacheBlockSize; pCfg->ssize = 1024;
pCfg->lsize = pCreate->cacheBlockSize; pCfg->lsize = 1024 * 1024;
pCfg->isHeapAllocator = true; pCfg->isHeapAllocator = true;
pCfg->ttl = 4; pCfg->ttl = 4;
pCfg->keep = pCreate->daysToKeep0; pCfg->keep = pCreate->daysToKeep0;
...@@ -96,13 +96,12 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { ...@@ -96,13 +96,12 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0; pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0;
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
pCfg->tsdbCfg.retentions = pCreate->pRetensions; pCfg->tsdbCfg.retentions = pCreate->pRetensions;
pCfg->metaCfg.lruSize = pCreate->cacheBlockSize; pCfg->walCfg.level = TAOS_WAL_WRITE;
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod; pCfg->walCfg.fsyncPeriod = 0;
pCfg->walCfg.level = pCreate->walLevel; pCfg->walCfg.retentionPeriod = 0;
pCfg->walCfg.retentionPeriod = 10; pCfg->walCfg.retentionSize = 0;
pCfg->walCfg.retentionSize = 128; pCfg->walCfg.rollPeriod = 0;
pCfg->walCfg.rollPeriod = 128; pCfg->walCfg.segSize = 0;
pCfg->walCfg.segSize = 128;
pCfg->walCfg.vgId = pCreate->vgId; pCfg->walCfg.vgId = pCreate->vgId;
pCfg->hashBegin = pCreate->hashBegin; pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd; pCfg->hashEnd = pCreate->hashEnd;
...@@ -160,13 +159,10 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -160,13 +159,10 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize; msgCb.qsizeFp = vmGetQueueSize;
vnodeCfg.msgCb = msgCb; SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
vnodeCfg.pTfs = pMgmt->pTfs;
vnodeCfg.dbId = wrapperCfg.dbUid;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) { if (pImpl == NULL) {
tFreeSCreateVnodeReq(&createReq);
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
tFreeSCreateVnodeReq(&createReq);
return -1; return -1;
} }
...@@ -175,7 +171,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -175,7 +171,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
tFreeSCreateVnodeReq(&createReq); tFreeSCreateVnodeReq(&createReq);
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
vnodeClose(pImpl); vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path); vnodeDestroy(path, pMgmt->pTfs);
terrno = code; terrno = code;
return code; return code;
} }
...@@ -184,7 +180,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -184,7 +180,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
if (code != 0) { if (code != 0) {
tFreeSCreateVnodeReq(&createReq); tFreeSCreateVnodeReq(&createReq);
vnodeClose(pImpl); vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path); vnodeDestroy(path, pMgmt->pTfs);
terrno = code; terrno = code;
return code; return code;
} }
......
...@@ -84,6 +84,8 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -84,6 +84,8 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
} }
void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN];
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
...@@ -104,7 +106,8 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -104,7 +106,8 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode->dropped) { if (pVnode->dropped) {
dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped); dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
vnodeDestroy(pVnode->path); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
vnodeDestroy(path, pMgmt->pTfs);
} }
taosMemoryFree(pVnode->path); taosMemoryFree(pVnode->path);
...@@ -116,6 +119,7 @@ static void *vmOpenVnodeFunc(void *param) { ...@@ -116,6 +119,7 @@ static void *vmOpenVnodeFunc(void *param) {
SVnodeThread *pThread = param; SVnodeThread *pThread = param;
SVnodesMgmt *pMgmt = pThread->pMgmt; SVnodesMgmt *pMgmt = pThread->pMgmt;
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
char path[TSDB_FILENAME_LEN];
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes"); setThreadName("open-vnodes");
...@@ -134,8 +138,8 @@ static void *vmOpenVnodeFunc(void *param) { ...@@ -134,8 +138,8 @@ static void *vmOpenVnodeFunc(void *param) {
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize; msgCb.qsizeFp = vmGetQueueSize;
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
......
...@@ -37,16 +37,14 @@ extern "C" { ...@@ -37,16 +37,14 @@ extern "C" {
// vnode // vnode
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct SMetaCfg SMetaCfg; // todo: remove
typedef struct STsdbCfg STsdbCfg; // todo: remove typedef struct STsdbCfg STsdbCfg; // todo: remove
typedef struct STqCfg STqCfg; // todo: remove
typedef struct SVnodeCfg SVnodeCfg; typedef struct SVnodeCfg SVnodeCfg;
int vnodeInit(int nthreads); int vnodeInit(int nthreads);
void vnodeCleanup(); void vnodeCleanup();
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
...@@ -134,14 +132,9 @@ struct STsdbCfg { ...@@ -134,14 +132,9 @@ struct STsdbCfg {
SArray *retentions; SArray *retentions;
}; };
struct STqCfg {
int32_t reserved;
};
struct SVnodeCfg { struct SVnodeCfg {
int32_t vgId; int32_t vgId;
uint64_t dbId; uint64_t dbId;
STfs *pTfs;
uint64_t wsize; uint64_t wsize;
uint64_t ssize; uint64_t ssize;
uint64_t lsize; uint64_t lsize;
...@@ -151,10 +144,7 @@ struct SVnodeCfg { ...@@ -151,10 +144,7 @@ struct SVnodeCfg {
int8_t streamMode; int8_t streamMode;
bool isWeak; bool isWeak;
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SMetaCfg metaCfg;
STqCfg tqCfg;
SWalCfg walCfg; SWalCfg walCfg;
SMsgCb msgCb;
uint32_t hashBegin; uint32_t hashBegin;
uint32_t hashEnd; uint32_t hashEnd;
int8_t hashMethod; int8_t hashMethod;
......
...@@ -40,7 +40,7 @@ typedef struct SMSmaCursor SMSmaCursor; ...@@ -40,7 +40,7 @@ typedef struct SMSmaCursor SMSmaCursor;
#define META_CHILD_TABLE TD_CHILD_TABLE #define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE
SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); SMeta* metaOpen(const char* path, SMemAllocatorFactory* pMAF);
void metaClose(SMeta* pMeta); void metaClose(SMeta* pMeta);
void metaRemove(const char* path); void metaRemove(const char* path);
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg);
...@@ -97,7 +97,6 @@ tb_uid_t metaGenerateUid(SMeta* pMeta); ...@@ -97,7 +97,6 @@ tb_uid_t metaGenerateUid(SMeta* pMeta);
struct SMeta { struct SMeta {
char* path; char* path;
SVnode* pVnode; SVnode* pVnode;
SMetaCfg options;
SMetaDB* pDB; SMetaDB* pDB;
SMetaIdx* pIdx; SMetaIdx* pIdx;
SMetaCache* pCache; SMetaCache* pCache;
......
...@@ -160,7 +160,6 @@ struct STQ { ...@@ -160,7 +160,6 @@ struct STQ {
// the handle of meta kvstore // the handle of meta kvstore
bool writeTrigger; bool writeTrigger;
char* path; char* path;
STqCfg* tqConfig;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
// STqPushMgr* tqPushMgr; // STqPushMgr* tqPushMgr;
...@@ -251,8 +250,7 @@ int tqInit(); ...@@ -251,8 +250,7 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac);
SMemAllocatorFactory* allocFac);
void tqClose(STQ*); void tqClose(STQ*);
// required by vnode // required by vnode
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
......
...@@ -80,10 +80,11 @@ struct SVnodeInfo { ...@@ -80,10 +80,11 @@ struct SVnodeInfo {
}; };
struct SVnode { struct SVnode {
int32_t vgId;
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
SVState state; SVState state;
STfs* pTfs;
SMsgCb msgCb;
SVBufPool* pBufPool; SVBufPool* pBufPool;
SMeta* pMeta; SMeta* pMeta;
STsdb* pTsdb; STsdb* pTsdb;
...@@ -92,10 +93,10 @@ struct SVnode { ...@@ -92,10 +93,10 @@ struct SVnode {
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
SQHandle* pQuery; SQHandle* pQuery;
SMsgCb msgCb;
STfs* pTfs;
}; };
#define TD_VID(PVNODE) (PVNODE)->config.vgId
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
...@@ -17,16 +17,16 @@ ...@@ -17,16 +17,16 @@
#include "vnodeInt.h" #include "vnodeInt.h"
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF);
static void metaFree(SMeta *pMeta); static void metaFree(SMeta *pMeta);
static int metaOpenImpl(SMeta *pMeta); static int metaOpenImpl(SMeta *pMeta);
static void metaCloseImpl(SMeta *pMeta); static void metaCloseImpl(SMeta *pMeta);
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { SMeta *metaOpen(const char *path, SMemAllocatorFactory *pMAF) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
// Allocate handle // Allocate handle
pMeta = metaNew(path, pMetaCfg, pMAF); pMeta = metaNew(path, pMAF);
if (pMeta == NULL) { if (pMeta == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -54,7 +54,7 @@ void metaClose(SMeta *pMeta) { ...@@ -54,7 +54,7 @@ void metaClose(SMeta *pMeta) {
void metaRemove(const char *path) { taosRemoveDir(path); } void metaRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF) {
SMeta *pMeta; SMeta *pMeta;
size_t psize = strlen(path); size_t psize = strlen(path);
......
...@@ -19,15 +19,13 @@ int32_t tqInit() { return tqPushMgrInit(); } ...@@ -19,15 +19,13 @@ int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); } void tqCleanUp() { tqPushMgrCleanUp(); }
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) {
SMemAllocatorFactory* allocFac) {
STQ* pTq = taosMemoryMalloc(sizeof(STQ)); STQ* pTq = taosMemoryMalloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig;
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->pVnodeMeta = pVnodeMeta; pTq->pVnodeMeta = pVnodeMeta;
...@@ -267,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -267,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
SMqPollRsp rsp = { SMqPollRsp rsp = {
/*.consumerId = consumerId,*/ /*.consumerId = consumerId,*/
...@@ -277,7 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -277,7 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
pMsg->pCont = NULL; pMsg->pCont = NULL;
pMsg->contLen = 0; pMsg->contLen = 0;
pMsg->code = -1; pMsg->code = -1;
...@@ -303,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -303,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
if (pTopic == NULL) { if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
pTq->pVnode->vgId); TD_VID(pTq->pVnode));
pMsg->pCont = NULL; pMsg->pCont = NULL;
pMsg->contLen = 0; pMsg->contLen = 0;
pMsg->code = -1; pMsg->code = -1;
...@@ -312,7 +310,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -312,7 +310,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
pTq->pVnode->vgId); TD_VID(pTq->pVnode));
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.skipLogNum = 0; rsp.skipLogNum = 0;
...@@ -323,7 +321,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -323,7 +321,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch = atomic_load_32(&pConsumer->epoch); consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break; break;
} }
SWalReadHead* pHead; SWalReadHead* pHead;
...@@ -332,11 +330,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -332,11 +330,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// if data inserted during waiting, launch query and // if data inserted during waiting, launch query and
// response to user // response to user
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset); TD_VID(pTq->pVnode), fetchOffset);
break; break;
} }
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset, pHead->msgType); TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/ /*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
...@@ -361,7 +359,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -361,7 +359,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (taosArrayGetSize(pRes) == 0) { if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, pTq->pVnode->vgId, fetchOffset); pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
fetchOffset++; fetchOffset++;
rsp.skipLogNum++; rsp.skipLogNum++;
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
...@@ -390,7 +388,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -390,7 +388,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch); pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosMemoryFree(pHead); taosMemoryFree(pHead);
...@@ -422,7 +420,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -422,7 +420,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch); pReq->epoch);
/*}*/ /*}*/
...@@ -446,14 +444,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -446,14 +444,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
SMqPollRspV2 rspV2 = {0}; SMqPollRspV2 rspV2 = {0};
rspV2.dataLen = 0; rspV2.dataLen = 0;
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
pMsg->pCont = NULL; pMsg->pCont = NULL;
pMsg->contLen = 0; pMsg->contLen = 0;
pMsg->code = -1; pMsg->code = -1;
...@@ -479,7 +477,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -479,7 +477,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
if (pTopic == NULL) { if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
pTq->pVnode->vgId); TD_VID(pTq->pVnode));
pMsg->pCont = NULL; pMsg->pCont = NULL;
pMsg->contLen = 0; pMsg->contLen = 0;
pMsg->code = -1; pMsg->code = -1;
...@@ -488,7 +486,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -488,7 +486,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
pTq->pVnode->vgId); TD_VID(pTq->pVnode));
rspV2.reqOffset = pReq->currentOffset; rspV2.reqOffset = pReq->currentOffset;
rspV2.skipLogNum = 0; rspV2.skipLogNum = 0;
...@@ -499,7 +497,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -499,7 +497,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch = atomic_load_32(&pConsumer->epoch); consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break; break;
} }
SWalReadHead* pHead; SWalReadHead* pHead;
...@@ -508,11 +506,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -508,11 +506,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// if data inserted during waiting, launch query and // if data inserted during waiting, launch query and
// response to user // response to user
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset); TD_VID(pTq->pVnode), fetchOffset);
break; break;
} }
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset, pHead->msgType); TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/ /*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
...@@ -537,7 +535,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -537,7 +535,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (taosArrayGetSize(pRes) == 0) { if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, pTq->pVnode->vgId, fetchOffset); pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
fetchOffset++; fetchOffset++;
rspV2.skipLogNum++; rspV2.skipLogNum++;
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
...@@ -597,7 +595,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -597,7 +595,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = msgLen; pMsg->contLen = msgLen;
pMsg->code = 0; pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch); pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosMemoryFree(pHead); taosMemoryFree(pHead);
...@@ -631,7 +629,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -631,7 +629,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch); pReq->epoch);
/*}*/ /*}*/
...@@ -742,7 +740,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -742,7 +740,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task); ASSERT(pTopic->buffer.output[i].task);
} }
vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, TD_VID(pTq->pVnode));
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
if (create) { if (create) {
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
......
...@@ -108,7 +108,7 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { ...@@ -108,7 +108,7 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
goto _err; goto _err;
} }
pData = taosMemoryMalloc(size); pData = taosMemoryMalloc(size + 1);
if (pData == NULL) { if (pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -119,6 +119,8 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { ...@@ -119,6 +119,8 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
goto _err; goto _err;
} }
pData[size] = '\0';
taosCloseFile(&pFile); taosCloseFile(&pFile);
// decode info // decode info
...@@ -202,6 +204,16 @@ static int vnodeEncodeConfig(const void *pObj, SJson *pJson) { ...@@ -202,6 +204,16 @@ static int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
return 0; return 0;
} }
...@@ -229,6 +241,16 @@ static int vnodeDecodeConfig(const SJson *pJson, void *pObj) { ...@@ -229,6 +241,16 @@ static int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
return 0; return 0;
} }
...@@ -286,7 +308,7 @@ _err: ...@@ -286,7 +308,7 @@ _err:
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
SJson *pJson = NULL; SJson *pJson = NULL;
pJson = tjsonCreateObject(); pJson = tjsonParse(pData);
if (pJson == NULL) { if (pJson == NULL) {
return -1; return -1;
} }
......
...@@ -15,10 +15,8 @@ ...@@ -15,10 +15,8 @@
#include "vnodeInt.h" #include "vnodeInt.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeFree(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode);
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
...@@ -51,37 +49,41 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -51,37 +49,41 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return 0; return 0;
} }
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); }
SVnode *pVnode = NULL;
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// Set default options SVnode *pVnode = NULL;
SVnodeCfg cfg = vnodeCfgDefault; SVnodeInfo info = {0};
if (pVnodeCfg != NULL) { char dir[TSDB_FILENAME_LEN];
cfg.vgId = pVnodeCfg->vgId; int ret;
cfg.msgCb = pVnodeCfg->msgCb;
cfg.pTfs = pVnodeCfg->pTfs;
cfg.dbId = pVnodeCfg->dbId;
cfg.hashBegin = pVnodeCfg->hashBegin;
cfg.hashEnd = pVnodeCfg->hashEnd;
cfg.hashMethod = pVnodeCfg->hashMethod;
}
// Validate options snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
if (vnodeCheckCfg(&cfg) < 0) {
// TODO // load vnode info
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
return NULL; return NULL;
} }
// Create the handle // create handle
pVnode = vnodeNew(path, &cfg); pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode));
if (pVnode == NULL) { if (pVnode == NULL) {
// TODO: handle error terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("vgId: %d failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
return NULL; return NULL;
} }
taosMkDir(path); pVnode->path = strdup(dir);
pVnode->config = info.config;
pVnode->state.committed = info.state.committed;
pVnode->state.processed = pVnode->state.applied = pVnode->state.committed;
pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb;
// Open the vnode tsem_init(&(pVnode->canCommit), 0, 1);
// open the vnode
if (vnodeOpenImpl(pVnode) < 0) { if (vnodeOpenImpl(pVnode) < 0) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -93,41 +95,13 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -93,41 +95,13 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeCloseImpl(pVnode); vnodeCloseImpl(pVnode);
vnodeFree(pVnode);
}
}
void vnodeDestroy(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL;
pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode));
if (pVnode == NULL) {
// TODO
return NULL;
}
pVnode->vgId = pVnodeCfg->vgId;
pVnode->msgCb = pVnodeCfg->msgCb;
pVnode->pTfs = pVnodeCfg->pTfs;
pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
tsem_init(&(pVnode->canCommit), 0, 1);
return pVnode;
}
static void vnodeFree(SVnode *pVnode) {
if (pVnode) {
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
taosMemoryFreeClear(pVnode->path); taosMemoryFreeClear(pVnode->path);
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
} }
} }
/* ------------------------ STATIC METHODS ------------------------ */
static int vnodeOpenImpl(SVnode *pVnode) { static int vnodeOpenImpl(SVnode *pVnode) {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
...@@ -138,7 +112,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -138,7 +112,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open meta // Open meta
sprintf(dir, "%s/meta", pVnode->path); sprintf(dir, "%s/meta", pVnode->path);
pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg), vBufPoolGetMAF(pVnode)); pVnode->pMeta = metaOpen(dir, vBufPoolGetMAF(pVnode));
if (pVnode->pMeta == NULL) { if (pVnode->pMeta == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
...@@ -147,7 +121,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -147,7 +121,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb // Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = pVnode->pTsdb =
tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); tsdbOpen(dir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
...@@ -163,7 +137,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -163,7 +137,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open TQ // Open TQ
sprintf(dir, "%s/tq", pVnode->path); sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "vnodeInt.h" #include "vnodeInt.h"
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
} }
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
...@@ -101,7 +101,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -101,7 +101,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
metaRsp.numOfColumns = nCols; metaRsp.numOfColumns = nCols;
metaRsp.tableType = pTbCfg->type; metaRsp.tableType = pTbCfg->type;
metaRsp.tuid = uid; metaRsp.tuid = uid;
metaRsp.vgId = pVnode->vgId; metaRsp.vgId = TD_VID(pVnode);
memcpy(metaRsp.pSchemas, pSW->pSchema, sizeof(SSchema) * pSW->nCols); memcpy(metaRsp.pSchemas, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
if (nTagCols) { if (nTagCols) {
...@@ -151,7 +151,7 @@ _exit: ...@@ -151,7 +151,7 @@ _exit:
} }
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = pVnode->vgId; pLoad->vgId = TD_VID(pVnode);
pLoad->role = TAOS_SYNC_STATE_LEADER; pLoad->role = TAOS_SYNC_STATE_LEADER;
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = 400; pLoad->numOfTimeSeries = 400;
......
...@@ -36,7 +36,7 @@ void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { ...@@ -36,7 +36,7 @@ void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) {
if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
// TODO: handle error // TODO: handle error
/*ASSERT(false);*/ /*ASSERT(false);*/
vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
} }
} }
...@@ -73,12 +73,12 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -73,12 +73,12 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_ALTER_STB: case TDMT_VND_ALTER_STB:
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
case TDMT_VND_DROP_STB: case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", pVnode->vgId); vTrace("vgId:%d, process drop stb req", TD_VID(pVnode));
break; break;
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
break; break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ /*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/
if (pVnode->config.streamMode == 0) { if (pVnode->config.streamMode == 0) {
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
(*pRsp)->handle = pMsg->handle; (*pRsp)->handle = pMsg->handle;
...@@ -245,7 +245,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR ...@@ -245,7 +245,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error // TODO: handle error
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name);
} }
// TODO: to encapsule a free API // TODO: to encapsule a free API
taosMemoryFree(pCreateTbReq->name); taosMemoryFree(pCreateTbReq->name);
...@@ -268,7 +268,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR ...@@ -268,7 +268,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
} }
} }
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray));
taosArrayDestroy(vCreateTbBatchReq.pArray); taosArrayDestroy(vCreateTbBatchReq.pArray);
if (vCreateTbBatchRsp.rspList) { if (vCreateTbBatchRsp.rspList) {
int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp);
...@@ -289,7 +289,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR ...@@ -289,7 +289,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) { static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
SVCreateTbReq vAlterTbReq = {0}; SVCreateTbReq vAlterTbReq = {0};
vTrace("vgId:%d, process alter stb req", pVnode->vgId); vTrace("vgId:%d, process alter stb req", TD_VID(pVnode));
tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); tDeserializeSVCreateTbReq(pReq, &vAlterTbReq);
// TODO: to encapsule a free API // TODO: to encapsule a free API
taosMemoryFree(vAlterTbReq.stbCfg.pSchema); taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册