diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index f1f96bfedd880466bea08d2e87ad8f22341f70bb..ef931ed3b1c52b7bdc9d12da77f3bdc8ad1f7837 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -61,9 +61,10 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow); // STag int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag); void tTagFree(STag *pTag); -void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData); -int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag); -int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag); +int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag); +void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData); +int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); +int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); // STRUCT ================= struct STColumn { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e318978339316de794cb8d455f7e1f68a82800a1..0ba1d0c0f2c660aa2f573b35036269b73ef3199e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -313,6 +313,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) #define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) #define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) +#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f82df0d9bc5b670f12f896406a8b3df399a52a60..e8d7e3ac0933532a4ad4f55509df575d2eaa177b 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -581,7 +581,52 @@ void tTagFree(STag *pTag) { if (pTag) taosMemoryFree(pTag); } -void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData) { +int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag) { + STagVal *pTagVals; + int16_t nTags = 0; + SSchema *pColumn; + uint8_t *p; + uint32_t n; + + pTagVals = (STagVal *)taosMemoryMalloc(sizeof(*pTagVals) * nCols); + if (pTagVals == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < nCols; i++) { + pColumn = &pSchema[i]; + + if (i == iCol) { + p = pData; + n = nData; + } else { + tTagGet(pTag, pColumn->colId, pColumn->type, &p, &n); + } + + if (p == NULL) continue; + + ASSERT(IS_VAR_DATA_TYPE(pColumn->type) || n == pColumn->bytes); + + pTagVals[nTags].cid = pColumn->colId; + pTagVals[nTags].type = pColumn->type; + pTagVals[nTags].nData = n; + pTagVals[nTags].pData = p; + + nTags++; + } + + // create new tag + if (tTagNew(pTagVals, nTags, ppTag) < 0) { + taosMemoryFree(pTagVals); + return -1; + } + + taosMemoryFree(pTagVals); + return 0; +} + +void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData) { STagIdx *pTagIdx = bsearch(&((STagIdx){.cid = cid}), pTag->idx, pTag->nTag, sizeof(STagIdx), tTagIdxCmprFn); if (pTagIdx == NULL) { *ppData = NULL; @@ -597,18 +642,11 @@ void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nD } } -int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag) { - // return tEncodeBinary(pEncoder, (uint8_t *)pTag, pTag->len); - ASSERT(0); - return 0; +int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag) { + return tEncodeBinary(pEncoder, (const uint8_t *)pTag, pTag->len); } -int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag) { - // uint32_t n; - // return tDecodeBinary(pDecoder, (const uint8_t **)ppTag, &n); - ASSERT(0); - return 0; -} +int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) { return tDecodeBinary(pDecoder, (uint8_t **)ppTag, NULL); } #if 1 // =================================================================================================================== static void dataColSetNEleNull(SDataCol *pCol, int nEle); @@ -1087,7 +1125,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { kvRowSetNCols(row, pBuilder->nCols); kvRowSetLen(row, tlen); - if(pBuilder->nCols > 0){ + if (pBuilder->nCols > 0) { memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 4141485d28baf839a222676b4e9eb50286156280..d988f97188b9330e1229368554b0f75a5713025b 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -13,6 +13,8 @@ target_sources( "src/vnd/vnodeModule.c" "src/vnd/vnodeSvr.c" "src/vnd/vnodeSync.c" + "src/vnd/vnodeSnapshot.c" + "src/vnd/vnodeUtil.c" # meta "src/meta/metaOpen.c" @@ -22,6 +24,7 @@ target_sources( "src/meta/metaQuery.c" "src/meta/metaCommit.c" "src/meta/metaEntry.c" + "src/meta/metaSnapshot.c" # sma "src/sma/sma.c" @@ -44,6 +47,7 @@ target_sources( "src/tsdb/tsdbReadImpl.c" # "src/tsdb/tsdbSma.c" "src/tsdb/tsdbWrite.c" + "src/tsdb/tsdbSnapshot.c" # tq "src/tq/tq.c" diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9e33973c05de18139b642d1af2e854f2f6dc712c..60262451745b4cc5b49b00a1a02386cd060f460d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -39,9 +39,10 @@ extern "C" { #endif // vnode -typedef struct SVnode SVnode; -typedef struct STsdbCfg STsdbCfg; // todo: remove -typedef struct SVnodeCfg SVnodeCfg; +typedef struct SVnode SVnode; +typedef struct STsdbCfg STsdbCfg; // todo: remove +typedef struct SVnodeCfg SVnodeCfg; +typedef struct SVSnapshotReader SVSnapshotReader; extern const SVnodeCfg vnodeCfgDefault; @@ -59,13 +60,14 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); - int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); - int64_t vnodeGetSyncHandle(SVnode *pVnode); void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); +int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever); +int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); +int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); // meta typedef struct SMeta SMeta; // todo: remove diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 24b3f458b1962089e05d8eadcf2b23b177956109..faf0ddcd4af958f1aad496cbf020720881a4c8dc 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -47,15 +47,17 @@ extern "C" { #endif -typedef struct SVnodeInfo SVnodeInfo; -typedef struct SMeta SMeta; -typedef struct SSma SSma; -typedef struct STsdb STsdb; -typedef struct STQ STQ; -typedef struct SVState SVState; -typedef struct SVBufPool SVBufPool; -typedef struct SQWorker SQHandle; -typedef struct STsdbKeepCfg STsdbKeepCfg; +typedef struct SVnodeInfo SVnodeInfo; +typedef struct SMeta SMeta; +typedef struct SSma SSma; +typedef struct STsdb STsdb; +typedef struct STQ STQ; +typedef struct SVState SVState; +typedef struct SVBufPool SVBufPool; +typedef struct SQWorker SQHandle; +typedef struct STsdbKeepCfg STsdbKeepCfg; +typedef struct SMetaSnapshotReader SMetaSnapshotReader; +typedef struct STsdbSnapshotReader STsdbSnapshotReader; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" @@ -67,8 +69,10 @@ typedef struct STsdbKeepCfg STsdbKeepCfg; #define VNODE_RSMA2_DIR "rsma2" // vnd.h -void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); -void vnodeBufPoolFree(SVBufPool* pPool, void* p); +void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); +void vnodeBufPoolFree(SVBufPool* pPool, void* p); +int32_t vnodeRealloc(void** pp, int32_t size); +void vnodeFree(void* p); // meta typedef struct SMCtbCursor SMCtbCursor; @@ -95,6 +99,9 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid); STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy); SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid); SArray* metaGetSmaTbUids(SMeta* pMeta); +int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever); +int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); +int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); @@ -112,6 +119,9 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, void* pMemRef); int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo); +int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); +int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); +int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData); // tq STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c new file mode 100644 index 0000000000000000000000000000000000000000..5757039d55d410808b4eeb57d2e09286b7939004 --- /dev/null +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "meta.h" + +struct SMetaSnapshotReader { + SMeta* pMeta; + TBC* pTbc; + int64_t sver; + int64_t ever; +}; + +int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever) { + int32_t code = 0; + int32_t c = 0; + SMetaSnapshotReader* pMetaReader = NULL; + + pMetaReader = (SMetaSnapshotReader*)taosMemoryCalloc(1, sizeof(*pMetaReader)); + if (pMetaReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pMetaReader->pMeta = pMeta; + pMetaReader->sver = sver; + pMetaReader->ever = ever; + code = tdbTbcOpen(pMeta->pTbDb, &pMetaReader->pTbc, NULL); + if (code) { + goto _err; + } + + code = tdbTbcMoveTo(pMetaReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); + if (code) { + goto _err; + } + + *ppReader = pMetaReader; + return code; + +_err: + *ppReader = NULL; + return code; +} + +int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader) { + if (pReader) { + tdbTbcClose(pReader->pTbc); + taosMemoryFree(pReader); + } + return 0; +} + +int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nDatap) { + const void* pKey = NULL; + const void* pData = NULL; + int32_t nKey = 0; + int32_t nData = 0; + int32_t code = 0; + + for (;;) { + code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData); + if (code || ((STbDbKey*)pData)->version > pReader->ever) { + return TSDB_CODE_VND_READ_END; + } + + if (((STbDbKey*)pData)->version < pReader->sver) { + continue; + } + + break; + } + + // copy the data + if (vnodeRealloc(ppData, nData) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + memcpy(*ppData, pData, nData); + *nDatap = nData; + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index a792343380aa5799d0ae303eb5b37ace65419f04..462d461a8a23fec805a9739f59a841572ef0bcee 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -23,6 +23,7 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry); +static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type); int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { SMetaEntry me = {0}; @@ -71,64 +72,71 @@ _err: } int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { - TBC *pNameIdxc = NULL; - TBC *pUidIdxc = NULL; - TBC *pCtbIdxc = NULL; - SCtbIdxKey *pCtbIdxKey; - const void *pKey = NULL; - int nKey; - const void *pData = NULL; - int nData; - int c, ret; - - // prepare uid idx cursor - tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); - ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c); - if (ret < 0 || c != 0) { - terrno = TSDB_CODE_VND_TB_NOT_EXIST; - tdbTbcClose(pUidIdxc); - goto _err; - } - - // prepare name idx cursor - tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn); - ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c); - if (ret < 0 || c != 0) { - ASSERT(0); + void *pKey = NULL; + int nKey = 0; + void *pData = NULL; + int nData = 0; + int c = 0; + int rc = 0; + + // check if super table exists + rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData); + if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) { + terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; + return -1; } - tdbTbcDelete(pUidIdxc); - tdbTbcDelete(pNameIdxc); - tdbTbcClose(pUidIdxc); - tdbTbcClose(pNameIdxc); + // drop all child tables + TBC *pCtbIdxc = NULL; + SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t)); - // loop to drop each child table tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); - ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); - if (ret < 0 || (c < 0 && tdbTbcMoveToNext(pCtbIdxc) < 0)) { + rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); + if (rc < 0) { tdbTbcClose(pCtbIdxc); - goto _exit; + metaWLock(pMeta); + goto _drop_super_table; } for (;;) { - tdbTbcGet(pCtbIdxc, &pKey, &nKey, NULL, NULL); - pCtbIdxKey = (SCtbIdxKey *)pKey; + rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL); + if (rc < 0) break; - if (pCtbIdxKey->suid > pReq->suid) break; + if (((SCtbIdxKey *)pKey)->suid < pReq->suid) { + continue; + } else if (((SCtbIdxKey *)pKey)->suid > pReq->suid) { + break; + } - // drop the child table (TODO) + taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid)); + } + + tdbTbcClose(pCtbIdxc); - if (tdbTbcMoveToNext(pCtbIdxc) < 0) break; + metaWLock(pMeta); + + for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) { + tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild); + metaDropTableByUid(pMeta, uid, NULL); } + taosArrayDestroy(pArray); + + // drop super table +_drop_super_table: + tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData); + tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey), + &pMeta->txn); + tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn); + tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn); + + metaULock(pMeta); + _exit: + tdbFree(pKey); + tdbFree(pData); metaDebug("vgId:%d super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid); return 0; - -_err: - metaError("vgId:%d failed to drop super table %s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, - pReq->suid, tstrerror(terrno)); - return -1; } int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { @@ -256,122 +264,63 @@ _err: } int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) { - TBC *pTbDbc = NULL; - TBC *pUidIdxc = NULL; - TBC *pNameIdxc = NULL; - const void *pData; - int nData; - tb_uid_t uid; - int64_t tver; - SMetaEntry me = {0}; - SDecoder coder = {0}; - int8_t type; - int64_t ctime; - tb_uid_t suid; - int c = 0, ret; - - // search & delete the name idx - tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn); - ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c); - if (ret < 0 || !tdbTbcIsValid(pNameIdxc) || c) { - tdbTbcClose(pNameIdxc); + void *pData = NULL; + int nData = 0; + int rc = 0; + tb_uid_t uid; + int type; + + rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData); + if (rc < 0) { terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; return -1; } - - ret = tdbTbcGet(pNameIdxc, NULL, NULL, &pData, &nData); - if (ret < 0) { - ASSERT(0); - return -1; - } - uid = *(tb_uid_t *)pData; - tdbTbcDelete(pNameIdxc); - tdbTbcClose(pNameIdxc); - - // search & delete uid idx - tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); - ret = tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); - if (ret < 0 || c != 0) { - ASSERT(0); - return -1; - } - - ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); - if (ret < 0) { - ASSERT(0); - return -1; - } - - tver = *(int64_t *)pData; - tdbTbcDelete(pUidIdxc); - tdbTbcClose(pUidIdxc); - - // search and get meta entry - tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); - ret = tdbTbcMoveTo(pTbDbc, &(STbDbKey){.uid = uid, .version = tver}, sizeof(STbDbKey), &c); - if (ret < 0 || c != 0) { - ASSERT(0); - return -1; - } + metaWLock(pMeta); + metaDropTableByUid(pMeta, uid, &type); + metaULock(pMeta); - ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); - if (ret < 0) { - ASSERT(0); - return -1; + if (type == TSDB_CHILD_TABLE && tbUids) { + taosArrayPush(tbUids, &uid); } - // decode entry - void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo) - memcpy(pDataCopy, pData, nData); - tDecoderInit(&coder, pDataCopy, nData); - ret = metaDecodeEntry(&coder, &me); - if (ret < 0) { - ASSERT(0); - return -1; - } + tdbFree(pData); + return 0; +} - type = me.type; - if (type == TSDB_CHILD_TABLE) { - ctime = me.ctbEntry.ctime; - suid = me.ctbEntry.suid; - taosArrayPush(tbUids, &me.uid); - } else if (type == TSDB_NORMAL_TABLE) { - ctime = me.ntbEntry.ctime; - suid = 0; - } else { - ASSERT(0); - } +static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { + void *pData = NULL; + int nData = 0; + int rc = 0; + int64_t version; + SMetaEntry e = {0}; + SDecoder dc = {0}; - taosMemoryFree(pDataCopy); - tDecoderClear(&coder); - tdbTbcClose(pTbDbc); + rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData); + version = *(int64_t *)pData; - if (type == TSDB_CHILD_TABLE) { - // remove the pCtbIdx - TBC *pCtbIdxc = NULL; - tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); + tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData); - ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = uid}, sizeof(SCtbIdxKey), &c); - if (ret < 0 || c != 0) { - ASSERT(0); - return -1; - } + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &e); - tdbTbcDelete(pCtbIdxc); - tdbTbcClose(pCtbIdxc); + if (type) *type = e.type; - // remove tags from pTagIdx (todo) - } else if (type == TSDB_NORMAL_TABLE) { - // remove from pSkmDb - } else { - ASSERT(0); + tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pMeta->txn); + tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, &pMeta->txn); + tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn); + if (e.type == TSDB_CHILD_TABLE) { + tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn); + } else if (e.type == TSDB_NORMAL_TABLE) { + // drop schema.db (todo) + // drop ttl.idx (todo) + } else if (e.type == TSDB_SUPER_TABLE) { + // drop schema.db (todo) } - // remove from ttl (todo) - if (ctime > 0) { - } + tDecoderClear(&dc); + tdbFree(pData); return 0; } @@ -608,14 +557,14 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA // TODO : need to update tag index } ctbEntry.version = version; - if(pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON){ + if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) { ctbEntry.ctbEntry.pTags = taosMemoryMalloc(pAlterTbReq->nTagVal); - if(ctbEntry.ctbEntry.pTags == NULL){ + if (ctbEntry.ctbEntry.pTags == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - memcpy((void*)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); - }else{ + memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); + } else { SKVRowBuilder kvrb = {0}; const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags; SKVRow pNewTag = NULL; @@ -649,7 +598,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA tDecoderClear(&dc1); tDecoderClear(&dc2); - if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void*)ctbEntry.ctbEntry.pTags); + if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void *)ctbEntry.ctbEntry.pTags); if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); tdbTbcClose(pTbDbc); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c new file mode 100644 index 0000000000000000000000000000000000000000..79989a55601b99e681c573cae1f5c26e38cd7421 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" + +struct STsdbSnapshotReader { + STsdb* pTsdb; + // TODO +}; + +int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever) { + // TODO + return 0; +} + +int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader) { + // TODO + return 0; +} + +int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData) { + // TODO + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c new file mode 100644 index 0000000000000000000000000000000000000000..baa8422307dd7785201bcc4b8b632bb3c05a37cb --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeInt.h" + +struct SVSnapshotReader { + SVnode *pVnode; + int64_t sver; + int64_t ever; + int8_t isMetaEnd; + int8_t isTsdbEnd; + SMetaSnapshotReader *pMetaReader; + STsdbSnapshotReader *pTsdbReader; + void *pData; + int32_t nData; +}; + +int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever) { + SVSnapshotReader *pReader = NULL; + + pReader = (SVSnapshotReader *)taosMemoryCalloc(1, sizeof(*pReader)); + if (pReader == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pVnode = pVnode; + pReader->sver = sver; + pReader->ever = ever; + pReader->isMetaEnd = 0; + pReader->isTsdbEnd = 0; + + if (metaSnapshotReaderOpen(pVnode->pMeta, &pReader->pMetaReader, sver, ever) < 0) { + taosMemoryFree(pReader); + goto _err; + } + + if (tsdbSnapshotReaderOpen(pVnode->pTsdb, &pReader->pTsdbReader, sver, ever) < 0) { + metaSnapshotReaderClose(pReader->pMetaReader); + taosMemoryFree(pReader); + goto _err; + } + +_exit: + *ppReader = pReader; + return 0; + +_err: + *ppReader = NULL; + return -1; +} + +int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader) { + if (pReader) { + vnodeFree(pReader->pData); + tsdbSnapshotReaderClose(pReader->pTsdbReader); + metaSnapshotReaderClose(pReader->pMetaReader); + taosMemoryFree(pReader); + } + return 0; +} + +int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData) { + int32_t code = 0; + + if (!pReader->isMetaEnd) { + code = metaSnapshotRead(pReader->pMetaReader, &pReader->pData, &pReader->nData); + if (code) { + if (code == TSDB_CODE_VND_READ_END) { + pReader->isMetaEnd = 1; + } else { + return code; + } + } else { + *ppData = pReader->pData; + *nData = pReader->nData; + return code; + } + } + + if (!pReader->isTsdbEnd) { + code = tsdbSnapshotRead(pReader->pTsdbReader, &pReader->pData, &pReader->nData); + if (code) { + if (code == TSDB_CODE_VND_READ_END) { + pReader->isTsdbEnd = 1; + } else { + return code; + } + } else { + *ppData = pReader->pData; + *nData = pReader->nData; + return code; + } + } + + code = TSDB_CODE_VND_READ_END; + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeUtil.c b/source/dnode/vnode/src/vnd/vnodeUtil.c new file mode 100644 index 0000000000000000000000000000000000000000..cd942099bc8924fde06ea912b0eecdfbe72603cb --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeUtil.c @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnd.h" + +int32_t vnodeRealloc(void** pp, int32_t size) { + uint8_t* p = NULL; + int32_t csize = 0; + + if (*pp) { + p = (uint8_t*)(*pp) - sizeof(int32_t); + csize = *(int32_t*)p; + } + + if (csize >= size) { + return 0; + } + + p = (uint8_t*)taosMemoryRealloc(p, size); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + *(int32_t*)p = size; + *pp = p + sizeof(int32_t); + + return 0; +} + +void vnodeFree(void* p) { + if (p) { + taosMemoryFree(((uint8_t*)p) - sizeof(int32_t)); + } +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a4e39260375c57dac6b437b65df1ee2b1701a9d6..6eb4f9310ba058f5f7f210058050d6df1eea3887 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -315,6 +315,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_READ_END, "Read end") // tsdb