未验证 提交 c7d1f6e6 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #12978 from taosdata/feature/data_format

feat: drop stable
......@@ -61,9 +61,10 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag);
void tTagFree(STag *pTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData);
int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag);
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
// STRUCT =================
struct STColumn {
......
......@@ -313,6 +313,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
......@@ -581,7 +581,52 @@ void tTagFree(STag *pTag) {
if (pTag) taosMemoryFree(pTag);
}
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData) {
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag) {
STagVal *pTagVals;
int16_t nTags = 0;
SSchema *pColumn;
uint8_t *p;
uint32_t n;
pTagVals = (STagVal *)taosMemoryMalloc(sizeof(*pTagVals) * nCols);
if (pTagVals == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < nCols; i++) {
pColumn = &pSchema[i];
if (i == iCol) {
p = pData;
n = nData;
} else {
tTagGet(pTag, pColumn->colId, pColumn->type, &p, &n);
}
if (p == NULL) continue;
ASSERT(IS_VAR_DATA_TYPE(pColumn->type) || n == pColumn->bytes);
pTagVals[nTags].cid = pColumn->colId;
pTagVals[nTags].type = pColumn->type;
pTagVals[nTags].nData = n;
pTagVals[nTags].pData = p;
nTags++;
}
// create new tag
if (tTagNew(pTagVals, nTags, ppTag) < 0) {
taosMemoryFree(pTagVals);
return -1;
}
taosMemoryFree(pTagVals);
return 0;
}
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData) {
STagIdx *pTagIdx = bsearch(&((STagIdx){.cid = cid}), pTag->idx, pTag->nTag, sizeof(STagIdx), tTagIdxCmprFn);
if (pTagIdx == NULL) {
*ppData = NULL;
......@@ -597,18 +642,11 @@ void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nD
}
}
int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag) {
// return tEncodeBinary(pEncoder, (uint8_t *)pTag, pTag->len);
ASSERT(0);
return 0;
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag) {
return tEncodeBinary(pEncoder, (const uint8_t *)pTag, pTag->len);
}
int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag) {
// uint32_t n;
// return tDecodeBinary(pDecoder, (const uint8_t **)ppTag, &n);
ASSERT(0);
return 0;
}
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) { return tDecodeBinary(pDecoder, (uint8_t **)ppTag, NULL); }
#if 1 // ===================================================================================================================
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
......@@ -1087,7 +1125,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
kvRowSetNCols(row, pBuilder->nCols);
kvRowSetLen(row, tlen);
if(pBuilder->nCols > 0){
if (pBuilder->nCols > 0) {
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
}
......
......@@ -13,6 +13,8 @@ target_sources(
"src/vnd/vnodeModule.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeUtil.c"
# meta
"src/meta/metaOpen.c"
......@@ -22,6 +24,7 @@ target_sources(
"src/meta/metaQuery.c"
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
"src/meta/metaSnapshot.c"
# sma
"src/sma/sma.c"
......@@ -44,6 +47,7 @@ target_sources(
"src/tsdb/tsdbReadImpl.c"
# "src/tsdb/tsdbSma.c"
"src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbSnapshot.c"
# tq
"src/tq/tq.c"
......
......@@ -39,9 +39,10 @@ extern "C" {
#endif
// vnode
typedef struct SVnode SVnode;
typedef struct STsdbCfg STsdbCfg; // todo: remove
typedef struct SVnodeCfg SVnodeCfg;
typedef struct SVnode SVnode;
typedef struct STsdbCfg STsdbCfg; // todo: remove
typedef struct SVnodeCfg SVnodeCfg;
typedef struct SVSnapshotReader SVSnapshotReader;
extern const SVnodeCfg vnodeCfgDefault;
......@@ -59,13 +60,14 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
int64_t vnodeGetSyncHandle(SVnode *pVnode);
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
// meta
typedef struct SMeta SMeta; // todo: remove
......
......@@ -47,15 +47,17 @@
extern "C" {
#endif
typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta SMeta;
typedef struct SSma SSma;
typedef struct STsdb STsdb;
typedef struct STQ STQ;
typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta SMeta;
typedef struct SSma SSma;
typedef struct STsdb STsdb;
typedef struct STQ STQ;
typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SMetaSnapshotReader SMetaSnapshotReader;
typedef struct STsdbSnapshotReader STsdbSnapshotReader;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
......@@ -67,8 +69,10 @@ typedef struct STsdbKeepCfg STsdbKeepCfg;
#define VNODE_RSMA2_DIR "rsma2"
// vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
int32_t vnodeRealloc(void** pp, int32_t size);
void vnodeFree(void* p);
// meta
typedef struct SMCtbCursor SMCtbCursor;
......@@ -95,6 +99,9 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
......@@ -112,6 +119,9 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
void* pMemRef);
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
// tq
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
struct SMetaSnapshotReader {
SMeta* pMeta;
TBC* pTbc;
int64_t sver;
int64_t ever;
};
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever) {
int32_t code = 0;
int32_t c = 0;
SMetaSnapshotReader* pMetaReader = NULL;
pMetaReader = (SMetaSnapshotReader*)taosMemoryCalloc(1, sizeof(*pMetaReader));
if (pMetaReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMetaReader->pMeta = pMeta;
pMetaReader->sver = sver;
pMetaReader->ever = ever;
code = tdbTbcOpen(pMeta->pTbDb, &pMetaReader->pTbc, NULL);
if (code) {
goto _err;
}
code = tdbTbcMoveTo(pMetaReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) {
goto _err;
}
*ppReader = pMetaReader;
return code;
_err:
*ppReader = NULL;
return code;
}
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader) {
if (pReader) {
tdbTbcClose(pReader->pTbc);
taosMemoryFree(pReader);
}
return 0;
}
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nDatap) {
const void* pKey = NULL;
const void* pData = NULL;
int32_t nKey = 0;
int32_t nData = 0;
int32_t code = 0;
for (;;) {
code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData);
if (code || ((STbDbKey*)pData)->version > pReader->ever) {
return TSDB_CODE_VND_READ_END;
}
if (((STbDbKey*)pData)->version < pReader->sver) {
continue;
}
break;
}
// copy the data
if (vnodeRealloc(ppData, nData) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
memcpy(*ppData, pData, nData);
*nDatap = nData;
return code;
}
\ No newline at end of file
......@@ -23,6 +23,7 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry me = {0};
......@@ -71,64 +72,71 @@ _err:
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
TBC *pNameIdxc = NULL;
TBC *pUidIdxc = NULL;
TBC *pCtbIdxc = NULL;
SCtbIdxKey *pCtbIdxKey;
const void *pKey = NULL;
int nKey;
const void *pData = NULL;
int nData;
int c, ret;
// prepare uid idx cursor
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c != 0) {
terrno = TSDB_CODE_VND_TB_NOT_EXIST;
tdbTbcClose(pUidIdxc);
goto _err;
}
// prepare name idx cursor
tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
if (ret < 0 || c != 0) {
ASSERT(0);
void *pKey = NULL;
int nKey = 0;
void *pData = NULL;
int nData = 0;
int c = 0;
int rc = 0;
// check if super table exists
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
}
tdbTbcDelete(pUidIdxc);
tdbTbcDelete(pNameIdxc);
tdbTbcClose(pUidIdxc);
tdbTbcClose(pNameIdxc);
// drop all child tables
TBC *pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
// loop to drop each child table
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (ret < 0 || (c < 0 && tdbTbcMoveToNext(pCtbIdxc) < 0)) {
rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
goto _exit;
metaWLock(pMeta);
goto _drop_super_table;
}
for (;;) {
tdbTbcGet(pCtbIdxc, &pKey, &nKey, NULL, NULL);
pCtbIdxKey = (SCtbIdxKey *)pKey;
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL);
if (rc < 0) break;
if (pCtbIdxKey->suid > pReq->suid) break;
if (((SCtbIdxKey *)pKey)->suid < pReq->suid) {
continue;
} else if (((SCtbIdxKey *)pKey)->suid > pReq->suid) {
break;
}
// drop the child table (TODO)
taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid));
}
tdbTbcClose(pCtbIdxc);
if (tdbTbcMoveToNext(pCtbIdxc) < 0) break;
metaWLock(pMeta);
for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild);
metaDropTableByUid(pMeta, uid, NULL);
}
taosArrayDestroy(pArray);
// drop super table
_drop_super_table:
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey),
&pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
metaULock(pMeta);
_exit:
tdbFree(pKey);
tdbFree(pData);
metaDebug("vgId:%d super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
return 0;
_err:
metaError("vgId:%d failed to drop super table %s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
pReq->suid, tstrerror(terrno));
return -1;
}
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
......@@ -256,122 +264,63 @@ _err:
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pNameIdxc = NULL;
const void *pData;
int nData;
tb_uid_t uid;
int64_t tver;
SMetaEntry me = {0};
SDecoder coder = {0};
int8_t type;
int64_t ctime;
tb_uid_t suid;
int c = 0, ret;
// search & delete the name idx
tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
if (ret < 0 || !tdbTbcIsValid(pNameIdxc) || c) {
tdbTbcClose(pNameIdxc);
void *pData = NULL;
int nData = 0;
int rc = 0;
tb_uid_t uid;
int type;
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
if (rc < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
}
ret = tdbTbcGet(pNameIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
}
uid = *(tb_uid_t *)pData;
tdbTbcDelete(pNameIdxc);
tdbTbcClose(pNameIdxc);
// search & delete uid idx
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
}
tver = *(int64_t *)pData;
tdbTbcDelete(pUidIdxc);
tdbTbcClose(pUidIdxc);
// search and get meta entry
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbTbcMoveTo(pTbDbc, &(STbDbKey){.uid = uid, .version = tver}, sizeof(STbDbKey), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
metaWLock(pMeta);
metaDropTableByUid(pMeta, uid, &type);
metaULock(pMeta);
ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
if (type == TSDB_CHILD_TABLE && tbUids) {
taosArrayPush(tbUids, &uid);
}
// decode entry
void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo)
memcpy(pDataCopy, pData, nData);
tDecoderInit(&coder, pDataCopy, nData);
ret = metaDecodeEntry(&coder, &me);
if (ret < 0) {
ASSERT(0);
return -1;
}
tdbFree(pData);
return 0;
}
type = me.type;
if (type == TSDB_CHILD_TABLE) {
ctime = me.ctbEntry.ctime;
suid = me.ctbEntry.suid;
taosArrayPush(tbUids, &me.uid);
} else if (type == TSDB_NORMAL_TABLE) {
ctime = me.ntbEntry.ctime;
suid = 0;
} else {
ASSERT(0);
}
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
void *pData = NULL;
int nData = 0;
int rc = 0;
int64_t version;
SMetaEntry e = {0};
SDecoder dc = {0};
taosMemoryFree(pDataCopy);
tDecoderClear(&coder);
tdbTbcClose(pTbDbc);
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
version = *(int64_t *)pData;
if (type == TSDB_CHILD_TABLE) {
// remove the pCtbIdx
TBC *pCtbIdxc = NULL;
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = uid}, sizeof(SCtbIdxKey), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &e);
tdbTbcDelete(pCtbIdxc);
tdbTbcClose(pCtbIdxc);
if (type) *type = e.type;
// remove tags from pTagIdx (todo)
} else if (type == TSDB_NORMAL_TABLE) {
// remove from pSkmDb
} else {
ASSERT(0);
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn);
if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn);
} else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo)
// drop ttl.idx (todo)
} else if (e.type == TSDB_SUPER_TABLE) {
// drop schema.db (todo)
}
// remove from ttl (todo)
if (ctime > 0) {
}
tDecoderClear(&dc);
tdbFree(pData);
return 0;
}
......@@ -608,14 +557,14 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
// TODO : need to update tag index
}
ctbEntry.version = version;
if(pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON){
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
ctbEntry.ctbEntry.pTags = taosMemoryMalloc(pAlterTbReq->nTagVal);
if(ctbEntry.ctbEntry.pTags == NULL){
if (ctbEntry.ctbEntry.pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy((void*)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
}else{
memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
} else {
SKVRowBuilder kvrb = {0};
const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags;
SKVRow pNewTag = NULL;
......@@ -649,7 +598,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear(&dc1);
tDecoderClear(&dc2);
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void*)ctbEntry.ctbEntry.pTags);
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbTbcClose(pTbDbc);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
struct STsdbSnapshotReader {
STsdb* pTsdb;
// TODO
};
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever) {
// TODO
return 0;
}
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader) {
// TODO
return 0;
}
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData) {
// TODO
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
struct SVSnapshotReader {
SVnode *pVnode;
int64_t sver;
int64_t ever;
int8_t isMetaEnd;
int8_t isTsdbEnd;
SMetaSnapshotReader *pMetaReader;
STsdbSnapshotReader *pTsdbReader;
void *pData;
int32_t nData;
};
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever) {
SVSnapshotReader *pReader = NULL;
pReader = (SVSnapshotReader *)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pVnode = pVnode;
pReader->sver = sver;
pReader->ever = ever;
pReader->isMetaEnd = 0;
pReader->isTsdbEnd = 0;
if (metaSnapshotReaderOpen(pVnode->pMeta, &pReader->pMetaReader, sver, ever) < 0) {
taosMemoryFree(pReader);
goto _err;
}
if (tsdbSnapshotReaderOpen(pVnode->pTsdb, &pReader->pTsdbReader, sver, ever) < 0) {
metaSnapshotReaderClose(pReader->pMetaReader);
taosMemoryFree(pReader);
goto _err;
}
_exit:
*ppReader = pReader;
return 0;
_err:
*ppReader = NULL;
return -1;
}
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader) {
if (pReader) {
vnodeFree(pReader->pData);
tsdbSnapshotReaderClose(pReader->pTsdbReader);
metaSnapshotReaderClose(pReader->pMetaReader);
taosMemoryFree(pReader);
}
return 0;
}
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData) {
int32_t code = 0;
if (!pReader->isMetaEnd) {
code = metaSnapshotRead(pReader->pMetaReader, &pReader->pData, &pReader->nData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->isMetaEnd = 1;
} else {
return code;
}
} else {
*ppData = pReader->pData;
*nData = pReader->nData;
return code;
}
}
if (!pReader->isTsdbEnd) {
code = tsdbSnapshotRead(pReader->pTsdbReader, &pReader->pData, &pReader->nData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->isTsdbEnd = 1;
} else {
return code;
}
} else {
*ppData = pReader->pData;
*nData = pReader->nData;
return code;
}
}
code = TSDB_CODE_VND_READ_END;
return code;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
int32_t vnodeRealloc(void** pp, int32_t size) {
uint8_t* p = NULL;
int32_t csize = 0;
if (*pp) {
p = (uint8_t*)(*pp) - sizeof(int32_t);
csize = *(int32_t*)p;
}
if (csize >= size) {
return 0;
}
p = (uint8_t*)taosMemoryRealloc(p, size);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*(int32_t*)p = size;
*pp = p + sizeof(int32_t);
return 0;
}
void vnodeFree(void* p) {
if (p) {
taosMemoryFree(((uint8_t*)p) - sizeof(int32_t));
}
}
\ No newline at end of file
......@@ -315,6 +315,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_READ_END, "Read end")
// tsdb
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册