提交 9e33dfbc 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/stt_trigger_config

...@@ -69,7 +69,18 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar ...@@ -69,7 +69,18 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
memcpy(target, buf, TSDB_PASSWORD_LEN); memcpy(target, buf, TSDB_PASSWORD_LEN);
} }
#define taosGetTbHashVal(tbname, tblen, method, prefix, suffix) MurmurHash3_32((tbname), (tblen)) static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, int32_t method, int32_t prefix,
int32_t suffix) {
if (prefix == 0 && suffix == 0) {
return MurmurHash3_32(tbname, tblen);
} else {
if (tblen <= (prefix + suffix)) {
return MurmurHash3_32(tbname, tblen);
} else {
return MurmurHash3_32(tbname + prefix, tblen - prefix - suffix);
}
}
}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -103,6 +103,8 @@ static const SSysDbTableSchema userDBSchema[] = { ...@@ -103,6 +103,8 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true}, {.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true},
{.name = "sst_trigger", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "sst_trigger", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "table_prefix", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "table_suffix", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
}; };
static const SSysDbTableSchema userFuncSchema[] = { static const SSysDbTableSchema userFuncSchema[] = {
......
...@@ -513,6 +513,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, ...@@ -513,6 +513,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
return -1; return -1;
} }
if (dbObj.cfg.hashPrefix > 0) {
int32_t dbLen = strlen(dbObj.name) + 1;
mInfo("db:%s, hashPrefix adjust from %d to %d", dbObj.name, dbObj.cfg.hashPrefix, dbObj.cfg.hashPrefix + dbLen);
dbObj.cfg.hashPrefix += dbLen;
}
SVgObj *pVgroups = NULL; SVgObj *pVgroups = NULL;
if (mndAllocVgroup(pMnode, &dbObj, &pVgroups) != 0) { if (mndAllocVgroup(pMnode, &dbObj, &pVgroups) != 0) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create since %s", pCreate->db, terrstr());
...@@ -1710,6 +1716,16 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, ...@@ -1710,6 +1716,16 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.sstTrigger, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.sstTrigger, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int16_t hashPrefix = pDb->cfg.hashPrefix;
if (hashPrefix > 0) {
hashPrefix = pDb->cfg.hashPrefix - strlen(pDb->name) - 1;
}
colDataAppend(pColInfo, rows, (const char *)&hashPrefix, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.hashSuffix, false);
} }
taosMemoryFree(buf); taosMemoryFree(buf);
......
...@@ -82,6 +82,8 @@ typedef struct SLDataIter SLDataIter; ...@@ -82,6 +82,8 @@ typedef struct SLDataIter SLDataIter;
#define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN}) #define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN})
#define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX}) #define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX})
#define TABLE_SAME_SCHEMA(SUID1, UID1, SUID2, UID2) ((SUID1) ? (SUID1) == (SUID2) : (UID1) == (UID2))
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM)) #define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
#define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \ #define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \
((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE)) ((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE))
...@@ -260,7 +262,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ...@@ -260,7 +262,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx);
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast); int8_t cmprAlg, int8_t toLast);
...@@ -270,7 +272,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); ...@@ -270,7 +272,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk);
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
...@@ -660,6 +662,12 @@ typedef struct SMergeTree { ...@@ -660,6 +662,12 @@ typedef struct SMergeTree {
const char *idStr; const char *idStr;
} SMergeTree; } SMergeTree;
typedef struct {
int64_t suid;
int64_t uid;
STSchema *pTSchema;
} SSkmInfo;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
......
...@@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* ...@@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t *tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
......
...@@ -589,7 +589,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -589,7 +589,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
} }
tMapDataReset(&state->blockMap); tMapDataReset(&state->blockMap);
code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap); code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap);
if (code) goto _err; if (code) goto _err;
state->nBlock = state->blockMap.nItem; state->nBlock = state->blockMap.nItem;
......
...@@ -14,13 +14,8 @@ ...@@ -14,13 +14,8 @@
*/ */
#include "tsdb.h" #include "tsdb.h"
typedef struct {
int64_t suid;
int64_t uid;
STSchema *pTSchema;
} SSkmInfo;
typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT; typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
typedef struct { typedef struct {
SRBTreeNode n; SRBTreeNode n;
...@@ -99,7 +94,7 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter); ...@@ -99,7 +94,7 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
static int32_t tsdbNextCommitRow(SCommitter *pCommitter); static int32_t tsdbNextCommitRow(SCommitter *pCommitter);
static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
SRowInfo *pInfo1 = (SRowInfo *)p1; SRowInfo *pInfo1 = (SRowInfo *)p1;
SRowInfo *pInfo2 = (SRowInfo *)p2; SRowInfo *pInfo2 = (SRowInfo *)p2;
...@@ -325,22 +320,22 @@ _err: ...@@ -325,22 +320,22 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid) { int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
int32_t code = 0; int32_t code = 0;
if (suid) { if (suid) {
if (pCommitter->skmTable.suid == suid) { if (pSkmInfo->suid == suid) {
pCommitter->skmTable.uid = uid; pSkmInfo->uid = uid;
goto _exit; goto _exit;
} }
} else { } else {
if (pCommitter->skmTable.uid == uid) goto _exit; if (pSkmInfo->uid == uid) goto _exit;
} }
pCommitter->skmTable.suid = suid; pSkmInfo->suid = suid;
pCommitter->skmTable.uid = uid; pSkmInfo->uid = uid;
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pSkmInfo->pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, -1, &pCommitter->skmTable.pTSchema); code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
if (code) goto _exit; if (code) goto _exit;
_exit: _exit:
...@@ -382,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { ...@@ -382,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
pCommitter->dReader.pBlockIdx = pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _exit; if (code) goto _exit;
ASSERT(pCommitter->dReader.mBlock.nItem > 0); ASSERT(pCommitter->dReader.mBlock.nItem > 0);
...@@ -432,7 +427,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -432,7 +427,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
int8_t iIter = 0; int8_t iIter = 0;
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
pIter = &pCommitter->aDataIter[iIter]; pIter = &pCommitter->aDataIter[iIter];
pIter->type = LAST_DATA_ITER; pIter->type = STT_DATA_ITER;
pIter->iStt = iStt; pIter->iStt = iStt;
code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
...@@ -498,7 +493,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -498,7 +493,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->dReader.iBlockIdx = 0; pCommitter->dReader.iBlockIdx = 0;
if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _err; if (code) goto _err;
} else { } else {
pCommitter->dReader.pBlockIdx = NULL; pCommitter->dReader.pBlockIdx = NULL;
...@@ -556,46 +551,45 @@ _err: ...@@ -556,46 +551,45 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SDataBlk block;
ASSERT(pBlockData->nRow > 0); if (pBlockData->nRow == 0) return code;
tDataBlkReset(&block); SDataBlk dataBlk;
tDataBlkReset(&dataBlk);
// info // info
block.nRow += pBlockData->nRow; dataBlk.nRow += pBlockData->nRow;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
if (iRow == 0) { if (iRow == 0) {
if (tsdbKeyCmprFn(&block.minKey, &key) > 0) { if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
block.minKey = key; dataBlk.minKey = key;
} }
} else { } else {
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
block.hasDup = 1; dataBlk.hasDup = 1;
} }
} }
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&block.maxKey, &key) < 0) { if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
block.maxKey = key; dataBlk.maxKey = key;
} }
block.minVer = TMIN(block.minVer, key.version); dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
block.maxVer = TMAX(block.maxVer, key.version); dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
} }
// write // write
block.nSubBlock++; dataBlk.nSubBlock++;
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1], code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0); ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
if (code) goto _err; if (code) goto _err;
// put SDataBlk // put SDataBlk
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutDataBlk); code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err; if (code) goto _err;
// clear // clear
...@@ -604,39 +598,38 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { ...@@ -604,39 +598,38 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
SSttBlk blockL; SSttBlk sstBlk;
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
ASSERT(pBlockData->nRow > 0); if (pBlockData->nRow == 0) return code;
// info // info
blockL.suid = pBlockData->suid; sstBlk.suid = pBlockData->suid;
blockL.nRow = pBlockData->nRow; sstBlk.nRow = pBlockData->nRow;
blockL.minKey = TSKEY_MAX; sstBlk.minKey = TSKEY_MAX;
blockL.maxKey = TSKEY_MIN; sstBlk.maxKey = TSKEY_MIN;
blockL.minVer = VERSION_MAX; sstBlk.minVer = VERSION_MAX;
blockL.maxVer = VERSION_MIN; sstBlk.maxVer = VERSION_MIN;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
blockL.minKey = TMIN(blockL.minKey, pBlockData->aTSKEY[iRow]); sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
blockL.maxKey = TMAX(blockL.maxKey, pBlockData->aTSKEY[iRow]); sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
blockL.minVer = TMIN(blockL.minVer, pBlockData->aVersion[iRow]); sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
blockL.maxVer = TMAX(blockL.maxVer, pBlockData->aVersion[iRow]); sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
} }
blockL.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0]; sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
// write // write
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1); code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
if (code) goto _err; if (code) goto _err;
// push SSttBlk // push SSttBlk
if (taosArrayPush(pCommitter->dWriter.aSttBlk, &blockL) == NULL) { if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -647,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { ...@@ -647,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -692,7 +685,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -692,7 +685,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
...@@ -1046,7 +1039,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { ...@@ -1046,7 +1039,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
break; break;
} }
} }
} else if (pCommitter->pIter->type == LAST_DATA_ITER) { // last file } else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file
pIter->iRow++; pIter->iRow++;
if (pIter->iRow < pIter->bData.nRow) { if (pIter->iRow < pIter->bData.nRow) {
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
...@@ -1124,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1124,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBlockData->nRow >= pCommitter->maxRow) { if (pBlockData->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBlockData->nRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
}
return code; return code;
...@@ -1193,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1193,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1210,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1210,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBDataW->nRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
}
return code; return code;
...@@ -1306,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { ...@@ -1306,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
SBlockData *pBDatal = &pCommitter->dWriter.bDatal; SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
if (pBDatal->suid || pBDatal->uid) { if (pBDatal->suid || pBDatal->uid) {
if ((pBDatal->suid != id.suid) || (id.suid == 0)) { if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
if (pBDatal->nRow) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _exit; if (code) goto _exit;
}
tBlockDataReset(pBDatal); tBlockDataReset(pBDatal);
} }
} }
...@@ -1341,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { ...@@ -1341,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
if (code) goto _err; if (code) goto _err;
if (pBDatal->nRow >= pCommitter->maxRow) { if (pBDatal->nRow >= pCommitter->maxRow) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1393,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1393,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (pBData->nRow >= pCommitter->maxRow) { if (pBData->nRow >= pCommitter->maxRow) {
if (pCommitter->toLastOnly) { if (pCommitter->toLastOnly) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tsdbCommitDataBlock(pCommitter); code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1404,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1404,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (!pCommitter->toLastOnly && pBData->nRow) { if (!pCommitter->toLastOnly && pBData->nRow) {
if (pBData->nRow > pCommitter->minRow) { if (pBData->nRow > pCommitter->minRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tsdbAppendLastBlock(pCommitter); code = tsdbAppendLastBlock(pCommitter);
...@@ -1437,7 +1426,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1437,7 +1426,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
// impl // impl
code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
if (code) goto _err; if (code) goto _err;
code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
if (code) goto _err; if (code) goto _err;
...@@ -1455,7 +1444,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1455,7 +1444,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
// end // end
if (pCommitter->dWriter.mBlock.nItem > 0) { if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
...@@ -1470,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1470,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err; if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
code = tsdbCommitLastBlock(pCommitter); pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
}
return code; return code;
......
...@@ -231,10 +231,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -231,10 +231,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey; int64_t skey = pTsdbReader->window.skey;
info.lastKey = (skey > INT64_MIN)? (skey - 1):skey; info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
} else { } else {
int64_t ekey = pTsdbReader->window.ekey; int64_t ekey = pTsdbReader->window.ekey;
info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey; info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
} }
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
...@@ -601,7 +601,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN ...@@ -601,7 +601,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
tMapDataReset(&pScanInfo->mapData); tMapDataReset(&pScanInfo->mapData);
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
sizeInDisk += pScanInfo->mapData.nData; sizeInDisk += pScanInfo->mapData.nData;
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
...@@ -1933,7 +1933,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan ...@@ -1933,7 +1933,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader); initMemDataIterator(pScanInfo, pReader);
pLBlockReader->uid = pScanInfo->uid; pLBlockReader->uid = pScanInfo->uid;
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
STimeWindow w = pLBlockReader->window; STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) { if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKey + step; w.skey = pScanInfo->lastKey + step;
...@@ -3621,7 +3621,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -3621,7 +3621,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
int64_t ts = ASCENDING_TRAVERSE(pReader->order)?pReader->window.skey-1:pReader->window.ekey+1; int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
resetDataBlockScanInfo(pReader->status.pTableMap, ts); resetDataBlockScanInfo(pReader->status.pTableMap, ts);
int32_t code = 0; int32_t code = 0;
......
...@@ -418,21 +418,21 @@ _err: ...@@ -418,21 +418,21 @@ _err:
return code; return code;
} }
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) { int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx) {
int32_t code = 0; int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead; SHeadFile *pHeadFile = &pWriter->fHead;
int64_t size; int64_t size;
int64_t n; int64_t n;
ASSERT(mBlock->nItem > 0); ASSERT(mDataBlk->nItem > 0);
// alloc // alloc
size = tPutMapData(NULL, mBlock); size = tPutMapData(NULL, mDataBlk);
code = tRealloc(&pWriter->aBuf[0], size); code = tRealloc(&pWriter->aBuf[0], size);
if (code) goto _err; if (code) goto _err;
// build // build
n = tPutMapData(pWriter->aBuf[0], mBlock); n = tPutMapData(pWriter->aBuf[0], mDataBlk);
// write // write
code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size); code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
...@@ -446,7 +446,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc ...@@ -446,7 +446,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc
tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
" size:%" PRId64 " nItem:%d", " size:%" PRId64 " nItem:%d",
TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid,
pBlockIdx->offset, pBlockIdx->size, mBlock->nItem); pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem);
return code; return code;
_err: _err:
...@@ -872,7 +872,7 @@ _err: ...@@ -872,7 +872,7 @@ _err:
return code; return code;
} }
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) { int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
int32_t code = 0; int32_t code = 0;
int64_t offset = pBlockIdx->offset; int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size; int64_t size = pBlockIdx->size;
...@@ -886,7 +886,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl ...@@ -886,7 +886,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
if (code) goto _err; if (code) goto _err;
// decode // decode
int64_t n = tGetMapData(pReader->aBuf[0], mBlock); int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
if (n < 0) { if (n < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -1053,6 +1053,29 @@ _err: ...@@ -1053,6 +1053,29 @@ _err:
return code; return code;
} }
int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0;
SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0];
// alloc
code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock);
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock);
if (code) goto _err;
// decmpr
code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
...@@ -1147,8 +1170,8 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb ...@@ -1147,8 +1170,8 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
pDelFWriter->fDel = *pFile; pDelFWriter->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname); tsdbDelFileName(pTsdb, pFile, fname);
code = int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE;
tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, flag, &pDelFWriter->pWriteH);
if (code) goto _err; if (code) goto _err;
// update header // update header
......
...@@ -16,6 +16,29 @@ ...@@ -16,6 +16,29 @@
#include "tsdb.h" #include "tsdb.h"
// STsdbSnapReader ======================================== // STsdbSnapReader ========================================
typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT;
typedef struct {
SRBTreeNode n;
SRowInfo rInfo;
EFIterT type;
union {
struct {
SArray* aBlockIdx;
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock;
int32_t iBlock;
}; // .data file
struct {
int32_t iStt;
SArray* aSttBlk;
int32_t iSttBlk;
}; // .stt file
};
SBlockData bData;
int32_t iRow;
} SFDataIter;
struct STsdbSnapReader { struct STsdbSnapReader {
STsdb* pTsdb; STsdb* pTsdb;
int64_t sver; int64_t sver;
...@@ -26,146 +49,301 @@ struct STsdbSnapReader { ...@@ -26,146 +49,301 @@ struct STsdbSnapReader {
int8_t dataDone; int8_t dataDone;
int32_t fid; int32_t fid;
SDataFReader* pDataFReader; SDataFReader* pDataFReader;
SArray* aBlockIdx; // SArray<SBlockIdx> SFDataIter* pIter;
SArray* aSstBlk; // SArray<SSttBlk> SRBTree rbt;
SBlockIdx* pBlockIdx; SFDataIter aFDataIter[TSDB_MAX_STT_FILE + 1];
SSttBlk* pSstBlk; SBlockData bData;
SSkmInfo skmTable;
int32_t iBlockIdx;
int32_t iBlockL;
SMapData mBlock; // SMapData<SDataBlk>
int32_t iBlock;
SBlockData oBlockData;
SBlockData nBlockData;
// for del file // for del file
int8_t delDone; int8_t delDone;
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
SArray* aDelIdx; // SArray<SDelIdx> SArray* aDelIdx; // SArray<SDelIdx>
int32_t iDelIdx; int32_t iDelIdx;
SArray* aDelData; // SArray<SDelData> SArray* aDelData; // SArray<SDelData>
uint8_t* aBuf[5];
}; };
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
while (true) {
if (pReader->pDataFReader == NULL) {
// next
SDFileSet dFileSet = {.fid = pReader->fid}; SDFileSet dFileSet = {.fid = pReader->fid};
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
if (pSet == NULL) goto _exit; if (pSet == NULL) return code;
pReader->fid = pSet->fid;
// load pReader->fid = pSet->fid;
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pTsdb, pSet); code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
if (code) goto _err; if (code) goto _err;
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); pReader->pIter = NULL;
tRBTreeCreate(&pReader->rbt, tRowInfoCmprFn);
// .data file
SFDataIter* pIter = &pReader->aFDataIter[0];
pIter->type = SNAP_DATA_FILE_ITER;
code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx);
if (code) goto _err; if (code) goto _err;
code = tsdbReadSttBlk(pReader->pDataFReader, 0, pReader->aSstBlk); for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
if (code) goto _err; if (code) goto _err;
// init for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
pReader->iBlockIdx = 0; SDataBlk dataBlk;
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
if (code) goto _err; if (code) goto _err;
pReader->iBlock = 0; ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid);
} else { ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid);
pReader->pBlockIdx = NULL;
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
pIter->rInfo.suid = pIter->pBlockIdx->suid;
pIter->rInfo.uid = pIter->pBlockIdx->uid;
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
goto _add_iter_and_break;
}
}
} }
pReader->iBlockL = 0; continue;
while (true) {
if (pReader->iBlockL >= taosArrayGetSize(pReader->aSstBlk)) { _add_iter_and_break:
pReader->pSstBlk = NULL; tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
break; break;
} }
pReader->pSstBlk = (SSttBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL); // .stt file
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { pIter = &pReader->aFDataIter[1];
// TODO for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
break; pIter->type = SNAP_STT_FILE_ITER;
pIter->iStt = iStt;
code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk);
if (code) goto _err;
for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
if (pSttBlk->minVer > pReader->ever) continue;
if (pSttBlk->maxVer < pReader->sver) continue;
code = tsdbReadSttBlock(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
if (code) goto _err;
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
pIter->rInfo.suid = pIter->bData.suid;
pIter->rInfo.uid = pIter->bData.uid;
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
goto _add_iter;
} }
}
}
continue;
pReader->iBlockL++; _add_iter:
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
pIter++;
} }
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path, tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pReader->pTsdb->pVnode),
pReader->fid); pReader->pTsdb->path, pReader->fid);
return code;
_err:
tsdbError("vgId:%d vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode),
tstrerror(code));
return code;
}
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) { return pReader->pIter ? &pReader->pIter->rInfo : NULL; }
static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
int32_t code = 0;
if (pReader->pIter) {
SFDataIter* pIter = pReader->pIter;
while (true) {
_find_row:
for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
goto _out;
}
} }
if (pIter->type == SNAP_DATA_FILE_ITER) {
while (true) { while (true) {
if (pReader->pBlockIdx && pReader->pSstBlk) { for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
TABLEID id = {.suid = pReader->pSstBlk->suid, .uid = pReader->pSstBlk->minUid}; SDataBlk dataBlk;
tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
ASSERT(0); if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
// if (tTABLEIDCmprFn(pReader->pBlockIdx, &minId) < 0) { code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
// // TODO if (code) goto _err;
// } else if (tTABLEIDCmprFn(pReader->pBlockIdx, &maxId) < 0) {
// // TODO
// } else {
// // TODO
// }
} else if (pReader->pBlockIdx) {
while (pReader->iBlock < pReader->mBlock.nItem) {
SDataBlk block;
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk);
if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) { pIter->iRow = -1;
// load data (todo) goto _find_row;
} }
// next pIter->iBlockIdx++;
pReader->iBlock++; if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break;
if (*ppData) break;
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
if (code) goto _err;
pIter->iBlock = -1;
} }
if (pReader->iBlock >= pReader->mBlock.nItem) { pReader->pIter = NULL;
pReader->iBlockIdx++; } else if (pIter->type == SNAP_STT_FILE_ITER) {
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue;
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); code = tsdbReadSttBlock(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
if (code) goto _err; if (code) goto _err;
pReader->iBlock = 0; pIter->iRow = -1;
goto _find_row;
}
pReader->pIter = NULL;
} else { } else {
pReader->pBlockIdx = NULL; ASSERT(0);
} }
} }
if (*ppData) goto _exit; _out:
} else if (pReader->pSstBlk) { pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
while (pReader->pSstBlk) { if (pReader->pIter && pIter) {
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo);
// load data (todo) if (c > 0) {
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
pReader->pIter = NULL;
} else {
ASSERT(c);
}
}
} }
// next if (pReader->pIter == NULL) {
pReader->iBlockL++; pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) { if (pReader->pIter) {
pReader->pSstBlk = (SSttBlk*)taosArrayGetSize(pReader->aSstBlk); tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
} else { }
pReader->pSstBlk = NULL;
} }
if (*ppData) goto _exit; return code;
_err:
return code;
}
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
ASSERT(pReader->bData.nRow);
int32_t aBufN[5] = {0};
code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
if (code) goto _exit;
int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
} }
} else {
SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
pHdr->type = SNAP_DATA_TSDB;
pHdr->size = size;
memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
if (aBufN[1]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
}
if (aBufN[0]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
}
_exit:
return code;
}
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
while (true) {
if (pReader->pDataFReader == NULL) {
code = tsdbSnapReadOpenFile(pReader);
if (code) goto _err;
}
if (pReader->pDataFReader == NULL) break;
SRowInfo* pRowInfo = tsdbSnapGetRow(pReader);
if (pRowInfo == NULL) {
tsdbDataFReaderClose(&pReader->pDataFReader);
continue;
}
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
SBlockData* pBlockData = &pReader->bData;
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
if (code) goto _err;
code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema);
if (code) goto _err;
while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid);
if (code) goto _err;
code = tsdbSnapNextRow(pReader);
if (code) goto _err;
pRowInfo = tsdbSnapGetRow(pReader);
if (pRowInfo == NULL) {
tsdbDataFReaderClose(&pReader->pDataFReader); tsdbDataFReaderClose(&pReader->pDataFReader);
break; break;
} }
if (pBlockData->nRow >= 4096) break;
} }
code = tsdbSnapCmprData(pReader, ppData);
if (code) goto _err;
break;
} }
_exit:
return code; return code;
_err: _err:
...@@ -216,7 +394,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -216,7 +394,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
size += tPutDelData(NULL, pDelData); size += tPutDelData(NULL, pDelData);
} }
} }
if (size == 0) continue; if (size == 0) continue;
// org data // org data
...@@ -292,23 +469,33 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type ...@@ -292,23 +469,33 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
goto _err; goto _err;
} }
// data
pReader->fid = INT32_MIN; pReader->fid = INT32_MIN;
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
if (pReader->aBlockIdx == NULL) { SFDataIter* pIter = &pReader->aFDataIter[iIter];
if (iIter == 0) {
pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pIter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pReader->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); } else {
if (pReader->aSstBlk == NULL) { pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pReader->mBlock = tMapDataInit(); }
code = tBlockDataCreate(&pReader->oBlockData);
code = tBlockDataCreate(&pIter->bData);
if (code) goto _err; if (code) goto _err;
code = tBlockDataCreate(&pReader->nBlockData); }
code = tBlockDataCreate(&pReader->bData);
if (code) goto _err; if (code) goto _err;
// del
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
if (pReader->aDelIdx == NULL) { if (pReader->aDelIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -335,18 +522,26 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -335,18 +522,26 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
STsdbSnapReader* pReader = *ppReader; STsdbSnapReader* pReader = *ppReader;
if (pReader->pDataFReader) { // data
tsdbDataFReaderClose(&pReader->pDataFReader); if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader);
for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
SFDataIter* pIter = &pReader->aFDataIter[iIter];
if (iIter == 0) {
taosArrayDestroy(pIter->aBlockIdx);
tMapDataClear(&pIter->mBlock);
} else {
taosArrayDestroy(pIter->aSttBlk);
} }
taosArrayDestroy(pReader->aSstBlk);
taosArrayDestroy(pReader->aBlockIdx);
tMapDataClear(&pReader->mBlock);
tBlockDataDestroy(&pReader->oBlockData, 1);
tBlockDataDestroy(&pReader->nBlockData, 1);
if (pReader->pDelFReader) { tBlockDataDestroy(&pIter->bData, 1);
tsdbDelFReaderClose(&pReader->pDelFReader);
} }
tBlockDataDestroy(&pReader->bData, 1);
tTSchemaDestroy(pReader->skmTable.pTSchema);
// del
if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
taosArrayDestroy(pReader->aDelIdx); taosArrayDestroy(pReader->aDelIdx);
taosArrayDestroy(pReader->aDelData); taosArrayDestroy(pReader->aDelData);
...@@ -354,6 +549,10 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -354,6 +549,10 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
tFree(pReader->aBuf[iBuf]);
}
taosMemoryFree(pReader); taosMemoryFree(pReader);
*ppReader = NULL; *ppReader = NULL;
return code; return code;
...@@ -416,34 +615,31 @@ struct STsdbSnapWriter { ...@@ -416,34 +615,31 @@ struct STsdbSnapWriter {
int32_t maxRow; int32_t maxRow;
int8_t cmprAlg; int8_t cmprAlg;
int64_t commitID; int64_t commitID;
uint8_t* aBuf[5]; uint8_t* aBuf[5];
// for data file // for data file
SBlockData bData; SBlockData bData;
int32_t fid; int32_t fid;
SDataFReader* pDataFReader; TABLEID id;
SArray* aBlockIdx; // SArray<SBlockIdx> SSkmInfo skmTable;
struct {
SDataFReader* pReader;
SArray* aBlockIdx;
int32_t iBlockIdx; int32_t iBlockIdx;
SBlockIdx* pBlockIdx; SBlockIdx* pBlockIdx;
SMapData mBlock; // SMapData<SDataBlk> SMapData mDataBlk;
int32_t iBlock; int32_t iDataBlk;
SBlockData* pBlockData; SBlockData bData;
int32_t iRow; int32_t iRow;
SBlockData bDataR; } dReader;
SArray* aSstBlk; // SArray<SSttBlk> struct {
int32_t iBlockL; SDataFWriter* pWriter;
SBlockData lDataR; SArray* aBlockIdx;
SMapData mDataBlk;
SDataFWriter* pDataFWriter; SArray* aSttBlk;
SBlockIdx* pBlockIdxW; // NULL when no committing table SBlockData bData;
SDataBlk blockW; SBlockData sData;
SBlockData bDataW; } dWriter;
SBlockIdx blockIdxW;
SMapData mBlockW; // SMapData<SDataBlk>
SArray* aBlockIdxW; // SArray<SBlockIdx>
SArray* aBlockLW; // SArray<SSttBlk>
// for del file // for del file
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
...@@ -454,520 +650,447 @@ struct STsdbSnapWriter { ...@@ -454,520 +650,447 @@ struct STsdbSnapWriter {
SArray* aDelIdxW; SArray* aDelIdxW;
}; };
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // SNAP_DATA_TSDB
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
ASSERT(pWriter->pDataFWriter); ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow);
if (pWriter->pBlockIdxW == NULL) goto _exit; if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);
// consume remain rows code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
if (pWriter->pBlockData) { if (code) goto _exit;
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
while (pWriter->iRow < pWriter->pBlockData->nRow) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL,
0); // todo
if (code) goto _err;
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { pWriter->dReader.iBlockIdx++;
// pWriter->blockW.last = 0; } else {
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, pWriter->dReader.pBlockIdx = NULL;
// &pWriter->blockW, pWriter->cmprAlg); tMapDataReset(&pWriter->dReader.mDataBlk);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
if (code) goto _err;
tDataBlkReset(&pWriter->blockW);
tBlockDataClear(&pWriter->bDataW);
} }
pWriter->dReader.iDataBlk = 0; // point to the next one
tBlockDataReset(&pWriter->dReader.bData);
pWriter->dReader.iRow = 0;
pWriter->iRow++; _exit:
} return code;
} }
// write remain data if has static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
if (pWriter->bDataW.nRow > 0) { int32_t code = 0;
// pWriter->blockW.last = 0;
if (pWriter->bDataW.nRow < pWriter->minRow) {
if (pWriter->iBlock > pWriter->mBlock.nItem) {
// pWriter->blockW.last = 1;
}
}
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, while (true) {
// &pWriter->blockW, pWriter->cmprAlg); if (pWriter->dReader.pBlockIdx == NULL) break;
// if (code) goto _err; if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
if (code) goto _err; code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
} if (code) goto _exit;
while (true) { if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
SDataBlk block; code = tsdbSnapNextTableData(pWriter);
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); if (code) goto _exit;
}
// if (block.last) { _exit:
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); return code;
// if (code) goto _err; }
// tBlockReset(&block); static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
// block.last = 1; int32_t code = 0;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
// pWriter->cmprAlg);
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); code = tsdbSnapWriteCopyData(pWriter, pId);
if (code) goto _err; if (code) goto _err;
pWriter->iBlock++; pWriter->id.suid = pId->suid;
} pWriter->id.uid = pId->uid;
// SDataBlk code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
// code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); if (code) goto _err;
// if (code) goto _err;
// SBlockIdx tMapDataReset(&pWriter->dWriter.mDataBlk);
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema);
code = TSDB_CODE_OUT_OF_MEMORY; if (code) goto _err;
goto _err;
}
_exit:
tsdbInfo("vgId:%d, tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock); if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;
int32_t c = 1;
if (pWriter->dReader.pBlockIdx) {
c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id);
ASSERT(c >= 0);
}
if (c == 0) {
SBlockData* pBData = &pWriter->dWriter.bData;
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid);
if (code) goto _err; if (code) goto _err;
// SBlockData if (pBData->nRow >= pWriter->maxRow) {
SDataBlk block; code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
tMapDataReset(&pWriter->mBlockW); if (code) goto _err;
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { }
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk); }
// if (block.last) { code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
// code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); if (code) goto _err;
// if (code) goto _err;
// tBlockReset(&block); for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
// block.last = 1; SDataBlk dataBlk;
// code = tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
// tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block,
// pWriter->cmprAlg);
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err; if (code) goto _err;
} }
// SDataBlk code = tsdbSnapNextTableData(pWriter);
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
if (code) goto _err; if (code) goto _err;
}
// SBlockIdx if (pWriter->dWriter.mDataBlk.nItem) {
if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid};
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
}
pWriter->id.suid = 0;
pWriter->id.uid = 0;
_exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
int32_t code = 0; int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData; STsdb* pTsdb = pWriter->pTsdb;
int32_t iRow = 0;
TSDBROW row;
TSDBROW* pRow = &row;
// // correct schema
// code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
// if (code) goto _err;
// loop to merge
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
while (true) {
if (pRow == NULL) break;
if (pWriter->pBlockData) {
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); ASSERT(pWriter->dWriter.pWriter == NULL);
ASSERT(c); pWriter->fid = fid;
pWriter->id = (TABLEID){0};
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
if (c < 0) { // Reader
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); if (pSet) {
// if (code) goto _err; code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet);
if (code) goto _err;
iRow++; code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx);
if (iRow < pWriter->pBlockData->nRow) { if (code) goto _err;
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else { } else {
pRow = NULL; ASSERT(pWriter->dReader.pReader == NULL);
taosArrayClear(pWriter->dReader.aBlockIdx);
} }
} else if (c > 0) { pWriter->dReader.iBlockIdx = 0; // point to the next one
// code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), code = tsdbSnapNextTableData(pWriter);
// NULL); if (code) goto _err; if (code) goto _err;
pWriter->iRow++; // Writer
if (pWriter->iRow >= pWriter->pBlockData->nRow) { SHeadFile fHead = {.commitID = pWriter->commitID};
pWriter->pBlockData = NULL; SDataFile fData = {.commitID = pWriter->commitID};
} SSmaFile fSma = {.commitID = pWriter->commitID};
SSttFile fStt = {.commitID = pWriter->commitID};
SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
if (pSet) {
wSet.diskId = pSet->diskId;
fData = *pSet->pDataF;
fSma = *pSet->pSmaF;
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
wSet.aSttF[iStt] = pSet->aSttF[iStt];
} }
wSet.nSttF = pSet->nSttF + 1; // TODO: fix pSet->nSttF == pTsdb->maxFile
} else { } else {
TSDBKEY key = TSDBROW_KEY(pRow); SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
while (true) { tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
if (pWriter->iBlock >= pWriter->mBlock.nItem) break; wSet.diskId = did;
wSet.nSttF = 1;
SDataBlk block; }
int32_t c; wSet.aSttF[wSet.nSttF - 1] = &fStt;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk);
// if (block.last) {
// pWriter->pBlockData = &pWriter->bDataR;
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet);
// NULL); if (code) goto _err; pWriter->iRow = 0; if (code) goto _err;
taosArrayClear(pWriter->dWriter.aBlockIdx);
tMapDataReset(&pWriter->dWriter.mDataBlk);
taosArrayClear(pWriter->dWriter.aSttBlk);
tBlockDataReset(&pWriter->dWriter.bData);
tBlockDataReset(&pWriter->dWriter.sData);
// pWriter->iBlock++; return code;
// break;
// }
c = tsdbKeyCmprFn(&block.maxKey, &key); _err:
return code;
}
ASSERT(c); static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) {
int32_t code = 0;
if (c < 0) { ASSERT(pWriter->dWriter.pWriter);
if (pWriter->bDataW.nRow) {
// pWriter->blockW.last = 0;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
tDataBlkReset(&pWriter->blockW); // copy remain table data
tBlockDataClear(&pWriter->bDataW); TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
} code = tsdbSnapWriteCopyData(pWriter, &id);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
if (code) goto _err; if (code) goto _err;
pWriter->iBlock++; code =
} else { tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey); if (code) goto _err;
ASSERT(c);
if (c > 0) {
pWriter->pBlockData = &pWriter->bDataR;
// code =
// tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
// NULL);
// if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++;
}
break;
}
}
if (pWriter->pBlockData) continue;
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); // Indices
// if (code) goto _err; code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx);
if (code) goto _err;
iRow++; code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk);
if (iRow < pBlockData->nRow) { if (code) goto _err;
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
}
_check_write: code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter);
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; if (code) goto _err;
_write_block: code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet);
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, if (code) goto _err;
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1);
if (code) goto _err; if (code) goto _err;
tDataBlkReset(&pWriter->blockW); if (pWriter->dReader.pReader) {
tBlockDataClear(&pWriter->bDataW); code = tsdbDataFReaderClose(&pWriter->dReader.pReader);
if (code) goto _err;
} }
_exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d, vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
int32_t code = 0; int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
// end last table write if should SBlockData* pBData = &pWriter->bData;
if (pWriter->pBlockIdxW) { TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]};
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
TSDBKEY key = TSDBROW_KEY(&row);
*done = 0;
while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow ||
pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) {
// Merge row by row
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
TSDBKEY tKey = TSDBROW_KEY(&trow);
ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid);
int32_t c = tsdbKeyCmprFn(&key, &tKey);
if (c < 0) { if (c < 0) {
// end code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) { } else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid);
if (code) goto _err;
} else {
ASSERT(0); ASSERT(0);
} }
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg);
if (code) goto _err;
} }
// start new table data write if need if (c < 0) {
if (pWriter->pBlockIdxW == NULL) { *done = 1;
// write table data ahead goto _exit;
while (true) { }
if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; }
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); // Merge row by block
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); SDataBlk tDataBlk = {.minKey = key, .maxKey = key};
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
SDataBlk dataBlk;
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
if (c >= 0) break; int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk);
if (c < 0) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg);
if (code) goto _err;
code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx); code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err;
} else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
if (code) goto _err; if (code) goto _err;
pWriter->iBlockIdx++; if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg);
if (code) goto _err;
} }
// reader *done = 1;
pWriter->pBlockIdx = NULL; goto _exit;
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { } else {
ASSERT(pWriter->pDataFReader); code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
if (code) goto _err;
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); pWriter->dReader.iRow = 0;
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
ASSERT(c >= 0);
if (c == 0) { pWriter->dReader.iDataBlk++;
pWriter->pBlockIdx = pBlockIdx; break;
pWriter->iBlockIdx++; }
} }
} }
if (pWriter->pBlockIdx) { _exit:
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock); return code;
_err:
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0;
TABLEID id = {.suid = pWriter->bData.suid,
.uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]};
TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow);
SBlockData* pBData = &pWriter->dWriter.sData;
if (pBData->suid || pBData->uid) {
if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else {
tMapDataReset(&pWriter->mBlock); pBData->suid = 0;
pBData->uid = 0;
}
} }
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
// writer if (pBData->suid == 0 && pBData->uid == 0) {
pWriter->pBlockIdxW = &pWriter->blockIdxW; code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
pWriter->pBlockIdxW->suid = id.suid; if (code) goto _err;
pWriter->pBlockIdxW->uid = id.uid;
tDataBlkReset(&pWriter->blockW); code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema);
tBlockDataReset(&pWriter->bDataW); if (code) goto _err;
tMapDataReset(&pWriter->mBlockW);
} }
ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); code = tBlockDataAppendRow(pBData, &row, NULL, id.uid);
ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); if (code) goto _err;
code = tsdbSnapWriteTableDataImpl(pWriter); if (pBData->nRow >= pWriter->maxRow) {
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
if (code) goto _err; if (code) goto _err;
}
_exit: _exit:
tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDataFWriter == NULL) goto _exit; SBlockData* pBlockData = &pWriter->bData;
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
// finish current table // End last table data write if need
if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) {
code = tsdbSnapWriteTableDataEnd(pWriter); code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
// move remain table
while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
if (code) goto _err;
pWriter->iBlockIdx++;
} }
// write remain stuff // Start new table data write if need
if (taosArrayGetSize(pWriter->aBlockLW) > 0) { if (pWriter->id.suid == 0 && pWriter->id.uid == 0) {
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW); code = tsdbSnapWriteTableDataStart(pWriter, &id);
if (code) goto _err; if (code) goto _err;
} }
if (taosArrayGetSize(pWriter->aBlockIdx) > 0) { // Merge with .data file data
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW); int8_t done = 0;
if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) {
code = tsdbSnapWriteToDataFile(pWriter, iRow, &done);
if (code) goto _err; if (code) goto _err;
} }
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); // Append to the .stt data block (todo: check if need to set/reload sst block)
if (code) goto _err; if (!done) {
code = tsdbSnapWriteToSttFile(pWriter, iRow);
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
if (code) goto _err;
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
if (code) goto _err; if (code) goto _err;
} }
_exit: _exit:
tsdbInfo("vgId:%d, vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
int64_t n;
// decode
SBlockData* pBlockData = &pWriter->bData; SBlockData* pBlockData = &pWriter->bData;
code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData,
pWriter->aBuf);
if (code) goto _err;
// open file // Decode data
TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]}; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1], code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf);
.ts = pBlockData->aTSKEY[pBlockData->nRow - 1]};
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
// end last file data write if need
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
pWriter->fid = fid; ASSERT(pBlockData->nRow > 0);
// read // Loop to handle each row
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
if (pSet) { TSKEY ts = pBlockData->aTSKEY[iRow];
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision);
if (code) goto _err;
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) {
if (code) goto _err; if (pWriter->dWriter.pWriter) {
ASSERT(fid > pWriter->fid);
code = tsdbReadSttBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); code = tsdbSnapWriteCloseFile(pWriter);
if (code) goto _err; if (code) goto _err;
} else {
ASSERT(pWriter->pDataFReader == NULL);
taosArrayClear(pWriter->aBlockIdx);
taosArrayClear(pWriter->aSstBlk);
}
pWriter->iBlockIdx = 0;
pWriter->pBlockIdx = NULL;
tMapDataReset(&pWriter->mBlock);
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
pWriter->iBlockL = 0;
tBlockDataReset(&pWriter->bDataR);
tBlockDataReset(&pWriter->lDataR);
// write
SHeadFile fHead;
SDataFile fData;
SSttFile fLast;
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSttF[0] = &fLast, .pSmaF = &fSma};
if (pSet) {
wSet.diskId = pSet->diskId;
wSet.fid = fid;
wSet.nSttF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = *pSet->pDataF;
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0};
fSma = *pSet->pSmaF;
} else {
wSet.diskId = (SDiskID){.level = 0, .id = 0};
wSet.fid = fid;
wSet.nSttF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
} }
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); code = tsdbSnapWriteOpenFile(pWriter, fid);
if (code) goto _err; if (code) goto _err;
taosArrayClear(pWriter->aBlockIdxW);
taosArrayClear(pWriter->aBlockLW);
tMapDataReset(&pWriter->mBlockW);
pWriter->pBlockIdxW = NULL;
tBlockDataReset(&pWriter->bDataW);
} }
code = tsdbSnapWriteTableData(pWriter, id); code = tsdbSnapWriteRowData(pWriter, iRow);
if (code) goto _err; if (code) goto _err;
}
tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow);
return code; return code;
_err: _err:
...@@ -976,10 +1099,41 @@ _err: ...@@ -976,10 +1099,41 @@ _err:
return code; return code;
} }
// SNAP_DATA_DEL
static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0;
while (true) {
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break;
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
if (code) goto _exit;
SDelIdx delIdx = *pDelIdx;
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _exit;
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pWriter->iDelIdx++;
}
_exit:
return code;
}
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
// Open del file if not opened yet
if (pWriter->pDelFWriter == NULL) { if (pWriter->pDelFWriter == NULL) {
SDelFile* pDelFile = pWriter->fs.pDelFile; SDelFile* pDelFile = pWriter->fs.pDelFile;
...@@ -990,38 +1144,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -990,38 +1144,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR); code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
if (code) goto _err; if (code) goto _err;
} else {
taosArrayClear(pWriter->aDelIdxR);
} }
pWriter->iDelIdx = 0;
// writer // writer
SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0}; SDelFile delFile = {.commitID = pWriter->commitID};
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
if (code) goto _err; if (code) goto _err;
taosArrayClear(pWriter->aDelIdxW);
} }
// process the del data SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); TABLEID id = *(TABLEID*)pHdr->data;
while (true) {
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
if (tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) >= 0) break;
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
if (code) goto _err;
SDelIdx delIdx = *pDelIdx; // Move write data < id
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx); code = tsdbSnapMoveWriteDelData(pWriter, &id);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { // Merge incoming data with current
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->iDelIdx++;
}
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) && if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) { tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
...@@ -1055,7 +1199,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -1055,7 +1199,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
goto _err; goto _err;
} }
_exit:
return code; return code;
_err: _err:
...@@ -1068,24 +1211,15 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -1068,24 +1211,15 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) goto _exit; if (pWriter->pDelFWriter == NULL) return code;
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
code = tsdbSnapMoveWriteDelData(pWriter, &id);
if (code) goto _err; if (code) goto _err;
SDelIdx delIdx = *pDelIdx; code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW);
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
if (code) goto _err; if (code) goto _err;
...@@ -1100,7 +1234,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -1100,7 +1234,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
if (code) goto _err; if (code) goto _err;
} }
_exit:
tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path); tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
...@@ -1110,6 +1243,7 @@ _err: ...@@ -1110,6 +1243,7 @@ _err:
return code; return code;
} }
// APIs
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
int32_t code = 0; int32_t code = 0;
STsdbSnapWriter* pWriter = NULL; STsdbSnapWriter* pWriter = NULL;
...@@ -1135,39 +1269,38 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1135,39 +1269,38 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pWriter->commitID = pTsdb->pVnode->state.commitID; pWriter->commitID = pTsdb->pVnode->state.commitID;
// for data file // SNAP_DATA_TSDB
code = tBlockDataCreate(&pWriter->bData); code = tBlockDataCreate(&pWriter->bData);
if (code) goto _err; if (code) goto _err;
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdx == NULL) { pWriter->fid = INT32_MIN;
pWriter->id = (TABLEID){0};
// Reader
pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->dReader.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
code = tBlockDataCreate(&pWriter->bDataR); code = tBlockDataCreate(&pWriter->dReader.bData);
if (code) goto _err; if (code) goto _err;
pWriter->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); // Writer
if (pWriter->aSstBlk == NULL) { pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); if (pWriter->dWriter.aSttBlk == NULL) {
if (pWriter->aBlockIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
code = tBlockDataCreate(&pWriter->bDataW); code = tBlockDataCreate(&pWriter->dWriter.bData);
if (code) goto _err;
code = tBlockDataCreate(&pWriter->dWriter.sData);
if (code) goto _err; if (code) goto _err;
pWriter->aBlockLW = taosArrayInit(0, sizeof(SSttBlk)); // SNAP_DATA_DEL
if (pWriter->aBlockLW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// for del file
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxR == NULL) { if (pWriter->aDelIdxR == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1188,6 +1321,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1188,6 +1321,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code)); tstrerror(code));
...@@ -1198,14 +1332,17 @@ _err: ...@@ -1198,14 +1332,17 @@ _err:
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
STsdbSnapWriter* pWriter = *ppWriter; STsdbSnapWriter* pWriter = *ppWriter;
STsdb* pTsdb = pWriter->pTsdb;
if (rollback) { if (rollback) {
ASSERT(0); ASSERT(0);
// code = tsdbFSRollback(pWriter->pTsdb->pFS); // code = tsdbFSRollback(pWriter->pTsdb->pFS);
// if (code) goto _err; // if (code) goto _err;
} else { } else {
code = tsdbSnapWriteDataEnd(pWriter); if (pWriter->dWriter.pWriter) {
code = tsdbSnapWriteCloseFile(pWriter);
if (code) goto _err; if (code) goto _err;
}
code = tsdbSnapWriteDelEnd(pWriter); code = tsdbSnapWriteDelEnd(pWriter);
if (code) goto _err; if (code) goto _err;
...@@ -1213,14 +1350,44 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1213,14 +1350,44 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs); code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs);
if (code) goto _err; if (code) goto _err;
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs); code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs);
if (code) goto _err; if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
} }
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
}
// SNAP_DATA_DEL
taosArrayDestroy(pWriter->aDelIdxW);
taosArrayDestroy(pWriter->aDelData);
taosArrayDestroy(pWriter->aDelIdxR);
// SNAP_DATA_TSDB
// Writer
tBlockDataDestroy(&pWriter->dWriter.sData, 1);
tBlockDataDestroy(&pWriter->dWriter.bData, 1);
taosArrayDestroy(pWriter->dWriter.aSttBlk);
tMapDataClear(&pWriter->dWriter.mDataBlk);
taosArrayDestroy(pWriter->dWriter.aBlockIdx);
// Reader
tBlockDataDestroy(&pWriter->dReader.bData, 1);
tMapDataClear(&pWriter->dReader.mDataBlk);
taosArrayDestroy(pWriter->dReader.aBlockIdx);
tBlockDataDestroy(&pWriter->bData, 1);
tTSchemaDestroy(pWriter->skmTable.pTSchema);
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
tFree(pWriter->aBuf[iBuf]); tFree(pWriter->aBuf[iBuf]);
} }
tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
...@@ -1245,8 +1412,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1245,8 +1412,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
goto _exit; goto _exit;
} else { } else {
if (pWriter->pDataFWriter) { if (pWriter->dWriter.pWriter) {
code = tsdbSnapWriteDataEnd(pWriter); code = tsdbSnapWriteCloseFile(pWriter);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1259,7 +1426,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1259,7 +1426,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
_exit: _exit:
tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
......
...@@ -354,7 +354,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -354,7 +354,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} break; } break;
case SNAP_DATA_TSDB: { case SNAP_DATA_TSDB:
case SNAP_DATA_DEL: {
// tsdb // tsdb
if (pWriter->pTsdbSnapWriter == NULL) { if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
......
...@@ -221,6 +221,7 @@ ...@@ -221,6 +221,7 @@
./test.sh -f tsim/table/describe.sim ./test.sh -f tsim/table/describe.sim
./test.sh -f tsim/table/double.sim ./test.sh -f tsim/table/double.sim
./test.sh -f tsim/table/float.sim ./test.sh -f tsim/table/float.sim
./test.sh -f tsim/table/hash.sim
./test.sh -f tsim/table/int.sim ./test.sh -f tsim/table/int.sim
./test.sh -f tsim/table/limit.sim ./test.sh -f tsim/table/limit.sim
./test.sh -f tsim/table/smallint.sim ./test.sh -f tsim/table/smallint.sim
......
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
#define TSDB_DATA_INT_NULL 0x80000000LL
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000LL
void abs_max(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
printf("abs_max input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 5) {
r=*(long *)dataOutput;
*numOfOutput=0;
for(i=0;i<numOfRows;++i) {
if (*((long *)data + i) == TSDB_DATA_BIGINT_NULL) {
continue;
}
*numOfOutput=1;
long v = abs(*((long *)data + i));
if (v > r) {
r = v;
}
}
*(long *)dataOutput=r;
printf("abs_max out, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
}
}
void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
int i;
int r = 0;
printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
*numOfOutput=1;
printf("abs_max finalize, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
}
void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int r = 0;
if (numOfRows > 0) {
r = *((long *)data);
}
printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
for (int i = 1; i < numOfRows; ++i) {
printf("abs_max_merge %d - %ld\n", i, *((long *)data + i));
if (*((long*)data + i) > r) {
r= *((long*)data + i);
}
}
*(long*)dataOutput=r;
if (numOfRows > 0) {
*numOfOutput=1;
} else {
*numOfOutput=0;
}
printf("abs_max_merge, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
}
int abs_max_init(SUdfInit* buf) {
printf("abs_max init\n");
return 0;
}
void abs_max_destroy(SUdfInit* buf) {
printf("abs_max destroy\n");
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
for(i=0;i<numOfRows;++i) {
printf("input %d - %d", i, *((int *)data + i));
*((int *)dataOutput+i)=*((int *)data + i) + 1;
printf(", output %d\n", *((int *)dataOutput+i));
if (tsOutput) {
*(long long*)tsOutput=1000000;
}
}
*numOfOutput=numOfRows;
printf("add_one out, numOfOutput:%d\n", *numOfOutput);
}
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
typedef struct SDemo{
double sum;
int num;
short otype;
}SDemo;
#define FLOAT_NULL 0x7FF00000 // it is an NAN
#define DOUBLE_NULL 0x7FFFFF0000000000LL // it is an NAN
void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
double r = 0;
SDemo *p = (SDemo *)interBuf;
SDemo *q = (SDemo *)dataOutput;
printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, interBUf:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, interBuf, tsOutput, numOfOutput, buf);
for(i=0;i<numOfRows;++i) {
if (itype == 4) {
r=*((int *)data+i);
} else if (itype == 6) {
r=*((float *)data+i);
} else if (itype == 7) {
r=*((double *)data+i);
}
p->sum += r*r;
}
p->otype = otype;
p->num += numOfRows;
q->sum = p->sum;
q->num = p->num;
q->otype = p->otype;
*numOfOutput=1;
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}
void demo_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int i;
SDemo *p = (SDemo *)data;
SDemo res = {0};
printf("demo_merge input data:%p, rows:%d, dataoutput:%p, numOfOutput:%p, buf:%p\n", data, numOfRows, dataOutput, numOfOutput, buf);
for(i=0;i<numOfRows;++i) {
res.sum += p->sum * p->sum;
res.num += p->num;
p++;
}
p->sum = res.sum;
p->num = res.num;
*numOfOutput=1;
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}
void demo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
SDemo *p = (SDemo *)interBuf;
printf("demo_finalize interbuf:%p, numOfOutput:%p, buf:%p, sum:%f, num:%d\n", interBuf, numOfOutput, buf, p->sum, p->num);
if (p->otype == 6) {
if (p->num != 30000) {
*(unsigned int *)dataOutput = FLOAT_NULL;
} else {
*(float *)dataOutput = (float)(p->sum / p->num);
}
printf("finalize values:%f\n", *(float *)dataOutput);
} else if (p->otype == 7) {
if (p->num != 30000) {
*(unsigned long long *)dataOutput = DOUBLE_NULL;
} else {
*(double *)dataOutput = (double)(p->sum / p->num);
}
printf("finalize values:%f\n", *(double *)dataOutput);
}
*numOfOutput=1;
printf("demo finalize, numOfOutput:%d\n", *numOfOutput);
}
int demo_init(SUdfInit* buf) {
printf("demo init\n");
return 0;
}
void demo_destroy(SUdfInit* buf) {
printf("demo destroy\n");
}
funcName = "test"
global = {}
function test_init()
return global
end
function test_add(rows, ans, key)
t = {}
t["sum"] = 0.0
t["num"] = 0
for i=1, #rows do
t["sum"] = t["sum"] + rows[i] * rows[i]
end
t["num"] = #rows
if (ans[key] ~= nil)
then
ans[key]["sum"] = ans[key]["sum"] + t["sum"]
ans[key]["num"] = ans[key]["num"] + t["num"]
else
ans[key] = t
end
return ans;
end
function test_finalize(ans, key)
local ret = 0.0
if (ans[key] ~= nil and ans[key]["num"] == 30000)
then
ret = ans[key]["sum"]/ans[key]["num"]
ans[key]["sum"] = 0.0
ans[key]["num"] = 0
else
ret = inf
end
return ret, ans
end
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
#define TSDB_DATA_INT_NULL 0x80000000LL
void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
printf("sum_double input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
r=*(int *)dataOutput;
*numOfOutput=0;
for(i=0;i<numOfRows;++i) {
if (*((int *)data + i) == TSDB_DATA_INT_NULL) {
continue;
}
*numOfOutput=1;
r+=*((int *)data + i);
*(int *)dataOutput=r;
}
printf("sum_double out, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
}
}
void sum_double_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
int i;
int r = 0;
printf("sum_double_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
*numOfOutput=1;
*(int*)(buf->ptr)=*(int*)dataOutput*2;
*(int*)dataOutput=*(int*)(buf->ptr);
printf("sum_double finalize, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
}
void sum_double_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int r = 0;
int sum = 0;
printf("sum_double_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
for (int i = 0; i < numOfRows; ++i) {
printf("sum_double_merge %d - %d\n", i, *((int*)data + i));
sum +=*((int*)data + i);
}
*(int*)dataOutput+=sum;
if (numOfRows > 0) {
*numOfOutput=1;
} else {
*numOfOutput=0;
}
printf("sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
}
int sum_double_init(SUdfInit* buf) {
buf->maybe_null=1;
buf->ptr = taosMemoryMalloc(sizeof(int));
printf("sum_double init\n");
return 0;
}
void sum_double_destroy(SUdfInit* buf) {
taosMemoryFree(buf->ptr);
printf("sum_double destroy\n");
}
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
#=========== prepare
#sql create database d1 vgroups 2
sql create database d1 vgroups 2 table_prefix 3 table_suffix 2
sql select * from information_schema.ins_databases
print $data(d1)[27] $data(d1)[28]
if $data(d1)[27] != 3 then
return -1
endi
if $data(d1)[28] != 2 then
return -1
endi
sql use d1;
sql create table st (ts timestamp, i int) tags (j int);
sql create table st_ct_1 using st tags(3) st_ct_2 using st tags(4) st_ct_3 using st tags(5) st_ct_4 using st tags(6) st_ct_5 using st tags(7)
sql insert into st_ct_1 values(now+1s, 1)
sql insert into st_ct_1 values(now+2s, 2)
sql insert into st_ct_1 values(now+3s, 3)
sql insert into st_ct_2 values(now+1s, 1)
sql insert into st_ct_2 values(now+2s, 2)
sql insert into st_ct_2 values(now+3s, 3)
sql insert into st_ct_3 values(now+1s, 1)
sql insert into st_ct_3 values(now+2s, 2)
sql insert into st_ct_3 values(now+3s, 2)
sql insert into st_ct_4 values(now+1s, 1)
sql insert into st_ct_4 values(now+2s, 2)
sql insert into st_ct_4 values(now+3s, 2)
sql insert into st_ct_5 values(now+1s, 1)
sql insert into st_ct_5 values(now+2s, 2)
sql insert into st_ct_5 values(now+3s, 2)
# check query
sql select * from st
if $rows != 15 then
return -1
endi
# check table vgroup
sql select * from information_schema.ins_tables where db_name = 'd1'
if $data(st_ct_1)[6] != 2 then
return -1
endi
if $data(st_ct_2)[6] != 2 then
return -1
endi
if $data(st_ct_3)[6] != 2 then
return -1
endi
if $data(st_ct_4)[6] != 2 then
return -1
endi
if $data(st_ct_5)[6] != 2 then
return -1
endi
# check invalid table name
sql create table c1 using st tags(3)
sql create table c12 using st tags(3)
sql create table c123 using st tags(3)
sql create table c1234 using st tags(3)
sql create table c12345 using st tags(3)
sql select * from information_schema.ins_tables where db_name = 'd1'
if $data(c1)[6] != 2 then
return -1
endi
if $data(c12)[6] != 3 then
return -1
endi
if $data(c123)[6] != 2 then
return -1
endi
if $data(c1234)[6] != 3 then
return -1
endi
if $data(c12345)[6] != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册