提交 e34ad23c 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot

...@@ -95,6 +95,7 @@ int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); ...@@ -95,6 +95,7 @@ int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name); tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int metaGetTbNum(SMeta* pMeta); int metaGetTbNum(SMeta* pMeta);
......
...@@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) { ...@@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) {
} }
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) { int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
SMeta * pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
STbDbKey tbDbKey = {.version = version, .uid = uid}; STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db // query table.db
...@@ -54,7 +54,7 @@ _err: ...@@ -54,7 +54,7 @@ _err:
} }
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
SMeta * pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
int64_t version; int64_t version;
// query uid.idx // query uid.idx
...@@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { ...@@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
} }
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
SMeta * pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
tb_uid_t uid; tb_uid_t uid;
// query name.idx // query name.idx
...@@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { ...@@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
} }
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
void * pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
tb_uid_t uid = 0; tb_uid_t uid = 0;
...@@ -138,7 +138,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) { ...@@ -138,7 +138,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
int metaTbCursorNext(SMTbCursor *pTbCur) { int metaTbCursorNext(SMTbCursor *pTbCur) {
int ret; int ret;
void * pBuf; void *pBuf;
STbCfg tbCfg; STbCfg tbCfg;
for (;;) { for (;;) {
...@@ -159,7 +159,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) { ...@@ -159,7 +159,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
} }
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
void * pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
int64_t version; int64_t version;
SSchemaWrapper schema = {0}; SSchemaWrapper schema = {0};
...@@ -218,9 +218,9 @@ _err: ...@@ -218,9 +218,9 @@ _err:
return NULL; return NULL;
} }
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
TBC * pCur; TBC *pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL); int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) { if (ret < 0) {
return ret; return ret;
} }
...@@ -235,13 +235,13 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ ...@@ -235,13 +235,13 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
} }
void *pKey = NULL; void *pKey = NULL;
int kLen = 0; int kLen = 0;
while(1){ while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL); ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) { if (ret < 0) {
break; break;
} }
ttlKey = *(STtlIdxKey*)pKey; ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid); taosArrayPush(uidList, &ttlKey.uid);
} }
tdbTbcClose(pCur); tdbTbcClose(pCur);
...@@ -252,11 +252,11 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ ...@@ -252,11 +252,11 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){
} }
struct SMCtbCursor { struct SMCtbCursor {
SMeta * pMeta; SMeta *pMeta;
TBC * pCur; TBC *pCur;
tb_uid_t suid; tb_uid_t suid;
void * pKey; void *pKey;
void * pVal; void *pVal;
int kLen; int kLen;
int vLen; int vLen;
}; };
...@@ -388,15 +388,15 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) { ...@@ -388,15 +388,15 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
if (ret < 0) { if (ret < 0) {
return 0; return 0;
} }
return *(tb_uid_t*)pStbCur->pKey; return *(tb_uid_t *)pStbCur->pKey;
} }
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
// SMetaReader mr = {0}; // SMetaReader mr = {0};
STSchema * pTSchema = NULL; STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0}; STSchemaBuilder sb = {0};
SSchema * pSchema; SSchema *pSchema;
pSW = metaGetTableSchema(pMeta, uid, sver, 0); pSW = metaGetTableSchema(pMeta, uid, sver, 0);
if (!pSW) return NULL; if (!pSW) return NULL;
...@@ -415,6 +415,51 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -415,6 +415,51 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
return pTSchema; return pTSchema;
} }
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
int32_t code = 0;
STSchema *pTSchema = NULL;
SSkmDbKey skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
void *pData = NULL;
int nData = 0;
// query
metaRLock(pMeta);
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
code = TSDB_CODE_NOT_FOUND;
metaULock(pMeta);
goto _err;
}
metaULock(pMeta);
// decode
SDecoder dc = {0};
SSchemaWrapper schema;
SSchemaWrapper *pSchemaWrapper = &schema;
tDecoderInit(&dc, pData, nData);
tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
tDecoderClear(&dc);
// convert
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
for (int i = 0; i < pSchemaWrapper->nCols; i++) {
SSchema *pSchema = pSchemaWrapper->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
*ppTSchema = pTSchema;
taosMemoryFree(pSchemaWrapper->pSchema);
return code;
_err:
*ppTSchema = NULL;
return code;
}
int metaGetTbNum(SMeta *pMeta) { int metaGetTbNum(SMeta *pMeta) {
// TODO // TODO
// ASSERT(0); // ASSERT(0);
...@@ -422,11 +467,11 @@ int metaGetTbNum(SMeta *pMeta) { ...@@ -422,11 +467,11 @@ int metaGetTbNum(SMeta *pMeta) {
} }
typedef struct { typedef struct {
SMeta * pMeta; SMeta *pMeta;
TBC * pCur; TBC *pCur;
tb_uid_t uid; tb_uid_t uid;
void * pKey; void *pKey;
void * pVal; void *pVal;
int kLen; int kLen;
int vLen; int vLen;
} SMSmaCursor; } SMSmaCursor;
...@@ -498,7 +543,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) { ...@@ -498,7 +543,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
STSmaWrapper *pSW = NULL; STSmaWrapper *pSW = NULL;
SArray * pSmaIds = NULL; SArray *pSmaIds = NULL;
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) { if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
return NULL; return NULL;
...@@ -522,7 +567,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { ...@@ -522,7 +567,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
metaReaderInit(&mr, pMeta, 0); metaReaderInit(&mr, pMeta, 0);
int64_t smaId; int64_t smaId;
int smaIdx = 0; int smaIdx = 0;
STSma * pTSma = NULL; STSma *pTSma = NULL;
for (int i = 0; i < pSW->number; ++i) { for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i); smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) { if (metaGetTableEntryByUid(&mr, smaId) < 0) {
...@@ -570,7 +615,7 @@ _err: ...@@ -570,7 +615,7 @@ _err:
} }
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma * pTSma = NULL; STSma *pTSma = NULL;
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0); metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, indexUid) < 0) { if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
...@@ -592,7 +637,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { ...@@ -592,7 +637,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
} }
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) { SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
SArray * pUids = NULL; SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL; SSmaIdxKey *pSmaIdxKey = NULL;
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
...@@ -630,7 +675,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) { ...@@ -630,7 +675,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
} }
SArray *metaGetSmaTbUids(SMeta *pMeta) { SArray *metaGetSmaTbUids(SMeta *pMeta) {
SArray * pUids = NULL; SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL; SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t lastUid = 0; tb_uid_t lastUid = 0;
...@@ -689,20 +734,20 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) { ...@@ -689,20 +734,20 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
} }
typedef struct { typedef struct {
SMeta * pMeta; SMeta *pMeta;
TBC * pCur; TBC *pCur;
tb_uid_t suid; tb_uid_t suid;
int16_t cid; int16_t cid;
int16_t type; int16_t type;
void * pKey; void *pKey;
void * pVal; void *pVal;
int32_t kLen; int32_t kLen;
int32_t vLen; int32_t vLen;
} SIdxCursor; } SIdxCursor;
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
SIdxCursor *pCursor = NULL; SIdxCursor *pCursor = NULL;
char * buf = NULL; char *buf = NULL;
int32_t maxSize = 0; int32_t maxSize = 0;
int32_t ret = 0, valid = 0; int32_t ret = 0, valid = 0;
...@@ -721,7 +766,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -721,7 +766,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t nKey = 0; int32_t nKey = 0;
int32_t nTagData = 0; int32_t nTagData = 0;
void * tagData = NULL; void *tagData = NULL;
if (param->val == NULL) { if (param->val == NULL) {
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode)); metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
...@@ -757,7 +802,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -757,7 +802,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
goto END; goto END;
} }
void * entryKey = NULL, *entryVal = NULL; void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal; int32_t nEntryKey, nEntryVal;
bool first = true; bool first = true;
while (1) { while (1) {
......
...@@ -327,10 +327,8 @@ static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t su ...@@ -327,10 +327,8 @@ static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t su
pCommitter->skmTable.suid = suid; pCommitter->skmTable.suid = suid;
pCommitter->skmTable.uid = uid; pCommitter->skmTable.uid = uid;
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
pCommitter->skmTable.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver); code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema);
if (pCommitter->skmTable.pTSchema == NULL) { if (code) goto _exit;
code = TSDB_CODE_OUT_OF_MEMORY;
}
_exit: _exit:
return code; return code;
...@@ -352,9 +350,9 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid ...@@ -352,9 +350,9 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid
pCommitter->skmRow.suid = suid; pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid; pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema);
pCommitter->skmRow.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver); code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
if (pCommitter->skmRow.pTSchema == NULL) { if (code) {
code = TSDB_CODE_OUT_OF_MEMORY; goto _exit;
} }
_exit: _exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册