diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 925197d708ff15464df3835e55f98a38dc2861f1..7eb08a8f4fac86f5202e9419152ec7c7f7ae9d68 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -83,9 +83,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; - pCfg->wsize = pCreate->cacheBlockSize; - pCfg->ssize = pCreate->cacheBlockSize; - pCfg->lsize = pCreate->cacheBlockSize; + pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024; + pCfg->ssize = 1024; + pCfg->lsize = 1024 * 1024; pCfg->isHeapAllocator = true; pCfg->ttl = 4; pCfg->keep = pCreate->daysToKeep0; @@ -96,13 +96,12 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0; pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; pCfg->tsdbCfg.retentions = pCreate->pRetensions; - pCfg->metaCfg.lruSize = pCreate->cacheBlockSize; - pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod; - pCfg->walCfg.level = pCreate->walLevel; - pCfg->walCfg.retentionPeriod = 10; - pCfg->walCfg.retentionSize = 128; - pCfg->walCfg.rollPeriod = 128; - pCfg->walCfg.segSize = 128; + pCfg->walCfg.level = TAOS_WAL_WRITE; + pCfg->walCfg.fsyncPeriod = 0; + pCfg->walCfg.retentionPeriod = 0; + pCfg->walCfg.retentionSize = 0; + pCfg->walCfg.rollPeriod = 0; + pCfg->walCfg.segSize = 0; pCfg->walCfg.vgId = pCreate->vgId; pCfg->hashBegin = pCreate->hashBegin; pCfg->hashEnd = pCreate->hashEnd; @@ -160,13 +159,10 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - vnodeCfg.msgCb = msgCb; - vnodeCfg.pTfs = pMgmt->pTfs; - vnodeCfg.dbId = wrapperCfg.dbUid; - SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); + SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { - tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); + tFreeSCreateVnodeReq(&createReq); return -1; } @@ -175,7 +171,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); vnodeClose(pImpl); - vnodeDestroy(wrapperCfg.path); + vnodeDestroy(path, pMgmt->pTfs); terrno = code; return code; } @@ -184,7 +180,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { if (code != 0) { tFreeSCreateVnodeReq(&createReq); vnodeClose(pImpl); - vnodeDestroy(wrapperCfg.path); + vnodeDestroy(path, pMgmt->pTfs); terrno = code; return code; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 1b8e0eb961e4d43800557a16ee8c9c212f422eb2..bf33e85b959c3b786600571ebae8ea7728c9895c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -84,6 +84,8 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { } void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { + char path[TSDB_FILENAME_LEN]; + taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosWUnLockLatch(&pMgmt->latch); @@ -104,7 +106,8 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode->dropped) { dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped); - vnodeDestroy(pVnode->path); + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); + vnodeDestroy(path, pMgmt->pTfs); } taosMemoryFree(pVnode->path); @@ -116,6 +119,7 @@ static void *vmOpenVnodeFunc(void *param) { SVnodeThread *pThread = param; SVnodesMgmt *pMgmt = pThread->pMgmt; SDnode *pDnode = pMgmt->pDnode; + char path[TSDB_FILENAME_LEN]; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); @@ -134,8 +138,8 @@ static void *vmOpenVnodeFunc(void *param) { msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; - SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); + SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 303376dba44f2cda60ae39875b9a96a0aab8c053..c969ad5abcb8032392920e33795ab51fbd94d08c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -37,16 +37,14 @@ extern "C" { // vnode typedef struct SVnode SVnode; -typedef struct SMetaCfg SMetaCfg; // todo: remove typedef struct STsdbCfg STsdbCfg; // todo: remove -typedef struct STqCfg STqCfg; // todo: remove typedef struct SVnodeCfg SVnodeCfg; int vnodeInit(int nthreads); void vnodeCleanup(); int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); -void vnodeDestroy(const char *path); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); +void vnodeDestroy(const char *path, STfs *pTfs); +SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); @@ -134,14 +132,9 @@ struct STsdbCfg { SArray *retentions; }; -struct STqCfg { - int32_t reserved; -}; - struct SVnodeCfg { int32_t vgId; uint64_t dbId; - STfs *pTfs; uint64_t wsize; uint64_t ssize; uint64_t lsize; @@ -151,10 +144,7 @@ struct SVnodeCfg { int8_t streamMode; bool isWeak; STsdbCfg tsdbCfg; - SMetaCfg metaCfg; - STqCfg tqCfg; SWalCfg walCfg; - SMsgCb msgCb; uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 94a1266f46a3f63a95982680a7d628523108091e..f30696c9ac2e31723a28014cb64491eb9747d293 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -40,7 +40,7 @@ typedef struct SMSmaCursor SMSmaCursor; #define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE -SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); +SMeta* metaOpen(const char* path, SMemAllocatorFactory* pMAF); void metaClose(SMeta* pMeta); void metaRemove(const char* path); int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); @@ -97,7 +97,6 @@ tb_uid_t metaGenerateUid(SMeta* pMeta); struct SMeta { char* path; SVnode* pVnode; - SMetaCfg options; SMetaDB* pDB; SMetaIdx* pIdx; SMetaCache* pCache; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ce81006661b7529f7526d27f8a66bba3a26cccbc..ed33473b1620102b1eacc1d7786f7a8f610f9b36 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -160,7 +160,6 @@ struct STQ { // the handle of meta kvstore bool writeTrigger; char* path; - STqCfg* tqConfig; STqMemRef tqMemRef; STqMetaStore* tqMeta; // STqPushMgr* tqPushMgr; @@ -251,8 +250,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, - SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // required by vnode int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e95878b04e3b88f42de31135d0d295344c1a3037..a765f5e88a56953ccc00c4234aa8beae7457ea37 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -80,10 +80,11 @@ struct SVnodeInfo { }; struct SVnode { - int32_t vgId; char* path; SVnodeCfg config; SVState state; + STfs* pTfs; + SMsgCb msgCb; SVBufPool* pBufPool; SMeta* pMeta; STsdb* pTsdb; @@ -92,10 +93,10 @@ struct SVnode { SSink* pSink; tsem_t canCommit; SQHandle* pQuery; - SMsgCb msgCb; - STfs* pTfs; }; +#define TD_VID(PVNODE) (PVNODE)->config.vgId + // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/meta/metaMain.c b/source/dnode/vnode/src/meta/metaMain.c index dd60e56371bd3b2f589a75823504eba2492ecaa9..879a7e8a6fc46b9c5ea9336a9aee3d8798d2cf32 100644 --- a/source/dnode/vnode/src/meta/metaMain.c +++ b/source/dnode/vnode/src/meta/metaMain.c @@ -17,16 +17,16 @@ #include "vnodeInt.h" -static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); +static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF); static void metaFree(SMeta *pMeta); static int metaOpenImpl(SMeta *pMeta); static void metaCloseImpl(SMeta *pMeta); -SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { +SMeta *metaOpen(const char *path, SMemAllocatorFactory *pMAF) { SMeta *pMeta = NULL; // Allocate handle - pMeta = metaNew(path, pMetaCfg, pMAF); + pMeta = metaNew(path, pMAF); if (pMeta == NULL) { // TODO: handle error return NULL; @@ -54,7 +54,7 @@ void metaClose(SMeta *pMeta) { void metaRemove(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { +static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF) { SMeta *pMeta; size_t psize = strlen(path); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bfcc5dd2bb78db54ec7162c682ddf639c0fd760a..0aa6023eaf39cedb77bd8344dea3c1bae2aac8d4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -19,15 +19,13 @@ int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, - SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } pTq->path = strdup(path); - pTq->tqConfig = tqConfig; pTq->pVnode = pVnode; pTq->pWal = pWal; pTq->pVnodeMeta = pVnodeMeta; @@ -267,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -277,7 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -303,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (pTopic == NULL) { vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -312,7 +310,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; @@ -323,7 +321,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; @@ -332,11 +330,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // if data inserted during waiting, launch query and // response to user vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + TD_VID(pTq->pVnode), fetchOffset); break; } vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -361,7 +359,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (taosArrayGetSize(pRes) == 0) { vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -390,7 +388,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); @@ -422,7 +420,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ @@ -446,14 +444,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); SMqPollRspV2 rspV2 = {0}; rspV2.dataLen = 0; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -479,7 +477,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (pTopic == NULL) { vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -488,7 +486,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); rspV2.reqOffset = pReq->currentOffset; rspV2.skipLogNum = 0; @@ -499,7 +497,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; @@ -508,11 +506,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // if data inserted during waiting, launch query and // response to user vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + TD_VID(pTq->pVnode), fetchOffset); break; } vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -537,7 +535,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (taosArrayGetSize(pRes) == 0) { vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); fetchOffset++; rspV2.skipLogNum++; taosArrayDestroy(pRes); @@ -597,7 +595,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = msgLen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); @@ -631,7 +629,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ @@ -742,7 +740,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); ASSERT(pTopic->buffer.output[i].task); } - vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); + vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, TD_VID(pTq->pVnode)); taosArrayPush(pConsumer->topics, pTopic); if (create) { tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index fa249d3ba1fe6323e0ca9af8f2d9d4f496aceac6..3ece3c054e97b8019d571b169675cceecc575570 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -108,7 +108,7 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { goto _err; } - pData = taosMemoryMalloc(size); + pData = taosMemoryMalloc(size + 1); if (pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -119,6 +119,8 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { goto _err; } + pData[size] = '\0'; + taosCloseFile(&pFile); // decode info @@ -202,6 +204,16 @@ static int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; return 0; } @@ -229,6 +241,16 @@ static int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1; + if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1; + if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; + if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; return 0; } @@ -286,7 +308,7 @@ _err: static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { SJson *pJson = NULL; - pJson = tjsonCreateObject(); + pJson = tjsonParse(pData); if (pJson == NULL) { return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 241c26ab1c38f6ef54d85da35d2b7bb113c2b050..e7aeb75ea5ebfc3e346fb23d8c65223ed83f1332 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -15,10 +15,8 @@ #include "vnodeInt.h" -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); -static void vnodeFree(SVnode *pVnode); -static int vnodeOpenImpl(SVnode *pVnode); -static void vnodeCloseImpl(SVnode *pVnode); +static int vnodeOpenImpl(SVnode *pVnode); +static void vnodeCloseImpl(SVnode *pVnode); int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { SVnodeInfo info = {0}; @@ -51,37 +49,41 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { return 0; } -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { - SVnode *pVnode = NULL; - - // Set default options - SVnodeCfg cfg = vnodeCfgDefault; - if (pVnodeCfg != NULL) { - cfg.vgId = pVnodeCfg->vgId; - cfg.msgCb = pVnodeCfg->msgCb; - cfg.pTfs = pVnodeCfg->pTfs; - cfg.dbId = pVnodeCfg->dbId; - cfg.hashBegin = pVnodeCfg->hashBegin; - cfg.hashEnd = pVnodeCfg->hashEnd; - cfg.hashMethod = pVnodeCfg->hashMethod; - } +void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); } + +SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { + SVnode *pVnode = NULL; + SVnodeInfo info = {0}; + char dir[TSDB_FILENAME_LEN]; + int ret; - // Validate options - if (vnodeCheckCfg(&cfg) < 0) { - // TODO + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path); + + // load vnode info + ret = vnodeLoadInfo(dir, &info); + if (ret < 0) { + vError("failed to open vnode from %s since %s", path, tstrerror(terrno)); return NULL; } - // Create the handle - pVnode = vnodeNew(path, &cfg); + // create handle + pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode)); if (pVnode == NULL) { - // TODO: handle error + terrno = TSDB_CODE_OUT_OF_MEMORY; + vError("vgId: %d failed to open vnode since %s", info.config.vgId, tstrerror(terrno)); return NULL; } - taosMkDir(path); + pVnode->path = strdup(dir); + pVnode->config = info.config; + pVnode->state.committed = info.state.committed; + pVnode->state.processed = pVnode->state.applied = pVnode->state.committed; + pVnode->pTfs = pTfs; + pVnode->msgCb = msgCb; - // Open the vnode + tsem_init(&(pVnode->canCommit), 0, 1); + + // open the vnode if (vnodeOpenImpl(pVnode) < 0) { // TODO: handle error return NULL; @@ -93,41 +95,13 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { void vnodeClose(SVnode *pVnode) { if (pVnode) { vnodeCloseImpl(pVnode); - vnodeFree(pVnode); - } -} - -void vnodeDestroy(const char *path) { taosRemoveDir(path); } - -/* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { - SVnode *pVnode = NULL; - - pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode)); - if (pVnode == NULL) { - // TODO - return NULL; - } - - pVnode->vgId = pVnodeCfg->vgId; - pVnode->msgCb = pVnodeCfg->msgCb; - pVnode->pTfs = pVnodeCfg->pTfs; - pVnode->path = strdup(path); - vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); - - tsem_init(&(pVnode->canCommit), 0, 1); - - return pVnode; -} - -static void vnodeFree(SVnode *pVnode) { - if (pVnode) { tsem_destroy(&(pVnode->canCommit)); taosMemoryFreeClear(pVnode->path); taosMemoryFree(pVnode); } } +/* ------------------------ STATIC METHODS ------------------------ */ static int vnodeOpenImpl(SVnode *pVnode) { char dir[TSDB_FILENAME_LEN]; @@ -138,7 +112,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open meta sprintf(dir, "%s/meta", pVnode->path); - pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg), vBufPoolGetMAF(pVnode)); + pVnode->pMeta = metaOpen(dir, vBufPoolGetMAF(pVnode)); if (pVnode->pMeta == NULL) { // TODO: handle error return -1; @@ -147,7 +121,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open tsdb sprintf(dir, "%s/tsdb", pVnode->path); pVnode->pTsdb = - tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); + tsdbOpen(dir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); if (pVnode->pTsdb == NULL) { // TODO: handle error return -1; @@ -163,7 +137,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 75079d50b285ff87bfed713e30d4adf7daba6862..4202c02a0c0b755fcc9a6082494377a6257c7855 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -16,7 +16,7 @@ #include "vnodeInt.h" int vnodeQueryOpen(SVnode *pVnode) { - return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); + return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } @@ -101,7 +101,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { metaRsp.numOfColumns = nCols; metaRsp.tableType = pTbCfg->type; metaRsp.tuid = uid; - metaRsp.vgId = pVnode->vgId; + metaRsp.vgId = TD_VID(pVnode); memcpy(metaRsp.pSchemas, pSW->pSchema, sizeof(SSchema) * pSW->nCols); if (nTagCols) { @@ -151,7 +151,7 @@ _exit: } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { - pLoad->vgId = pVnode->vgId; + pLoad->vgId = TD_VID(pVnode); pLoad->role = TAOS_SYNC_STATE_LEADER; pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 87cef5304231b7f336205904013b9dcc9e56e2a2..288241eb66eada439ea4152db8e042ef7fe7047f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -36,7 +36,7 @@ void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { // TODO: handle error /*ASSERT(false);*/ - vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); + vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr()); } } @@ -73,12 +73,12 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_ALTER_STB: return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); case TDMT_VND_DROP_STB: - vTrace("vgId:%d, process drop stb req", pVnode->vgId); + vTrace("vgId:%d, process drop stb req", TD_VID(pVnode)); break; case TDMT_VND_DROP_TABLE: break; case TDMT_VND_SUBMIT: - /*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ + /*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/ if (pVnode->config.streamMode == 0) { *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); (*pRsp)->handle = pMsg->handle; @@ -245,7 +245,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error - vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); + vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name); } // TODO: to encapsule a free API taosMemoryFree(pCreateTbReq->name); @@ -268,7 +268,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR } } - vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); + vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); if (vCreateTbBatchRsp.rspList) { int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); @@ -289,7 +289,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) { SVCreateTbReq vAlterTbReq = {0}; - vTrace("vgId:%d, process alter stb req", pVnode->vgId); + vTrace("vgId:%d, process alter stb req", TD_VID(pVnode)); tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); // TODO: to encapsule a free API taosMemoryFree(vAlterTbReq.stbCfg.pSchema);