提交 b3da0e2a 编写于 作者: H Hongze Cheng

meta snapshot write

上级 be85e272
......@@ -57,6 +57,9 @@ int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
// metaCommit ==================
static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); }
// metaTable ==================
int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME);
struct SMeta {
TdThreadRwlock lock;
......
......@@ -166,6 +166,19 @@ _err:
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
// TODO
SMeta* pMeta = pWriter->pMeta;
SMetaEntry metaEntry = {0};
SDecoder* pDecoder = &(SDecoder){0};
tDecoderInit(pDecoder, pData, nData);
metaDecodeEntry(pDecoder, &metaEntry);
code = metaHandleEntry(pMeta, &metaEntry);
if (code) goto _err;
return code;
_err:
metaError("vgId:%d meta snapshot write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code;
}
\ No newline at end of file
......@@ -16,7 +16,6 @@
#include "meta.h"
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
......@@ -50,7 +49,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
return -1;
}
void * data = pCtbEntry->ctbEntry.pTags;
void *data = pCtbEntry->ctbEntry.pTags;
const char *tagName = pSchema->name;
tb_uid_t suid = pCtbEntry->ctbEntry.suid;
......@@ -69,7 +68,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
char type = pTagVal->type;
char * key = pTagVal->pKey;
char *key = pTagVal->pKey;
int32_t nKey = strlen(key);
SIndexTerm *term = NULL;
......@@ -77,13 +76,13 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
// handle null value
} else if (type == TSDB_DATA_TYPE_NCHAR) {
if (pTagVal->nData > 0) {
char * val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
char *val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
int32_t len = taosUcs4ToMbs((TdUcs4 *)pTagVal->pData, pTagVal->nData, val + VARSTR_HEADER_SIZE);
memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
type = TSDB_DATA_TYPE_VARCHAR;
term = indexTermCreate(suid, ADD_VALUE, type, key, nKey, val, len);
} else if (pTagVal->nData == 0) {
char * val = NULL;
char *val = NULL;
int32_t len = 0;
// handle NULL key
}
......@@ -112,9 +111,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int vLen = 0;
const void *pKey = NULL;
const void *pVal = NULL;
void * pBuf = NULL;
void *pBuf = NULL;
int32_t szBuf = 0;
void * p = NULL;
void *p = NULL;
SMetaReader mr = {0};
// validate req
......@@ -172,7 +171,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
}
// drop all child tables
TBC * pCtbIdxc = NULL;
TBC *pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
......@@ -228,8 +227,8 @@ _exit:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
TBC * pUidIdxc = NULL;
TBC * pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pTbDbc = NULL;
const void *pData;
int nData;
int64_t oversion;
......@@ -352,7 +351,7 @@ _err:
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
int rc = 0;
tb_uid_t uid;
......@@ -380,20 +379,20 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
metaWLock(pMeta);
int ret = metaTtlSmaller(pMeta, ttl, tbUids);
if(ret != 0){
if (ret != 0) {
metaULock(pMeta);
return ret;
}
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, *uid, NULL);
metaDebug("ttl drop table:%"PRId64, *uid);
metaDebug("ttl drop table:%" PRId64, *uid);
}
metaULock(pMeta);
return 0;
}
static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME){
static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME) {
int64_t ttlDays;
int64_t ctime;
if (pME->type == TSDB_CHILD_TABLE) {
......@@ -415,13 +414,12 @@ static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME){
static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0};
metaBuildTtlIdxKey(&ttlKey, pME);
if(ttlKey.dtime == 0) return 0;
if (ttlKey.dtime == 0) return 0;
return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), &pMeta->txn);
}
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
int rc = 0;
SMetaEntry e = {0};
......@@ -440,8 +438,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn);
if(e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e);
if (e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e);
if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn);
......@@ -459,14 +456,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
}
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
void * pVal = NULL;
void *pVal = NULL;
int nVal = 0;
const void * pData = NULL;
const void *pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
SSchema * pColumn = NULL;
SSchema *pColumn = NULL;
SMetaEntry entry = {0};
SSchemaWrapper *pSchema;
int c;
......@@ -620,7 +617,7 @@ _err:
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0};
void * pVal = NULL;
void *pVal = NULL;
int nVal = 0;
int ret;
int c;
......@@ -651,7 +648,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
oversion = *(int64_t *)pData;
// search table.db
TBC * pTbDbc = NULL;
TBC *pTbDbc = NULL;
SDecoder dc1 = {0};
SDecoder dc2 = {0};
......@@ -675,7 +672,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaDecodeEntry(&dc2, &stbEntry);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema * pColumn = NULL;
SSchema *pColumn = NULL;
int32_t iCol = 0;
for (;;) {
pColumn = NULL;
......@@ -705,8 +702,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
} else {
const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
STag * pNewTag = NULL;
SArray * pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
STag *pNewTag = NULL;
SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
if (!pTagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -765,9 +762,9 @@ _err:
}
static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
void * pVal = NULL;
void *pVal = NULL;
int nVal = 0;
const void * pData = NULL;
const void *pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
......@@ -816,22 +813,22 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
metaWLock(pMeta);
// build SMetaEntry
if (entry.type == TSDB_CHILD_TABLE) {
if(pAlterTbReq->updateTTL) {
if (pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry);
entry.ctbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry);
}
if(pAlterTbReq->newCommentLen >= 0) {
if (pAlterTbReq->newCommentLen >= 0) {
entry.ctbEntry.commentLen = pAlterTbReq->newCommentLen;
entry.ctbEntry.comment = pAlterTbReq->newComment;
}
} else {
if(pAlterTbReq->updateTTL) {
if (pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry);
entry.ntbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry);
}
if(pAlterTbReq->newCommentLen >= 0) {
if (pAlterTbReq->newCommentLen >= 0) {
entry.ntbEntry.commentLen = pAlterTbReq->newCommentLen;
entry.ntbEntry.comment = pAlterTbReq->newComment;
}
......@@ -869,8 +866,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
void * pKey = NULL;
void * pVal = NULL;
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
SEncoder coder = {0};
......@@ -930,7 +927,7 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0};
metaBuildTtlIdxKey(&ttlKey, pME);
if(ttlKey.dtime == 0) return 0;
if (ttlKey.dtime == 0) return 0;
return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
}
......@@ -976,19 +973,19 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
}
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
STbDbKey tbDbKey = {0};
SMetaEntry stbEntry = {0};
STagIdxKey * pTagIdxKey = NULL;
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void * pTagData = NULL; //
const void *pTagData = NULL; //
int32_t nTagData = 0;
SDecoder dc = {0};
// get super table
if(tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0){
if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
return -1;
}
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
......@@ -1030,7 +1027,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SEncoder coder = {0};
void * pVal = NULL;
void *pVal = NULL;
int vLen = 0;
int rcode = 0;
SSkmDbKey skmDbKey = {0};
......@@ -1072,7 +1069,7 @@ _exit:
return rcode;
}
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
metaWLock(pMeta);
// save to table.db
......
......@@ -449,6 +449,8 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
} else if (c < 0) {
code = tsdbSnapWriteAppendData(pWriter, pData, nData);
if (code) goto _err;
break;
} else {
// commit the block
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册