提交 39e48a7e 编写于 作者: H Hongze Cheng

more work

上级 a7ea0bfe
......@@ -82,10 +82,12 @@ typedef struct STsdbFSState STsdbFSState;
// tsdbUtil.c ==============================================================================================
// TSDBROW
#define TSDBROW_TS(ROW) (((ROW)->type == 0) ? (ROW)->pTSRow->ts : (ROW)->pBlockData->aTSKEY[(ROW)->iRow])
#define TSDBROW_VERSION(ROW) (((ROW)->type == 0) ? (ROW)->version : (ROW)->pBlockData->aVersion[(ROW)->iRow])
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)});
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .pTSRow = (IROW)});
TSDBKEY tsdbRowKey(TSDBROW *pRow);
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)});
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
......@@ -132,6 +134,8 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataPCmprFn(const void *p1, const void *p2);
// SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);
......@@ -175,7 +179,8 @@ TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
// tsdbFile.c ==============================================================================================
typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT;
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]);
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]);
int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype);
// SDelFile
#define tsdbDelFileCreate() \
((SDelFile){ \
......@@ -195,16 +200,18 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
// tsdbReaderWriter.c ==============================================================================================
// SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync);
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock);
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter);
// SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader *pReader);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
......@@ -470,11 +477,13 @@ struct SBlockSMA {
SColSMA *aColSMA;
};
#pragma pack(push, 1)
struct SBlockDataHdr {
uint32_t delimiter;
int64_t suid;
int64_t uid;
};
#pragma pack(pop)
struct SHeadFile {
int64_t commitID;
......
......@@ -16,9 +16,9 @@
#include "tsdb.h"
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
int32_t code = 0;
SLRUCache *pCache = NULL;
size_t cfgCapacity = 1024 * 1024; // TODO: get cfg from tsdb config
size_t cfgCapacity = 1024 * 1024; // TODO: get cfg from tsdb config
pCache = taosLRUCacheInit(cfgCapacity, -1, .5);
if (pCache == NULL) {
......@@ -44,35 +44,33 @@ void tsdbCloseCache(SLRUCache *pCache) {
static void getTableCacheKey(tb_uid_t uid, const char *cacheType, char *key, int *len) {
int keyLen = 0;
snprintf(key, 30, "%"PRIi64 "%s", uid, cacheType);
snprintf(key, 30, "%" PRIi64 "%s", uid, cacheType);
*len = strlen(key);
}
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) {
taosMemoryFree(value);
}
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); }
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
int32_t code = 0;
STSRow *cacheRow = NULL;
char key[32] = {0};
int keyLen = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
cacheRow = (STSRow *) taosLRUCacheValue(pCache, h);
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (row->ts >= cacheRow->ts) {
if (row->ts > cacheRow->ts) {
tdRowCpy(cacheRow, row);
tdRowCpy(cacheRow, row);
}
}
} else {
cacheRow = tdRowDup(row);
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow),
deleter, NULL, TAOS_LRU_PRIORITY_LOW);
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
......@@ -87,7 +85,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
metaReaderClear(&mr); // table not esist
metaReaderClear(&mr); // table not esist
return 0;
}
......@@ -116,9 +114,9 @@ static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow
tsdbTbDataIterCreate(pMem, NULL, 1, &iter);
if (iter != NULL) {
TSDBROW *row = tsdbTbDataIterGet(iter);
TSDBROW *row = tsdbTbDataIterGet(iter);
tsdbTbDataIterDestroy(iter);
tsdbTbDataIterDestroy(iter);
}
}
} else {
......@@ -153,7 +151,7 @@ _err:
}
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
int32_t code = 0;
int32_t code = 0;
SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
for (; pDelData; pDelData = pDelData->pNext) {
......@@ -163,7 +161,8 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
return code;
}
static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx,
SArray *aDelData) {
int32_t code = 0;
if (pMem) {
......@@ -185,7 +184,8 @@ _err:
return code;
}
static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aSkyline) {
static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx,
SArray *aSkyline) {
int32_t code = 0;
SArray *aDelData = taosArrayInit(32, sizeof(SDelData));
......@@ -219,10 +219,8 @@ _err:
return code;
}
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet,
SArray *pSkyline,
STsdb *pTsdb,
STSRow **pLastRow) {
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, SArray *pSkyline,
STsdb *pTsdb, STSRow **pLastRow) {
int32_t code = 0;
TSDBROW *pMemRow = NULL;
......@@ -247,7 +245,7 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile
SBlockData *pBlockData;
tsdbDataFReaderClose(pDataFReader);
tsdbDataFReaderClose(&pDataFReader);
_err:
return code;
......@@ -258,10 +256,10 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
STbData *pMem = NULL;
STbData *pIMem = NULL;
STbDataIter iter; // mem buffer skip list iterator
STbDataIter iiter; // imem buffer skip list iterator
STbData *pMem = NULL;
STbData *pIMem = NULL;
STbDataIter iter; // mem buffer skip list iterator
STbDataIter iiter; // imem buffer skip list iterator
if (pTsdb->mem) {
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
......@@ -280,7 +278,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
*ppRow = NULL;
SDelFReader *pDelFReader;
//code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL);
// code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL);
if (code) goto _err;
SDelIdx delIdx;
......@@ -297,18 +295,18 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter);
do {
*/
SDFileSet *pFileSet = NULL;
//pFileSet = tsdbFSIterGet(fsiter);
code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow);
if (code < 0) {
goto _err;
}
SDFileSet *pFileSet = NULL;
// pFileSet = tsdbFSIterGet(fsiter);
if (*ppRow != NULL) {
//break;
}
/*
code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow);
if (code < 0) {
goto _err;
}
if (*ppRow != NULL) {
// break;
}
/*
} while (fsHasNext = tsdbFSIterNext(fsiter))
*/
......@@ -323,13 +321,13 @@ _err:
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
*ppRow = (STSRow *) taosLRUCacheValue(pCache, h);
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
code = mergeLastRow(uid, pTsdb, &pRow);
......@@ -344,14 +342,14 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
taosLRUCacheRelease(pCache, h, true);
//void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
......
......@@ -23,6 +23,7 @@ typedef struct {
int8_t precision;
int32_t minRow;
int32_t maxRow;
int8_t cmprAlg;
// --------------
TSKEY nextKey; // reset by each table commit
int32_t commitFid;
......@@ -39,6 +40,9 @@ typedef struct {
SMapData nBlockMap; // SMapData<SBlock>
SBlock nBlock;
SBlockData nBlockData;
int64_t suid;
int64_t uid;
STSchema *pTSchema;
/* commit del */
SDelFReader *pDelFReader;
SMapData oDelIdxMap; // SMapData<SDelIdx>, old
......@@ -710,50 +714,119 @@ _err:
return code;
}
static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
if (pCommitter->pTSchema) {
if (pCommitter->suid == suid) {
if (suid == 0) {
if (pCommitter->uid == uid && sver == pCommitter->pTSchema->version) goto _exit;
} else {
if (sver == pCommitter->pTSchema->version) goto _exit;
}
}
}
_update_schema:
pCommitter->suid = suid;
pCommitter->uid = uid;
tTSchemaDestroy(pCommitter->pTSchema);
pCommitter->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);
if (pCommitter->pTSchema == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
_exit:
return code;
}
static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
STbDataIter *pIter = &(STbDataIter){0};
TSDBKEY key = {.ts = pCommitter->minKey, .version = VERSION_MIN};
TSDBROW row;
TSDBROW *pRow;
// create iter
tsdbTbDataIterOpen(pTbData, &key, 0, pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow == NULL || tsdbRowKey(pRow).ts > pCommitter->maxKey) goto _exit;
if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) goto _exit;
// main loop
SMapData *mBlock = &pCommitter->nBlockMap;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pTbData->suid, .uid = pTbData->uid};
SBlock *pBlock = &pCommitter->nBlock;
SBlockData *pBlockData = &pCommitter->nBlockData;
tMapDataReset(mBlock);
tBlockIdxReset(pBlockIdx);
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
while (pRow != NULL && tsdbRowKey(pRow).ts <= pCommitter->maxKey) {
code = tBlockDataAppendRow(pBlockData, pRow, NULL);
while (pRow != NULL && TSDBROW_TS(pRow) <= pCommitter->maxKey) {
code = tsdbCommitterUpdateSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
if (code) goto _err;
pBlock->minVersion = TMIN(pBlock->minVersion, tsdbRowKey(pRow).version);
pBlock->maxVersion = TMAX(pBlock->maxVersion, tsdbRowKey(pRow).version);
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow));
pBlock->nRow++;
// next
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
// write the block and do something
ASSERT(0);
// // SBlock
// pBlock->last = 0;
// pBlock->cmprAlg = pCommitter->cmprAlg;
// code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock);
// if (code) goto _err;
// // SBlockIdx
// pBlockIdx->minKey = TMIN(pBlockIdx->minKey, pBlock->minKey.ts);
// pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts);
// pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion);
// pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion);
// tBlockReset(pBlock);
// tBlockDataReset(pBlockData);
}
}
if (pBlockData->nRow > 0) {
// write the block to file
// SBlock
row = tBlockDataFirstRow(pBlockData);
if (tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&row)) > 0) pBlock->minKey = TSDBROW_KEY(&row);
row = tBlockDataLastRow(pBlockData);
if (tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&row)) < 0) pBlock->maxKey = TSDBROW_KEY(&row);
pBlock->last = 1;
pBlock->cmprAlg = pCommitter->cmprAlg;
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock);
if (code) goto _err;
// SBlockIdx
code = tMapDataPutItem(mBlock, pBlock, tPutBlock);
if (code) goto _err;
pBlockIdx->minKey = TMIN(pBlockIdx->minKey, pBlock->minKey.ts);
pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts);
pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion);
pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion);
}
// write block
code = tsdbWriteBlock(pCommitter->pWriter, mBlock, NULL, pBlockIdx);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
if (code) goto _err;
_exit:
if (pRow) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, tsdbRowKey(pRow).ts);
}
if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
return code;
_err:
......@@ -856,70 +929,6 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
}
}
#if 0
while (true) {
if (pTbData == NULL && pBlockIdx == NULL) break;
if (pTbData && pBlockIdx) {
c = tTABLEIDCmprFn(pTbData, pBlockIdx);
if (c == 0) {
goto _commit_mem_and_disk_data;
} else if (c < 0) {
goto _commit_mem_data;
} else {
goto _commit_disk_data;
}
} else if (pTbData) {
goto _commit_mem_data;
} else {
goto _commit_disk_data;
}
_commit_mem_data:
code = tsdbCommitTableData(pCommitter, pTbData, NULL);
if (code) goto _err;
iTbData++;
if (iTbData < nTbData) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
} else {
pTbData = NULL;
}
continue;
_commit_disk_data:
code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
if (code) goto _err;
iBlockIdx++;
if (iBlockIdx < nBlockIdx) {
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
continue;
_commit_mem_and_disk_data:
code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
if (code) goto _err;
iTbData++;
iBlockIdx++;
if (iTbData < nTbData) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
} else {
pTbData = NULL;
}
if (iBlockIdx < nBlockIdx) {
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
continue;
}
#endif
return code;
_err:
......@@ -938,12 +947,16 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
if (code) goto _err;
// upsert SDFileSet
code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
if (code) goto _err;
// close and sync
code = tsdbDataFWriterClose(pCommitter->pWriter, 1);
code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
if (code) goto _err;
if (pCommitter->pReader) {
code = tsdbDataFReaderClose(pCommitter->pReader);
code = tsdbDataFReaderClose(&pCommitter->pReader);
goto _err;
}
......@@ -995,6 +1008,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter->precision = pTsdb->keepCfg.precision;
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
code = tsdbFSBegin(pTsdb->fs);
if (code) goto _err;
......
......@@ -45,6 +45,38 @@ void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char
}
}
int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype) {
int32_t n = 0;
switch (ftype) {
case TSDB_HEAD_FILE: {
SHeadFile *pHeadFile = &pSet->fHead;
n += tPutI64(p + n, pHeadFile->commitID);
n += tPutI64(p + n, pHeadFile->size);
n += tPutI64(p + n, pHeadFile->offset);
} break;
case TSDB_DATA_FILE: {
SDataFile *pDataFile = &pSet->fData;
n += tPutI64(p + n, pDataFile->commitID);
n += tPutI64(p + n, pDataFile->size);
} break;
case TSDB_LAST_FILE: {
SLastFile *pLastFile = &pSet->fLast;
n += tPutI64(p + n, pLastFile->commitID);
n += tPutI64(p + n, pLastFile->size);
} break;
case TSDB_SMA_FILE: {
SSmaFile *pSmaFile = &pSet->fSma;
n += tPutI64(p + n, pSmaFile->commitID);
n += tPutI64(p + n, pSmaFile->size);
} break;
default:
ASSERT(0);
}
return n;
}
// SHeadFile ===============================================
// SDataFile ===============================================
......
......@@ -460,33 +460,35 @@ _err:
return code;
}
int32_t tsdbDataFReaderClose(SDataFReader *pReader) {
int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
int32_t code = 0;
if (taosCloseFile(&pReader->pHeadFD) < 0) {
if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&pReader->pDataFD) < 0) {
if (taosCloseFile(&(*ppReader)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&pReader->pLastFD) < 0) {
if (taosCloseFile(&(*ppReader)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&pReader->pSmaFD) < 0) {
if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosMemoryFree(*ppReader);
*ppReader = NULL;
return code;
_err:
tsdbError("vgId:%d data file reader close failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d data file reader close failed since %s", TD_VID((*ppReader)->pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -615,6 +617,8 @@ struct SDataFWriter {
TdFilePtr pSmaFD;
};
SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter) { return &pWriter->wSet; }
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
int32_t code = 0;
int32_t flag;
......@@ -751,55 +755,58 @@ _err:
return code;
}
int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync) {
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
int32_t code = 0;
STsdb *pTsdb = (*ppWriter)->pTsdb;
if (sync) {
if (taosFsyncFile(pWriter->pHeadFD) < 0) {
if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pWriter->pDataFD) < 0) {
if (taosFsyncFile((*ppWriter)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pWriter->pLastFD) < 0) {
if (taosFsyncFile((*ppWriter)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pWriter->pSmaFD) < 0) {
if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
if (taosCloseFile(&pWriter->pHeadFD) < 0) {
if (taosCloseFile(&(*ppWriter)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&pWriter->pDataFD) < 0) {
if (taosCloseFile(&(*ppWriter)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&pWriter->pLastFD) < 0) {
if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&pWriter->pSmaFD) < 0) {
if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosMemoryFree(*ppWriter);
*ppWriter = NULL;
return code;
_err:
tsdbError("vgId:%d data file writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d data file writer close failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -821,7 +828,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
// head ==============
// build
memset(*ppBuf, 0, size);
// tPutHeadFileHdr(*ppBuf, pHeadFile);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_HEAD_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
......@@ -839,7 +846,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
// data ==============
memset(*ppBuf, 0, size);
// tPutDataFileHdr(*ppBuf, pDataFile);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_DATA_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
......@@ -857,7 +864,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
// last ==============
memset(*ppBuf, 0, size);
// tPutLastFileHdr(*ppBuf, pLastFile);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_LAST_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
......@@ -875,7 +882,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
// sma ==============
memset(*ppBuf, 0, size);
// tPutSmaFileHdr(*ppBuf, pSmaFile);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_SMA_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
......@@ -902,12 +909,13 @@ _err:
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = 0;
int64_t size;
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
int64_t n = 0;
int64_t n;
uint8_t *pBuf = NULL;
// prepare
size = 0;
size += tPutU32(NULL, TSDB_FILE_DLMT);
size = size + tPutMapData(NULL, mBlockIdx) + sizeof(TSCKSUM);
......@@ -917,6 +925,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **
if (code) goto _err;
// build
n = 0;
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutMapData(*ppBuf, mBlockIdx);
taosCalcChecksumAppend(0, *ppBuf, size);
......@@ -944,20 +953,17 @@ _err:
}
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
ASSERT(mBlock->nItem > 0);
// prepare
size = 0;
size += tPutU32(NULL, TSDB_FILE_DLMT);
size += tPutI64(NULL, pBlockIdx->suid);
size += tPutI64(NULL, pBlockIdx->uid);
size = size + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM);
size = sizeof(SBlockDataHdr) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
......@@ -966,9 +972,8 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf,
// build
n = 0;
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutI64(*ppBuf + n, pBlockIdx->suid);
n += tPutI64(*ppBuf + n, pBlockIdx->uid);
*(SBlockDataHdr *)(*ppBuf) = hdr;
n += sizeof(hdr);
n += tPutMapData(*ppBuf + n, mBlock);
taosCalcChecksumAppend(0, *ppBuf, size);
......@@ -1001,17 +1006,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
SBlockIdx *pBlockIdx, SBlock *pBlock) {
int32_t code = 0;
SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++];
SBlockCol bCol;
SBlockCol *pBlockCol = &(SBlockCol){0};
int64_t size;
int64_t n;
TdFilePtr pFileFD = pWriter->pDataFD; // TODO
TdFilePtr pFileFD = pBlock->last ? pWriter->pLastFD : pWriter->pDataFD;
SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
TSCKSUM cksm;
uint8_t *p;
int64_t offset;
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
pSubBlock->offset = 0; // TODO: set as file offset
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
if (pBlock->last) {
pSubBlock->offset = pWriter->wSet.fLast.size;
} else {
pSubBlock->offset = pWriter->wSet.fData.size;
}
pSubBlock->bsize = 0;
// HDR
......@@ -1100,7 +1113,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// other columns
offset = 0;
tMapDataClear(&pSubBlock->mBlockCol);
tMapDataReset(&pSubBlock->mBlockCol);
for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aColDataP); iCol++) {
SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iCol);
......@@ -1108,18 +1121,18 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if (pColData->flag == HAS_NONE) continue;
bCol.cid = pColData->cid;
bCol.type = pColData->type;
bCol.flag = pColData->flag;
pBlockCol->cid = pColData->cid;
pBlockCol->type = pColData->type;
pBlockCol->flag = pColData->flag;
if (pColData->flag != HAS_NULL) {
cksm = 0;
bCol.offset = offset;
bCol.size = 0;
pBlockCol->offset = offset;
pBlockCol->size = 0;
// bitmap
if (pColData->flag != HAS_VALUE) {
// TODO: optimize bitmap part
// optimize bitmap storage (todo)
n = taosWriteFile(pFileFD, pColData->pBitMap, BIT2_SIZE(pBlockData->nRow));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
......@@ -1127,7 +1140,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
}
cksm = taosCalcChecksum(cksm, pColData->pBitMap, n);
bCol.size += n;
pBlockCol->size += n;
}
// data
......@@ -1138,7 +1151,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
bCol.size += n;
pBlockCol->size += n;
// checksum
cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData);
......@@ -1147,7 +1160,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
bCol.size += n;
pBlockCol->size += n;
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM);
......@@ -1160,8 +1173,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
}
// data
n = tDataTypes->compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size, pBlock->cmprAlg,
*ppBuf2, size);
n = tDataTypes[pColData->type].compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size,
pBlock->cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
......@@ -1171,29 +1184,40 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
n += sizeof(TSCKSUM);
ASSERT(n <= size);
taosCalcChecksumAppend(cksm, *ppBuf1, n);
bCol.size += n;
// write
n = taosWriteFile(pFileFD, *ppBuf1, bCol.size);
n = taosWriteFile(pFileFD, *ppBuf1, n);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pBlockCol->size += n;
}
// state
offset += bCol.size;
pSubBlock->bsize += bCol.size;
offset += pBlockCol->size;
pSubBlock->bsize += pBlockCol->size;
}
code = tMapDataPutItem(&pSubBlock->mBlockCol, &bCol, tPutBlockCol);
code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol);
if (code) goto _err;
}
if (pBlock->last) {
pWriter->wSet.fLast.size += pSubBlock->bsize;
} else {
pWriter->wSet.fData.size += pSubBlock->bsize;
}
tsdbFree(pBuf1);
tsdbFree(pBuf2);
return code;
_err:
tsdbError("vgId:%d write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tsdbFree(pBuf1);
tsdbFree(pBuf2);
return code;
}
......
......@@ -326,6 +326,7 @@ void tBlockReset(SBlock *pBlock) {
pBlock->maxVersion = VERSION_MIN;
pBlock->nRow = 0;
pBlock->last = -1;
pBlock->hasDup = 0;
pBlock->cmprAlg = -1;
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
pBlock->aSubBlock[iSubBlock].offset = -1;
......@@ -566,14 +567,6 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK
// }
// TSDBROW ======================================================
TSDBKEY tsdbRowKey(TSDBROW *pRow) {
if (pRow->type == 0) {
return (TSDBKEY){.version = pRow->version, .ts = pRow->pTSRow->ts};
} else {
return (TSDBKEY){.version = pRow->pBlockData->aVersion[pRow->iRow], .ts = pRow->pBlockData->aTSKEY[pRow->iRow]};
}
}
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
STColumn *pTColumn = &pTSchema->columns[iCol];
SValue value;
......@@ -661,7 +654,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
// SRowMerger ======================================================
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = tsdbRowKey(pRow);
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
STColumn *pTColumn;
......@@ -702,7 +695,7 @@ void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); }
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
int32_t code = 0;
TSDBKEY key = tsdbRowKey(pRow);
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
......@@ -1067,15 +1060,14 @@ static SColData *tBlockDataAddBlockCol(SBlockData *pBlockData, int32_t iColData,
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = tsdbRowKey(pRow);
// TSDBKEY
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1));
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1));
if (code) goto _err;
pBlockData->aVersion[pBlockData->nRow] = key.version;
pBlockData->aTSKEY[pBlockData->nRow] = key.ts;
pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow);
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
// OTHER
int32_t iColData = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册