diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 751edd6f98807a5d61f58479404b8648b9e5adb8..b8917e6dba604c5c54a99e439b55e9e30a9b6fd8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -86,9 +86,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; strcpy(pCfg->dbname, pCreate->db); - pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024; - pCfg->ssize = 1024; - pCfg->lsize = 1024 * 1024; + pCfg->szBuf = pCreate->cacheBlockSize * 1024 * 1024; pCfg->streamMode = pCreate->streamMode; pCfg->isWeak = true; pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0; diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 04a84d375a9a92816fb6c3191d52570418e2e2fb..5516797d998a2462ec89b345d93d197efc27ae5f 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -5,9 +5,9 @@ target_sources( PRIVATE # vnode "src/vnd/vnodeOpen.c" - "src/vnd/vnodeArenaMAImpl.c" - "src/vnd/vnodeBufferPool.c" - # "src/vnd/vnodeBufferPool2.c" + # "src/vnd/vnodeArenaMAImpl.c" + # "src/vnd/vnodeBufferPool.c" + "src/vnd/vnodeBufPool.c" "src/vnd/vnodeCfg.c" "src/vnd/vnodeCommit.c" "src/vnd/vnodeInt.c" @@ -27,6 +27,7 @@ target_sources( # tsdb "src/tsdb/tsdbTDBImpl.c" "src/tsdb/tsdbCommit.c" + "src/tsdb/tsdbCommit2.c" "src/tsdb/tsdbCompact.c" "src/tsdb/tsdbFile.c" "src/tsdb/tsdbFS.c" diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 1258d124bfd49e77220576dfdd2e1df771030815..c24261a909c8e80169a612b41f45219cfab426be 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -123,6 +123,7 @@ struct STsdbCfg { int8_t precision; int8_t update; int8_t compression; + int8_t slLevel; int32_t days; int32_t minRows; int32_t maxRows; @@ -139,10 +140,8 @@ struct SVnodeCfg { uint64_t dbId; int32_t szPage; int32_t szCache; - uint64_t wsize; - uint64_t ssize; - uint64_t lsize; - bool isHeapAllocator; + uint64_t szBuf; + bool isHeap; uint32_t ttl; uint32_t keep; int8_t streamMode; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ed33473b1620102b1eacc1d7786f7a8f610f9b36..0cab00f47ad4ffb2911d9e2041222793c52db43e 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -250,7 +250,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta); void tqClose(STQ*); // required by vnode int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ce9549af563106e38afa061ef781e452af47d97d..e899e820085e29cb26e1548791a11f94d617f502 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -30,21 +30,38 @@ extern "C" { #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +// tsdbMemTable ================ +typedef struct STbData STbData; +typedef struct STsdbMemTable STsdbMemTable; +typedef struct SMergeInfo SMergeInfo; +typedef struct STable STable; + +int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); +void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable); +int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp); +int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, + TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); + +// tsdbCommit ================ +int tsdbBegin(STsdb *pTsdb); + +#if 1 + typedef struct SSmaStat SSmaStat; typedef struct SSmaEnv SSmaEnv; typedef struct SSmaEnvs SSmaEnvs; -typedef struct STable { +struct STable { uint64_t tid; uint64_t uid; STSchema *pSchema; -} STable; +}; #define TABLE_TID(t) (t)->tid #define TABLE_UID(t) (t)->uid -STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); -void tsdbClose(STsdb *); +int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb); +int tsdbClose(STsdb *pTsdb); int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); @@ -110,25 +127,28 @@ typedef struct { TSKEY minKey; } SRtn; -typedef struct STbData { +struct STbData { tb_uid_t uid; TSKEY keyMin; TSKEY keyMax; + int64_t minVer; + int64_t maxVer; int64_t nrows; SSkipList *pData; -} STbData; +}; -typedef struct STsdbMemTable { +struct STsdbMemTable { + SVBufPool *pPool; T_REF_DECLARE() - SRWLatch latch; - TSKEY keyMin; - TSKEY keyMax; - uint64_t nRow; - SMemAllocator *pMA; - // Container + SRWLatch latch; + TSKEY keyMin; + TSKEY keyMax; + int64_t minVer; + int64_t maxVer; + int64_t nRow; SSkipList *pSlIdx; // SSkiplist SHashObj *pHashIdx; -} STsdbMemTable; +}; typedef struct { uint32_t version; // Commit version from 0 to increase @@ -154,18 +174,17 @@ typedef struct { } STsdbFS; struct STsdb { - int32_t vgId; - SVnode *pVnode; - bool repoLocked; - TdThreadMutex mutex; - char *path; - STsdbCfg config; - STsdbMemTable *mem; - STsdbMemTable *imem; - SRtn rtn; - SMemAllocatorFactory *pmaf; - STsdbFS *fs; - SSmaEnvs smaEnvs; + char *path; + SVnode *pVnode; + int32_t vgId; + bool repoLocked; + TdThreadMutex mutex; + STsdbCfg config; + STsdbMemTable *mem; + STsdbMemTable *imem; + SRtn rtn; + STsdbFS *fs; + SSmaEnvs smaEnvs; }; #define REPO_ID(r) ((r)->vgId) @@ -187,7 +206,7 @@ static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, } // tsdbMemTable.h -typedef struct { +struct SMergeInfo { int rowsInserted; int rowsUpdated; int rowsDeleteSucceed; @@ -195,7 +214,7 @@ typedef struct { int nOperations; TSKEY keyFirst; TSKEY keyLast; -} SMergeInfo; +}; static void *taosTMalloc(size_t size); static void *taosTCalloc(size_t nmemb, size_t size); @@ -204,12 +223,6 @@ static void *taosTZfree(void *ptr); static size_t taosTSizeof(void *ptr); static void taosTMemset(void *ptr, int c); -STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb); -void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable); -int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp); -int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, - TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); - static FORCE_INLINE STSRow *tsdbNextIterRow(SSkipListIterator *pIter) { if (pIter == NULL) return NULL; @@ -988,6 +1001,8 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void return len; } +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index e0c158edb1bd5f33b8eda01254ec70888e05eb51..afbea8663fecebed65757f1c2a8d980f5e78cfff 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -40,22 +40,31 @@ int vnodeDecodeConfig(const SJson* pJson, void* pObj); // vnodeModule ==================== int vnodeScheduleTask(int (*execute)(void*), void* arg); -// vnodeQuery ==================== -int vnodeQueryOpen(SVnode* pVnode); -void vnodeQueryClose(SVnode* pVnode); -int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); - -// vnodeCommit ==================== -int vnodeBegin(SVnode* pVnode); -int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); -int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); -int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); -int vnodeSyncCommit(SVnode* pVnode); -int vnodeAsyncCommit(SVnode* pVnode); - -#define vnodeShouldCommit vnodeBufPoolIsFull - +// vnodeBufPool ==================== #if 1 +typedef struct SVBufPoolNode SVBufPoolNode; +struct SVBufPoolNode { + SVBufPoolNode* prev; + SVBufPoolNode** pnext; + int64_t size; + uint8_t data[]; +}; + +struct SVBufPool { + SVBufPool* next; + int64_t nRef; + int64_t size; + uint8_t* ptr; + SVBufPoolNode* pTail; + SVBufPoolNode node; +}; + +int vnodeOpenBufPool(SVnode* pVnode, int64_t size); +int vnodeCloseBufPool(SVnode* pVnode); +void vnodeBufPoolReset(SVBufPool* pPool); +void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); +void vnodeBufPoolFree(SVBufPool* pPool, void* p); +#else // SVBufPool int vnodeOpenBufPool(SVnode* pVnode); void vnodeCloseBufPool(SVnode* pVnode); @@ -90,9 +99,21 @@ void vmaReset(SVMemAllocator* pVMA); void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); void vmaFree(SVMemAllocator* pVMA, void* ptr); bool vmaIsFull(SVMemAllocator* pVMA); - #endif +// vnodeQuery ==================== +int vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryClose(SVnode* pVnode); +int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); + +// vnodeCommit ==================== +int vnodeBegin(SVnode* pVnode); +int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); +int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); +int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); +int vnodeSyncCommit(SVnode* pVnode); +int vnodeAsyncCommit(SVnode* pVnode); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a9904013240c911b8d17bfe8e8624f9ec0c8e22f..50e51b84c1e017d87b4fd12315a7f200479bc0d2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -91,7 +91,10 @@ struct SVnode { SVState state; STfs* pTfs; SMsgCb msgCb; - SVBufPool* pBufPool; + SVBufPool* pPool; + SVBufPool* inUse; + SVBufPool* onCommit; + SVBufPool* onRecycle; SMeta* pMeta; STsdb* pTsdb; SWal* pWal; diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 8e28b6ea7b4f2859c04aab0efdfabe61b7ec984f..10126ddd1db0cd4365fda6cc523fe60952856f52 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -16,6 +16,9 @@ #include "vnodeInt.h" int metaBegin(SMeta *pMeta) { - // TODO + if (tdbBegin(pMeta->pEnv, NULL) < 0) { + return -1; + } + return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0aa6023eaf39cedb77bd8344dea3c1bae2aac8d4..57ea79fe8578da55f0173384b170bb62bc51e6f9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -29,13 +29,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe pTq->pVnode = pVnode; pTq->pWal = pWal; pTq->pVnodeMeta = pVnodeMeta; -#if 0 - pTq->tqMemRef.pAllocatorFactory = allocFac; - pTq->tqMemRef.pAllocator = allocFac->create(allocFac); - if (pTq->tqMemRef.pAllocator == NULL) { - // TODO: error code of buffer pool - } -#endif pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); if (pTq->tqMeta == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 09616a8969583abae05e152511ce886983a62823..6a2857513892a6bfb4850084a1a96b5180271fe6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -239,7 +239,7 @@ static void tsdbStartCommit(STsdb *pRepo) { static void tsdbEndCommit(STsdb *pTsdb, int eno) { tsdbEndFSTxn(pTsdb); - tsdbFreeMemTable(pTsdb, pTsdb->imem); + tsdbMemTableDestroy(pTsdb, pTsdb->imem); pTsdb->imem = NULL; tsdbInfo("vgId:%d commit over, %s", REPO_ID(pTsdb), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c new file mode 100644 index 0000000000000000000000000000000000000000..b16f2f0fdbc647e68e32a831e4850ce1788448bb --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeInt.h" + +int tsdbBegin(STsdb *pTsdb) { + STsdbMemTable *pMem; + + if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) { + return -1; + } + + return 0; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index dd8723366dd7871363b9f870506961c9620ace17..df2ab558dae5d7fde53203092171084705b70982 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -15,94 +15,56 @@ #include "vnodeInt.h" -static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); -static void tsdbFree(STsdb *pTsdb); -static int tsdbOpenImpl(STsdb *pTsdb); -static void tsdbCloseImpl(STsdb *pTsdb); - -STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { +int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) { STsdb *pTsdb = NULL; + int slen = 0; - // Set default TSDB Options - // if (pTsdbCfg == NULL) { - pTsdbCfg = &defautlTsdbOptions; - // } + *ppTsdb = NULL; + slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_TSDB_DIR) + 3; - // Validate the options - if (tsdbValidateOptions(pTsdbCfg) < 0) { - // TODO: handle error - return NULL; - } - - // Create the handle - pTsdb = tsdbNew(path, pVnode, pTsdbCfg, pMAF); + // create handle + pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen); if (pTsdb == NULL) { - // TODO: handle error - return NULL; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - taosMkDir(path); - - // Open the TSDB - if (tsdbOpenImpl(pTsdb) < 0) { - // TODO: handle error - return NULL; - } + pTsdb->path = (char *)&pTsdb[1]; + sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, + VNODE_TSDB_DIR); + pTsdb->pVnode = pVnode; + pTsdb->vgId = TD_VID(pVnode); + pTsdb->repoLocked = false; + tdbMutexInit(&pTsdb->mutex, NULL); + pTsdb->config = pVnode->config.tsdbCfg; + pTsdb->fs = tsdbNewFS(&pTsdb->config); - return pTsdb; -} + // create dir (TODO: use tfsMkdir) + taosMkDir(pTsdb->path); -void tsdbClose(STsdb *pTsdb) { - if (pTsdb) { - tsdbCloseImpl(pTsdb); - tsdbFree(pTsdb); + // open tsdb + if (tsdbOpenFS(pTsdb) < 0) { + goto _err; } -} - -/* ------------------------ STATIC METHODS ------------------------ */ -static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { - STsdb *pTsdb = NULL; - pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb)); - if (pTsdb == NULL) { - // TODO: handle error - return NULL; - } + tsdbDebug("vgId: %d tsdb is opened", TD_VID(pVnode)); - pTsdb->path = strdup(path); - pTsdb->vgId = TD_VID(pVnode); - pTsdb->pVnode = pVnode; - tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg); - pTsdb->pmaf = pMAF; - pTsdb->fs = tsdbNewFS(pTsdbCfg); + *ppTsdb = pTsdb; + return 0; - return pTsdb; +_err: + taosMemoryFree(pTsdb); + return -1; } -static void tsdbFree(STsdb *pTsdb) { +int tsdbClose(STsdb *pTsdb) { if (pTsdb) { - // tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb)); - // tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb)); - tsdbFreeFS(pTsdb->fs); - taosMemoryFreeClear(pTsdb->path); + tsdbCloseFS(pTsdb); taosMemoryFree(pTsdb); } -} - -static int tsdbOpenImpl(STsdb *pTsdb) { - tsdbOpenFS(pTsdb); - - // tsdbInitSma(pTsdb); - // TODO - return 0; } -static void tsdbCloseImpl(STsdb *pTsdb) { - tsdbCloseFS(pTsdb); - // TODO -} - int tsdbLockRepo(STsdb *pTsdb) { int code = taosThreadMutexLock(&pTsdb->mutex); if (code != 0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index bea2b66af8ad94a9129489c073940077d270c37b..dae70d955c17784e6b09b5c15e28ac940543ffdc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -24,51 +24,46 @@ static int tsdbTbDataComp(const void *arg1, const void *arg2); static char *tsdbTbDataGetUid(const void *arg); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row); -STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { - STsdbMemTable *pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); +int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { + STsdbMemTable *pMemTable; + SVnode *pVnode; + + *ppMemTable = NULL; + pVnode = pTsdb->pVnode; + + // alloc handle + pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } + pMemTable->pPool = pTsdb->pVnode->inUse; T_REF_INIT_VAL(pMemTable, 1); - taosInitRWLatch(&(pMemTable->latch)); - pMemTable->keyMax = TSKEY_MIN; + taosInitRWLatch(&pMemTable->latch); pMemTable->keyMin = TSKEY_MAX; + pMemTable->keyMax = TSKEY_MIN; pMemTable->nRow = 0; - pMemTable->pMA = pTsdb->pmaf->create(pTsdb->pmaf); - if (pMemTable->pMA == NULL) { - taosMemoryFree(pMemTable); - return NULL; - } - - // Initialize the container - pMemTable->pSlIdx = - tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid); + pMemTable->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), + tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid); if (pMemTable->pSlIdx == NULL) { - pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); taosMemoryFree(pMemTable); - return NULL; + return -1; } pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pMemTable->pHashIdx == NULL) { - pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); tSkipListDestroy(pMemTable->pSlIdx); taosMemoryFree(pMemTable); - return NULL; + return -1; } - return pMemTable; + return 0; } -void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) { +void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) { if (pMemTable) { taosHashCleanup(pMemTable->pHashIdx); tSkipListDestroy(pMemTable->pSlIdx); - if (pMemTable->pMA) { - pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); - } taosMemoryFree(pMemTable); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 910b5adc963559f712868937abfe1d5ad160b526..2b34714e11a2c2f30a390d962fd2d3d95289faea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -25,11 +25,11 @@ */ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { // Check if mem is there. If not, create one. - if (pTsdb->mem == NULL) { - pTsdb->mem = tsdbNewMemTable(pTsdb); - if (pTsdb->mem == NULL) { - return -1; - } - } + // if (pTsdb->mem == NULL) { + // pTsdb->mem = tsdbMemTableCreate(pTsdb); + // if (pTsdb->mem == NULL) { + // return -1; + // } + // } return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, pRsp); } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeBufferPool2.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c similarity index 91% rename from source/dnode/vnode/src/vnd/vnodeBufferPool2.c rename to source/dnode/vnode/src/vnd/vnodeBufPool.c index d63c86734a4573112fd449dc8cb23e6a8f91af7e..0125cdc82f9432c1f33482f0609f73c2daf53014 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufferPool2.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -17,7 +17,7 @@ /* ------------------------ STRUCTURES ------------------------ */ -static int vnodeBufPoolCreate(int size, SVBufPool **ppPool); +static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool); static int vnodeBufPoolDestroy(SVBufPool *pPool); int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { @@ -30,17 +30,17 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { // create pool ret = vnodeBufPoolCreate(size, &pPool); if (ret < 0) { - vError("vgId:%d failed to open vnode buffer pool since %s", TD_VNODE_ID(pVnode), tstrerror(terrno)); + vError("vgId:%d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); return -1; } - // add pool to queue + // add pool to vnode pPool->next = pVnode->pPool; pVnode->pPool = pPool; } - vDebug("vgId:%d vnode buffer pool is opened, pool size: %" PRId64, TD_VNODE_ID(pVnode), size); + vDebug("vgId:%d vnode buffer pool is opened, pool size: %" PRId64, TD_VID(pVnode), size); return 0; } @@ -53,7 +53,7 @@ int vnodeCloseBufPool(SVnode *pVnode) { vnodeBufPoolDestroy(pPool); } - vDebug("vgId:%d vnode buffer pool is closed", TD_VNODE_ID(pVnode)); + vDebug("vgId:%d vnode buffer pool is closed", TD_VID(pVnode)); return 0; } @@ -75,7 +75,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) { pPool->ptr = pPool->node.data; } -void *vnodeBufPoolMalloc(SVBufPool *pPool, size_t size) { +void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { SVBufPoolNode *pNode; void *p; @@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) { } // STATIC METHODS ------------------- -static int vnodeBufPoolCreate(int size, SVBufPool **ppPool) { +static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 98921a9efe83245dcf83fe51fa54ac853f41de46..4f9631979cf5623016aeab8d99146f8f712c7235 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -21,10 +21,8 @@ const SVnodeCfg vnodeCfgDefault = { .dbId = 0, .szPage = 4096, .szCache = 256, - .wsize = 96 * 1024 * 1024, - .ssize = 1 * 1024 * 1024, - .lsize = 1024, - .isHeapAllocator = false, + .szBuf = 96 * 1024 * 1024, + .isHeap = false, .ttl = 0, .keep = 0, .streamMode = 0, @@ -32,6 +30,7 @@ const SVnodeCfg vnodeCfgDefault = { .tsdbCfg = {.precision = TWO_STAGE_COMP, .update = 0, .compression = 2, + .slLevel = 5, .days = 10, .minRows = 100, .maxRows = 4096, @@ -57,10 +56,8 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1; @@ -68,6 +65,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "slLevel", pCfg->tsdbCfg.slLevel) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1; @@ -97,10 +95,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1; if (tjsonGetNumberValue(pJson, "szPage", pCfg->szPage) < 0) return -1; if (tjsonGetNumberValue(pJson, "szCache", pCfg->szCache) < 0) return -1; - if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1; - if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1; - if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1; - if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1; + if (tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf) < 0) return -1; + if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap) < 0) return -1; if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1; if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1; @@ -108,6 +104,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; + if (tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel) < 0) return -1; if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1; if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1; if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 61480a7b0bcea2c2b2bfcc3b1291d29b592639b0..8b2f22f1bac68d4b3b0f2658f26a7d84f410a74b 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -26,7 +26,19 @@ static int vnodeCommit(void *arg); static void vnodeWaitCommit(SVnode *pVnode); int vnodeBegin(SVnode *pVnode) { - // begin buffer pool + // alloc buffer pool + /* pthread_mutex_lock(); */ + + while (pVnode->pPool == NULL) { + /* pthread_cond_wait(); */ + } + + pVnode->inUse = pVnode->pPool; + pVnode->pPool = pVnode->inUse->next; + pVnode->inUse->next = NULL; + /* ref pVnode->inUse buffer pool */ + + /* pthread_mutex_unlock(); */ // begin meta if (metaBegin(pVnode->pMeta) < 0) { @@ -35,12 +47,10 @@ int vnodeBegin(SVnode *pVnode) { } // begin tsdb -#if 0 if (tsdbBegin(pVnode->pTsdb) < 0) { vError("vgId: %d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } -#endif return 0; } @@ -162,7 +172,7 @@ _err: int vnodeAsyncCommit(SVnode *pVnode) { vnodeWaitCommit(pVnode); - vnodeBufPoolSwitch(pVnode); + // vnodeBufPoolSwitch(pVnode); tsdbPrepareCommit(pVnode->pTsdb); vnodeScheduleTask(vnodeCommit, pVnode); @@ -184,7 +194,7 @@ static int vnodeCommit(void *arg) { tqCommit(pVnode->pTq); tsdbCommit(pVnode->pTsdb); - vnodeBufPoolRecycle(pVnode); + // vnodeBufPoolRecycle(pVnode); tsem_post(&(pVnode->canCommit)); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 8f32f30824587b87c5db8a875236c5e07fe87afe..dcb1af8963b449d21a426c937cdaecc4b5e39ebf 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -83,7 +83,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { tsem_init(&(pVnode->canCommit), 0, 1); // open buffer pool - if (vnodeOpenBufPool(pVnode) < 0) { + if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) { vError("vgId: %d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } @@ -95,9 +95,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // open tsdb - sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TSDB_DIR); - pVnode->pTsdb = tsdbOpen(tdir, pVnode, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode)); - if (pVnode->pTsdb == NULL) { + if (tsdbOpen(pVnode, &pVnode->pTsdb) < 0) { vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } @@ -112,7 +110,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open tq sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); - pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta); if (pVnode->pTq == NULL) { vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 01241ec6864fd11d91797b4736a15b318f99cbac..fb611efad8de47d4a1f3fb0d2f475e0f70eccab6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -46,7 +46,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg int ret; if (pVnode->config.streamMode == 0) { - ptr = vnodeMalloc(pVnode, pMsg->contLen); + // ptr = vnodeMalloc(pVnode, pMsg->contLen); if (ptr == NULL) { // TODO: handle error } @@ -125,7 +125,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pVnode->state.applied = version; // Check if it needs to commit - if (vnodeShouldCommit(pVnode)) { + if (0 /*vnodeShouldCommit(pVnode)*/) { // tsem_wait(&(pVnode->canCommit)); if (vnodeAsyncCommit(pVnode) < 0) { // TODO: handle error