diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 763ff61e80bbe8de86f88516d19bd6ad9cc8e030..e2a802d38cc7e39f7d7661833a581baf427e6623 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1562,8 +1562,8 @@ typedef struct SVCreateTbReq { int8_t type; union { struct { - tb_uid_t suid; - const void* pTag; + tb_uid_t suid; + const uint8_t* pTag; } ctb; struct { SSchemaWrapper schema; @@ -2593,12 +2593,12 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { #define TD_AUTO_CREATE_TABLE 0x1 typedef struct { - int64_t suid; - int64_t uid; - int32_t sver; - uint64_t nData; - const void* pData; - SVCreateTbReq cTbReq; + int64_t suid; + int64_t uid; + int32_t sver; + uint64_t nData; + const uint8_t* pData; + SVCreateTbReq cTbReq; } SVSubmitBlk; typedef struct { diff --git a/include/util/tencode.h b/include/util/tencode.h index b081d1a157369ad244fed870b126901f1009374a..e5962b5c728fd5a98d5e61ca64df086189e738ff 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -138,7 +138,7 @@ static int32_t tEncodeU64v(SCoder* pEncoder, uint64_t val); static int32_t tEncodeI64v(SCoder* pEncoder, int64_t val); static int32_t tEncodeFloat(SCoder* pEncoder, float val); static int32_t tEncodeDouble(SCoder* pEncoder, double val); -static int32_t tEncodeBinary(SCoder* pEncoder, const void* val, uint64_t len); +static int32_t tEncodeBinary(SCoder* pEncoder, const uint8_t* val, uint64_t len); static int32_t tEncodeCStrWithLen(SCoder* pEncoder, const char* val, uint64_t len); static int32_t tEncodeCStr(SCoder* pEncoder, const char* val); @@ -162,7 +162,7 @@ static int32_t tDecodeU64v(SCoder* pDecoder, uint64_t* val); static int32_t tDecodeI64v(SCoder* pDecoder, int64_t* val); static int32_t tDecodeFloat(SCoder* pDecoder, float* val); static int32_t tDecodeDouble(SCoder* pDecoder, double* val); -static int32_t tDecodeBinary(SCoder* pDecoder, const void** val, uint64_t* len); +static int32_t tDecodeBinary(SCoder* pDecoder, const uint8_t** val, uint64_t* len); static int32_t tDecodeCStrAndLen(SCoder* pDecoder, const char** val, uint64_t* len); static int32_t tDecodeCStr(SCoder* pDecoder, const char** val); static int32_t tDecodeCStrTo(SCoder* pDecoder, char* val); @@ -292,7 +292,7 @@ static FORCE_INLINE int32_t tEncodeDouble(SCoder* pEncoder, double val) { return tEncodeU64(pEncoder, v.ui); } -static FORCE_INLINE int32_t tEncodeBinary(SCoder* pEncoder, const void* val, uint64_t len) { +static FORCE_INLINE int32_t tEncodeBinary(SCoder* pEncoder, const uint8_t* val, uint64_t len) { if (tEncodeU64v(pEncoder, len) < 0) return -1; if (pEncoder->data) { if (TD_CODER_CHECK_CAPACITY_FAILED(pEncoder, len)) return -1; @@ -413,7 +413,7 @@ static FORCE_INLINE int32_t tDecodeDouble(SCoder* pDecoder, double* val) { return 0; } -static FORCE_INLINE int32_t tDecodeBinary(SCoder* pDecoder, const void** val, uint64_t* len) { +static FORCE_INLINE int32_t tDecodeBinary(SCoder* pDecoder, const uint8_t** val, uint64_t* len) { if (tDecodeU64v(pDecoder, len) < 0) return -1; if (TD_CODER_CHECK_CAPACITY_FAILED(pDecoder, *len)) return -1; @@ -426,7 +426,7 @@ static FORCE_INLINE int32_t tDecodeBinary(SCoder* pDecoder, const void** val, ui } static FORCE_INLINE int32_t tDecodeCStrAndLen(SCoder* pDecoder, const char** val, uint64_t* len) { - if (tDecodeBinary(pDecoder, (const void**)val, len) < 0) return -1; + if (tDecodeBinary(pDecoder, (const uint8_t**)val, len) < 0) return -1; (*len) -= 1; return 0; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 488edc4f25613ee2f96e7133f01e6a8a67b233aa..db8cb2f0b448722726decdb6c6276164438456a1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -90,7 +90,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur); // tsdb // typedef struct STsdb STsdb; -typedef void *tsdbReaderT; +typedef void *tsdbReaderT; #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 @@ -108,12 +108,12 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); -int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool* allHave); -SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); -void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); -void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); -int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); -int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo); +int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); +SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); +void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); +void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); +int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); +int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo); // tq @@ -126,8 +126,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); -int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t* pUid, int32_t *pNumOfRows, - int16_t *pNumOfCols); +int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid, + int32_t *pNumOfRows, int16_t *pNumOfCols); // need to reposition @@ -189,10 +189,10 @@ struct SMetaEntry { SSchemaWrapper schemaTag; } stbEntry; struct { - int64_t ctime; - int32_t ttlDays; - tb_uid_t suid; - const void *pTags; + int64_t ctime; + int32_t ttlDays; + tb_uid_t suid; + const uint8_t *pTags; } ctbEntry; struct { int64_t ctime; diff --git a/source/dnode/vnode/src/meta/metaTDBImpl.c b/source/dnode/vnode/src/meta/metaTDBImpl.c deleted file mode 100644 index cb556e8630a3d01d6d98d8c990b48f5a43adf5e7..0000000000000000000000000000000000000000 --- a/source/dnode/vnode/src/meta/metaTDBImpl.c +++ /dev/null @@ -1,843 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" - -#ifndef META_REFACT - -typedef struct SPoolMem { - int64_t size; - struct SPoolMem *prev; - struct SPoolMem *next; -} SPoolMem; - -#define META_TDB_SMA_TEST - -static SPoolMem *openPool(); -static void clearPool(SPoolMem *pPool); -static void closePool(SPoolMem *pPool); -static void *poolMalloc(void *arg, size_t size); -static void poolFree(void *arg, void *ptr); - -struct SMetaDB { - TXN txn; - TENV *pEnv; - TDB *pTbDB; - TDB *pSchemaDB; - TDB *pNameIdx; - TDB *pStbIdx; - TDB *pNtbIdx; - TDB *pCtbIdx; - SPoolMem *pPool; -#ifdef META_TDB_SMA_TEST - TDB *pSmaDB; - TDB *pSmaIdx; -#endif -}; - -#pragma pack(push, 1) -typedef struct { - tb_uid_t uid; - int32_t sver; -} SSchemaDbKey; -#pragma pack(pop) - -typedef struct { - char *name; - tb_uid_t uid; -} SNameIdxKey; - -typedef struct { - tb_uid_t suid; - tb_uid_t uid; -} SCtbIdxKey; - -typedef struct { - tb_uid_t uid; - int64_t smaUid; -} SSmaIdxKey; - -static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); -static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); -static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); -static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW); -static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW); -static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx); - -static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx); - -static inline int metaUidCmpr(const void *arg1, int len1, const void *arg2, int len2) { - tb_uid_t uid1, uid2; - - ASSERT(len1 == sizeof(tb_uid_t)); - ASSERT(len2 == sizeof(tb_uid_t)); - - uid1 = ((tb_uid_t *)arg1)[0]; - uid2 = ((tb_uid_t *)arg2)[0]; - - if (uid1 < uid2) { - return -1; - } - if (uid1 == uid2) { - return 0; - } else { - return 1; - } -} - -static inline int metaSchemaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) { - int c; - SSchemaDbKey *pKey1 = (SSchemaDbKey *)arg1; - SSchemaDbKey *pKey2 = (SSchemaDbKey *)arg2; - - c = metaUidCmpr(arg1, sizeof(tb_uid_t), arg2, sizeof(tb_uid_t)); - if (c) return c; - - if (pKey1->sver > pKey2->sver) { - return 1; - } else if (pKey1->sver == pKey2->sver) { - return 0; - } else { - return -1; - } -} - -static inline int metaNameIdxCmpr(const void *arg1, int len1, const void *arg2, int len2) { - return strcmp((char *)arg1, (char *)arg2); -} - -static inline int metaCtbIdxCmpr(const void *arg1, int len1, const void *arg2, int len2) { - int c; - SCtbIdxKey *pKey1 = (SCtbIdxKey *)arg1; - SCtbIdxKey *pKey2 = (SCtbIdxKey *)arg2; - - c = metaUidCmpr(arg1, sizeof(tb_uid_t), arg2, sizeof(tb_uid_t)); - if (c) return c; - - return metaUidCmpr(&pKey1->uid, sizeof(tb_uid_t), &pKey2->uid, sizeof(tb_uid_t)); -} - -static inline int metaSmaIdxCmpr(const void *arg1, int len1, const void *arg2, int len2) { - int c; - SSmaIdxKey *pKey1 = (SSmaIdxKey *)arg1; - SSmaIdxKey *pKey2 = (SSmaIdxKey *)arg2; - - c = metaUidCmpr(arg1, sizeof(tb_uid_t), arg2, sizeof(tb_uid_t)); - if (c) return c; - - return metaUidCmpr(&pKey1->smaUid, sizeof(int64_t), &pKey2->smaUid, sizeof(int64_t)); -} - -int metaOpenDB(SMeta *pMeta) { - SMetaDB *pMetaDb; - int ret; - - // allocate DB handle - pMetaDb = taosMemoryCalloc(1, sizeof(*pMetaDb)); - if (pMetaDb == NULL) { - // TODO - ASSERT(0); - return -1; - } - - // open the ENV - ret = tdbEnvOpen(pMeta->path, 4096, 256, &(pMetaDb->pEnv)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } - - // open table DB - ret = tdbDbOpen("table.db", sizeof(tb_uid_t), TDB_VARIANT_LEN, metaUidCmpr, pMetaDb->pEnv, &(pMetaDb->pTbDB)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } - -#ifdef META_TDB_SMA_TEST - ret = tdbDbOpen("sma.db", sizeof(int64_t), TDB_VARIANT_LEN, metaUidCmpr, pMetaDb->pEnv, &(pMetaDb->pSmaDB)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } -#endif - - // open schema DB - ret = tdbDbOpen("schema.db", sizeof(SSchemaDbKey), TDB_VARIANT_LEN, metaSchemaKeyCmpr, pMetaDb->pEnv, - &(pMetaDb->pSchemaDB)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } - - ret = tdbDbOpen("name.idx", TDB_VARIANT_LEN, 0, metaNameIdxCmpr, pMetaDb->pEnv, &(pMetaDb->pNameIdx)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } - - ret = tdbDbOpen("stb.idx", sizeof(tb_uid_t), 0, metaUidCmpr, pMetaDb->pEnv, &(pMetaDb->pStbIdx)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } - - ret = tdbDbOpen("ntb.idx", sizeof(tb_uid_t), 0, metaUidCmpr, pMetaDb->pEnv, &(pMetaDb->pNtbIdx)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } - - ret = tdbDbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, metaCtbIdxCmpr, pMetaDb->pEnv, &(pMetaDb->pCtbIdx)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } - -#ifdef META_TDB_SMA_TEST - ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, metaSmaIdxCmpr, pMetaDb->pEnv, &(pMetaDb->pSmaIdx)); - if (ret < 0) { - // TODO - ASSERT(0); - return -1; - } -#endif - - pMetaDb->pPool = openPool(); - tdbTxnOpen(&pMetaDb->txn, 0, poolMalloc, poolFree, pMetaDb->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pMetaDb->pEnv, NULL); - - pMeta->pDB = pMetaDb; - return 0; -} - -void metaCloseDB(SMeta *pMeta) { - if (pMeta->pDB) { - tdbCommit(pMeta->pDB->pEnv, &pMeta->pDB->txn); - tdbTxnClose(&pMeta->pDB->txn); - clearPool(pMeta->pDB->pPool); -#ifdef META_TDB_SMA_TEST - tdbDbClose(pMeta->pDB->pSmaIdx); -#endif - tdbDbClose(pMeta->pDB->pCtbIdx); - tdbDbClose(pMeta->pDB->pNtbIdx); - tdbDbClose(pMeta->pDB->pStbIdx); - tdbDbClose(pMeta->pDB->pNameIdx); -#ifdef META_TDB_SMA_TEST - tdbDbClose(pMeta->pDB->pSmaDB); -#endif - tdbDbClose(pMeta->pDB->pSchemaDB); - tdbDbClose(pMeta->pDB->pTbDB); - taosMemoryFree(pMeta->pDB); - } -} - -int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { - tb_uid_t uid; - SMetaDB *pMetaDb; - void *pKey; - void *pVal; - int kLen; - int vLen; - int ret; - char buf[512]; - void *pBuf; - SCtbIdxKey ctbIdxKey; - SSchemaDbKey schemaDbKey; - SSchemaWrapper schemaWrapper; - - pMetaDb = pMeta->pDB; - - // TODO: make this operation pre-process - if (pTbCfg->type == META_SUPER_TABLE) { - uid = pTbCfg->stbCfg.suid; - } else { - uid = metaGenerateUid(pMeta); - } - - // check name and uid unique - if (tdbDbGet(pMetaDb->pTbDB, &uid, sizeof(uid), NULL, NULL) == 0) { - return -1; - } - if (tdbDbGet(pMetaDb->pNameIdx, pTbCfg->name, strlen(pTbCfg->name) + 1, NULL, NULL) == 0) { - return -1; - } - - // save to table.db - pKey = &uid; - kLen = sizeof(uid); - pVal = pBuf = buf; - metaEncodeTbInfo(&pBuf, pTbCfg); - vLen = POINTER_DISTANCE(pBuf, buf); - ret = tdbDbInsert(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, &pMetaDb->txn); - if (ret < 0) { - return -1; - } - - // save to schema.db for META_SUPER_TABLE and META_NORMAL_TABLE - if (pTbCfg->type != META_CHILD_TABLE) { - schemaDbKey.uid = uid; - schemaDbKey.sver = 0; // TODO - pKey = &schemaDbKey; - kLen = sizeof(schemaDbKey); - - if (pTbCfg->type == META_SUPER_TABLE) { - schemaWrapper.nCols = pTbCfg->stbCfg.nCols; - schemaWrapper.pSchema = pTbCfg->stbCfg.pSchema; - } else { - schemaWrapper.nCols = pTbCfg->ntbCfg.nCols; - schemaWrapper.pSchema = pTbCfg->ntbCfg.pSchema; - } - pVal = pBuf = buf; - metaEncodeSchemaEx(&pBuf, &schemaWrapper); - vLen = POINTER_DISTANCE(pBuf, buf); - ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, &pMeta->pDB->txn); - if (ret < 0) { - return -1; - } - } - - // update name.idx - int nameLen = strlen(pTbCfg->name); - memcpy(buf, pTbCfg->name, nameLen + 1); - ((tb_uid_t *)(buf + nameLen + 1))[0] = uid; - pKey = buf; - kLen = nameLen + 1 + sizeof(uid); - pVal = NULL; - vLen = 0; - ret = tdbDbInsert(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); - if (ret < 0) { - return -1; - } - - // update other index - if (pTbCfg->type == META_SUPER_TABLE) { - pKey = &uid; - kLen = sizeof(uid); - pVal = NULL; - vLen = 0; - ret = tdbDbInsert(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); - if (ret < 0) { - return -1; - } - } else if (pTbCfg->type == META_CHILD_TABLE) { - ctbIdxKey.suid = pTbCfg->ctbCfg.suid; - ctbIdxKey.uid = uid; - pKey = &ctbIdxKey; - kLen = sizeof(ctbIdxKey); - pVal = NULL; - vLen = 0; - ret = tdbDbInsert(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); - if (ret < 0) { - return -1; - } - // child table handle for rsma - if (pHandle && pHandle->fp) { - if (((*pHandle->fp)(pHandle->ahandle, &pHandle->result, &ctbIdxKey.suid, &uid)) < 0) { - return -1; - }; - } - } else if (pTbCfg->type == META_NORMAL_TABLE) { - pKey = &uid; - kLen = sizeof(uid); - pVal = NULL; - vLen = 0; - ret = tdbDbInsert(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); - if (ret < 0) { - return -1; - } - } - - if (pMeta->pDB->pPool->size > 0) { - metaCommit(pMeta); - } - - return 0; -} - -int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { - // TODO - ASSERT(0); - return 0; -} - -static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) { - void *pKey; - void *pVal; - int kLen; - int vLen; - int ret; - SSchemaDbKey schemaDbKey; - SSchemaWrapper *pSchemaWrapper; - void *pBuf; - - // fetch - schemaDbKey.uid = uid; - schemaDbKey.sver = sver; - pKey = &schemaDbKey; - kLen = sizeof(schemaDbKey); - pVal = NULL; - ret = tdbDbGet(pMeta->pDB->pSchemaDB, pKey, kLen, &pVal, &vLen); - if (ret < 0) { - return NULL; - } - - // decode - pBuf = pVal; - pSchemaWrapper = taosMemoryMalloc(sizeof(*pSchemaWrapper)); - metaDecodeSchemaEx(pBuf, pSchemaWrapper, isGetEx); - - tdbFree(pVal); - - return pSchemaWrapper; -} - -struct SMSmaCursor { - TDBC *pCur; - tb_uid_t uid; - void *pKey; - void *pVal; - int kLen; - int vLen; -}; - -STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { - // TODO - // ASSERT(0); - // return NULL; -#ifdef META_TDB_SMA_TEST - STSmaWrapper *pSW = NULL; - - SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); - if (pCur == NULL) { - return NULL; - } - - void *pBuf = NULL; - SSmaIdxKey *pSmaIdxKey = NULL; - - while (true) { - // TODO: lock during iterate? - if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) { - pSmaIdxKey = pCur->pKey; - ASSERT(pSmaIdxKey != NULL); - - void *pSmaVal = metaGetSmaInfoByIndex(pMeta, pSmaIdxKey->smaUid, false); - - if (pSmaVal == NULL) { - tsdbWarn("no tsma exists for indexUid: %" PRIi64, pSmaIdxKey->smaUid); - continue; - } - - if ((pSW == NULL) && ((pSW = taosMemoryCalloc(1, sizeof(*pSW))) == NULL)) { - tdbFree(pSmaVal); - metaCloseSmaCursor(pCur); - return NULL; - } - - ++pSW->number; - STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma)); - if (tptr == NULL) { - tdbFree(pSmaVal); - metaCloseSmaCursor(pCur); - tdDestroyTSmaWrapper(pSW); - taosMemoryFreeClear(pSW); - return NULL; - } - pSW->tSma = tptr; - pBuf = pSmaVal; - if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) { - tdbFree(pSmaVal); - metaCloseSmaCursor(pCur); - tdDestroyTSmaWrapper(pSW); - taosMemoryFreeClear(pSW); - return NULL; - } - tdbFree(pSmaVal); - continue; - } - break; - } - - metaCloseSmaCursor(pCur); - - return pSW; - -#endif -} - -int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) { - // TODO - ASSERT(0); -#ifndef META_TDB_SMA_TEST - DBT key = {0}; - - key.data = (void *)indexName; - key.size = strlen(indexName); - - metaDBWLock(pMeta->pDB); - // TODO: No guarantee of consistence. - // Use transaction or DB->sync() for some guarantee. - pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0); - metaDBULock(pMeta->pDB); -#endif - return 0; -} - -int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { - // TODO - // ASSERT(0); - -#ifdef META_TDB_SMA_TEST - int32_t ret = 0; - SMetaDB *pMetaDb = pMeta->pDB; - void *pBuf = NULL, *qBuf = NULL; - void *key = {0}, *val = {0}; - - // save sma info - int32_t len = tEncodeTSma(NULL, pSmaCfg); - pBuf = taosMemoryCalloc(1, len); - if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - key = (void *)&pSmaCfg->indexUid; - qBuf = pBuf; - tEncodeTSma(&qBuf, pSmaCfg); - val = pBuf; - - int32_t kLen = sizeof(pSmaCfg->indexUid); - int32_t vLen = POINTER_DISTANCE(qBuf, pBuf); - - ret = tdbDbInsert(pMeta->pDB->pSmaDB, key, kLen, val, vLen, &pMetaDb->txn); - if (ret < 0) { - taosMemoryFreeClear(pBuf); - return -1; - } - - // add sma idx - SSmaIdxKey smaIdxKey; - smaIdxKey.uid = pSmaCfg->tableUid; - smaIdxKey.smaUid = pSmaCfg->indexUid; - key = &smaIdxKey; - kLen = sizeof(smaIdxKey); - val = NULL; - vLen = 0; - - ret = tdbDbInsert(pMeta->pDB->pSmaIdx, key, kLen, val, vLen, &pMetaDb->txn); - if (ret < 0) { - taosMemoryFreeClear(pBuf); - return -1; - } - - // release - taosMemoryFreeClear(pBuf); - - if (pMeta->pDB->pPool->size > 0) { - metaCommit(pMeta); - } - -#endif - return 0; -} - -/** - * @brief - * - * @param pMeta - * @param uid 0 means iterate all uids. - * @return SMSmaCursor* - */ -SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { - // TODO - // ASSERT(0); - // return NULL; -#ifdef META_TDB_SMA_TEST - SMSmaCursor *pCur = NULL; - SMetaDB *pDB = pMeta->pDB; - int ret; - - pCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pCur)); - if (pCur == NULL) { - return NULL; - } - - pCur->uid = uid; - ret = tdbDbcOpen(pDB->pSmaIdx, &(pCur->pCur)); - if ((ret != 0) || (pCur->pCur == NULL)) { - taosMemoryFree(pCur); - return NULL; - } - - if (uid != 0) { - // TODO: move to the specific uid - } - - return pCur; -#endif -} - -/** - * @brief - * - * @param pCur - * @return int64_t smaIndexUid - */ -int64_t metaSmaCursorNext(SMSmaCursor *pCur) { - // TODO - // ASSERT(0); - // return NULL; -#ifdef META_TDB_SMA_TEST - int ret; - void *pBuf; - SSmaIdxKey *smaIdxKey; - - ret = tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, &pCur->pVal, &pCur->vLen); - if (ret < 0) { - return 0; - } - smaIdxKey = pCur->pKey; - return smaIdxKey->smaUid; -#endif -} - -void metaCloseSmaCursor(SMSmaCursor *pCur) { - // TODO - // ASSERT(0); -#ifdef META_TDB_SMA_TEST - if (pCur) { - if (pCur->pCur) { - tdbDbcClose(pCur->pCur); - } - - taosMemoryFree(pCur); - } -#endif -} - -static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) { - int tlen = 0; - SSchema *pSchema; - - tlen += taosEncodeFixedU32(buf, pSW->nCols); - for (int i = 0; i < pSW->nCols; i++) { - pSchema = pSW->pSchema + i; - tlen += taosEncodeFixedI8(buf, pSchema->type); - tlen += taosEncodeFixedI8(buf, pSchema->flags); - tlen += taosEncodeFixedI16(buf, pSchema->colId); - tlen += taosEncodeFixedI32(buf, pSchema->bytes); - tlen += taosEncodeString(buf, pSchema->name); - } - - return tlen; -} - -static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) { - SSchema *pSchema; - - buf = taosDecodeFixedU32(buf, &pSW->nCols); - pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols); - for (int i = 0; i < pSW->nCols; i++) { - pSchema = pSW->pSchema + i; - buf = taosDecodeFixedI8(buf, &pSchema->type); - buf = taosSkipFixedLen(buf, sizeof(int8_t)); - buf = taosDecodeFixedI16(buf, &pSchema->colId); - buf = taosDecodeFixedI32(buf, &pSchema->bytes); - buf = taosDecodeStringTo(buf, pSchema->name); - } - - return buf; -} - -static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) { - int tlen = 0; - SSchema *pSchema; - - tlen += taosEncodeFixedU32(buf, pSW->nCols); - for (int i = 0; i < pSW->nCols; ++i) { - pSchema = pSW->pSchema + i; - tlen += taosEncodeFixedI8(buf, pSchema->type); - tlen += taosEncodeFixedI8(buf, pSchema->flags); - tlen += taosEncodeFixedI16(buf, pSchema->colId); - tlen += taosEncodeFixedI32(buf, pSchema->bytes); - tlen += taosEncodeString(buf, pSchema->name); - } - - return tlen; -} - -static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) { - buf = taosDecodeFixedU32(buf, &pSW->nCols); - if (isGetEx) { - pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols); - for (int i = 0; i < pSW->nCols; i++) { - SSchema *pSchema = pSW->pSchema + i; - buf = taosDecodeFixedI8(buf, &pSchema->type); - buf = taosDecodeFixedI8(buf, &pSchema->flags); - buf = taosDecodeFixedI16(buf, &pSchema->colId); - buf = taosDecodeFixedI32(buf, &pSchema->bytes); - buf = taosDecodeStringTo(buf, pSchema->name); - } - } else { - pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols); - for (int i = 0; i < pSW->nCols; i++) { - SSchema *pSchema = pSW->pSchema + i; - buf = taosDecodeFixedI8(buf, &pSchema->type); - buf = taosSkipFixedLen(buf, sizeof(int8_t)); - buf = taosDecodeFixedI16(buf, &pSchema->colId); - buf = taosDecodeFixedI32(buf, &pSchema->bytes); - buf = taosDecodeStringTo(buf, pSchema->name); - } - } - - return buf; -} - -static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { - int tsize = 0; - - tsize += taosEncodeString(buf, pTbCfg->name); - tsize += taosEncodeFixedU32(buf, pTbCfg->ttl); - tsize += taosEncodeFixedU32(buf, pTbCfg->keep); - tsize += taosEncodeFixedU8(buf, pTbCfg->info); - - if (pTbCfg->type == META_SUPER_TABLE) { - SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema}; - tsize += metaEncodeSchema(buf, &sw); - } else if (pTbCfg->type == META_CHILD_TABLE) { - tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid); - tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag); - } else if (pTbCfg->type == META_NORMAL_TABLE) { - // TODO - } else { - ASSERT(0); - } - - return tsize; -} - -static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { - buf = taosDecodeString(buf, &(pTbCfg->name)); - buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl)); - buf = taosDecodeFixedU32(buf, &(pTbCfg->keep)); - buf = taosDecodeFixedU8(buf, &(pTbCfg->info)); - - if (pTbCfg->type == META_SUPER_TABLE) { - SSchemaWrapper sw; - buf = metaDecodeSchema(buf, &sw); - pTbCfg->stbCfg.nTagCols = sw.nCols; - pTbCfg->stbCfg.pTagSchema = sw.pSchema; - } else if (pTbCfg->type == META_CHILD_TABLE) { - buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid)); - buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag)); - } else if (pTbCfg->type == META_NORMAL_TABLE) { - // TODO - } else { - ASSERT(0); - } - return buf; -} - -int metaCommit(SMeta *pMeta) { - TXN *pTxn = &pMeta->pDB->txn; - - // Commit current txn - tdbCommit(pMeta->pDB->pEnv, pTxn); - tdbTxnClose(pTxn); - clearPool(pMeta->pDB->pPool); - - // start a new txn - tdbTxnOpen(&pMeta->pDB->txn, 0, poolMalloc, poolFree, pMeta->pDB->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pMeta->pDB->pEnv, pTxn); - return 0; -} - -static SPoolMem *openPool() { - SPoolMem *pPool = (SPoolMem *)tdbOsMalloc(sizeof(*pPool)); - - pPool->prev = pPool->next = pPool; - pPool->size = 0; - - return pPool; -} - -static void clearPool(SPoolMem *pPool) { - SPoolMem *pMem; - - do { - pMem = pPool->next; - - if (pMem == pPool) break; - - pMem->next->prev = pMem->prev; - pMem->prev->next = pMem->next; - pPool->size -= pMem->size; - - tdbOsFree(pMem); - } while (1); - - assert(pPool->size == 0); -} - -static void closePool(SPoolMem *pPool) { - clearPool(pPool); - tdbOsFree(pPool); -} - -static void *poolMalloc(void *arg, size_t size) { - void *ptr = NULL; - SPoolMem *pPool = (SPoolMem *)arg; - SPoolMem *pMem; - - pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size); - if (pMem == NULL) { - assert(0); - } - - pMem->size = sizeof(*pMem) + size; - pMem->next = pPool->next; - pMem->prev = pPool; - - pPool->next->prev = pMem; - pPool->next = pMem; - pPool->size += pMem->size; - - ptr = (void *)(&pMem[1]); - return ptr; -} - -static void poolFree(void *arg, void *ptr) { - SPoolMem *pPool = (SPoolMem *)arg; - SPoolMem *pMem; - - pMem = &(((SPoolMem *)ptr)[-1]); - - pMem->next->prev = pMem->prev; - pMem->prev->next = pMem->next; - pPool->size -= pMem->size; - - tdbOsFree(pMem); -} - -#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index e7b733369aa7423165dbc5c3aa40b996308a5fb8..167836ebb4bf626e51b85535f01ab5d9e9dda8cb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -63,14 +63,18 @@ struct SMemSkipListCurosr { SMemSkipListNode *pNodeC; }; +#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET)) + #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) #define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) -#define SL_HEAD_NODE(sl) ((sl)->pHead) -#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) +#define SL_HEAD_NODE(sl) ((sl)->pHead) +#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) +#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l) +#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l) // SMemTable int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { @@ -111,23 +115,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) { } int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) { - SMemData *pMemData; - STsdb *pTsdb = pMemTb->pTsdb; - SVnode *pVnode = pTsdb->pVnode; - SVBufPool *pPool = pVnode->inUse; - int32_t hash; - int32_t tlen; - uint8_t buf[16]; - int32_t rlen; - const uint8_t *p; - SMemSkipListNode *pSlNode; - const STSRow *pTSRow; - SMemSkipListCurosr slc = {0}; - - // search hash - hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket; - for (pMemData = pMemTb->pBuckets[hash]; pMemData; pMemData = pMemData->pHashNext) { - if (pMemData->suid == pSubmitBlk->suid && pMemData->uid == pSubmitBlk->uid) break; + SMemData *pMemData; + STsdb *pTsdb = pMemTb->pTsdb; + SVnode *pVnode = pTsdb->pVnode; + SVBufPool *pPool = pVnode->inUse; + tb_uid_t suid = pSubmitBlk->suid; + tb_uid_t uid = pSubmitBlk->uid; + int32_t iBucket; + + // search SMemData by hash + iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket); + for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) { + if (pMemData->suid == suid && pMemData->uid == uid) break; } // create pMemData if need @@ -143,8 +142,8 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p } pMemData->pHashNext = NULL; - pMemData->suid = pSubmitBlk->suid; - pMemData->uid = pSubmitBlk->uid; + pMemData->suid = suid; + pMemData->uid = uid; pMemData->minKey = TSKEY_MAX; pMemData->maxKey = TSKEY_MIN; pMemData->minVer = -1; @@ -159,55 +158,61 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p pHead->level = maxLevel; pTail->level = maxLevel; for (int iLevel = 0; iLevel < maxLevel; iLevel++) { - SL_NODE_FORWARD(pHead, iLevel) = pTail; - SL_NODE_FORWARD(pTail, iLevel) = pHead; + SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail; + SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead; } - // add to MemTable - hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket; - pMemData->pHashNext = pMemTb->pBuckets[hash]; - pMemTb->pBuckets[hash] = pMemData; + // add to hash + if (pMemTb->nHash >= pMemTb->nBucket) { + // rehash (todo) + } + iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket); + pMemData->pHashNext = pMemTb->pBuckets[iBucket]; + pMemTb->pBuckets[iBucket] = pMemData; pMemTb->nHash++; + + // sort organize (todo) } -// loop to insert data to skiplist -#if 0 - tsdbMemSkipListCursorOpen(&slc, &pMemData->sl); - p = pSubmitBlk->pData; - for (;;) { - if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break; + // do insert data to SMemData + SMemSkipListCurosr slc = {0}; + const uint8_t *p = pSubmitBlk->pData; + + // tsdbMemSkipListCursorOpen(&slc, &pMemData->sl); + for (; p - pSubmitBlk->pData < pSubmitBlk->nData;) { + // if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break; - const uint8_t *pt = p; - p = tGetBinary(p, &pTSRow, &rlen); + // const uint8_t *pt = p; + // p = tGetBinary(p, &pTSRow, &rlen); - // check the row (todo) + // // check the row (todo) - // move the cursor to position to write (todo) - int32_t c; - tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); - ASSERT(c); + // // move the cursor to position to write (todo) + // int32_t c; + // tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); + // ASSERT(c); - // encode row - int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); - int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); - pSlNode = vnodeBufPoolMalloc(pPool, tsize); - pSlNode->level = level; + // // encode row + // int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); + // int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); + // pSlNode = vnodeBufPoolMalloc(pPool, tsize); + // pSlNode->level = level; - uint8_t *pData = SL_NODE_DATA(pSlNode); - *(int64_t *)pData = version; - pData += sizeof(version); - memcpy(pData, pt, p - pt); + // uint8_t *pData = SL_NODE_DATA(pSlNode); + // *(int64_t *)pData = version; + // pData += sizeof(version); + // memcpy(pData, pt, p - pt); - // insert row - tsdbMemSkipListCursorPut(&slc, pSlNode); + // // insert row + // tsdbMemSkipListCursorPut(&slc, pSlNode); - // update status - if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts; - if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts; + // // update status + // if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts; + // if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts; } - tsdbMemSkipListCursorClose(&slc); -#endif + // tsdbMemSkipListCursorClose(&slc); + // update status if (pMemData->minVer == -1) pMemData->minVer = version; if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version; @@ -217,8 +222,4 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version; return 0; -} - -// SMemData - -// SMemSkipList \ No newline at end of file +} \ No newline at end of file diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index bfa54be71f0831ba09201927c6b853560047570d..ed15aeb0387dc0ab34e06458111bbca257047b36 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -17,6 +17,6 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -#if(${BUILD_TEST}) +if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) -#endif(${BUILD_TEST}) +endif(${BUILD_TEST}) diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 2b1b40997e3afab54046e523356ea1f8a39c8373..703ef715b635ce51879248895acdef781951ff43 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -411,7 +411,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { } uint64_t len; char* data = NULL; - if (tDecodeBinary(&decoder, (const void**)(&data), &len) < 0) { + if (tDecodeBinary(&decoder, (const uint8_t**)(&data), &len) < 0) { return NULL; } assert(len = pMsg->dataLen); @@ -670,7 +670,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { } uint64_t len; char* data = NULL; - if (tDecodeBinary(&decoder, (const void**)(&data), &len) < 0) { + if (tDecodeBinary(&decoder, (const uint8_t**)(&data), &len) < 0) { return NULL; } assert(len = pMsg->dataLen);