提交 095ca03f 编写于 作者: H Hongze Cheng

more work

上级 6f80993d
......@@ -32,31 +32,29 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define TSDB_MAX_SUBBLOCKS 8
typedef struct TSDBROW TSDBROW;
typedef struct TSDBKEY TSDBKEY;
typedef struct TABLEID TABLEID;
typedef struct KEYINFO KEYINFO;
typedef struct SDelOp SDelOp;
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct SMergeInfo SMergeInfo;
typedef struct STable STable;
typedef struct SMapData SMapData;
typedef struct SBlockSMA SBlockSMA;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock;
typedef struct SBlockCol SBlockCol;
typedef struct SBlockStatis SBlockStatis;
typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH;
typedef struct TSDBROW TSDBROW;
typedef struct TSDBKEY TSDBKEY;
typedef struct TABLEID TABLEID;
typedef struct SDelOp SDelOp;
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct SMergeInfo SMergeInfo;
typedef struct STable STable;
typedef struct SMapData SMapData;
typedef struct SColData SColData;
typedef struct SColDataBlock SColDataBlock;
typedef struct SBlockSMA SBlockSMA;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlockInfo SBlockInfo;
typedef struct SBlock SBlock;
typedef struct SBlockCol SBlockCol;
typedef struct SBlockStatis SBlockStatis;
typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH;
#define TSDB_MAX_SUBBLOCKS 8
// tsdbMemTable ==============================================================================================
......@@ -111,7 +109,8 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
int32_t tsdbDataFReaderClose(SDataFReader *pReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlock *pBlock, SColDataBlock *pBlockData, uint8_t **ppBuf);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
int16_t *aColId, int32_t nCol, uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
// SDelFWriter
......@@ -143,6 +142,18 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
// KEYINFO
#define KEYINFO_INIT_VAL \
((KEYINFO){.maxKey.ts = TSKEY_MIN, \
.maxKey.version = 0, \
.minKey.ts = TSKEY_MAX, \
.minKey.version = INT64_MAX, \
.minVerion = INT64_MAX, \
.maxVersion = 0})
int32_t tPutKEYINFO(uint8_t *p, KEYINFO *pKeyInfo);
int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo);
// SMapData
void tMapDataReset(SMapData *pMapData);
void tMapDataClear(SMapData *pMapData);
......@@ -211,6 +222,13 @@ struct TSDBKEY {
TSKEY ts;
};
struct KEYINFO {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVerion;
int64_t maxVersion;
};
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
......@@ -233,8 +251,7 @@ struct SDelDataInfo {
struct STbData {
tb_uid_t suid;
tb_uid_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
KEYINFO info;
SDelOp *pHead;
SDelOp *pTail;
SMemSkipList sl;
......@@ -244,8 +261,7 @@ struct SMemTable {
SRWLatch latch;
STsdb *pTsdb;
int32_t nRef;
TSDBKEY minKey;
TSDBKEY maxKey;
KEYINFO info;
int64_t nRow;
int64_t nDel;
SArray *aTbData; // SArray<STbData*>
......@@ -261,8 +277,8 @@ struct TSDBROW {
STSRow *pTSRow;
};
struct {
SColDataBlock *pColDataBlock;
int32_t iRow;
SBlockData *pBlockData;
int32_t iRow;
};
};
};
......@@ -270,36 +286,9 @@ struct TSDBROW {
struct SBlockIdx {
int64_t suid;
int64_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t offset;
int64_t size;
};
typedef enum {
TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_MAX,
} ESBlockVer;
#define SBlockVerLatest TSDB_SBLK_VER_0
struct SBlockItem {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVerion;
int64_t maxVersion;
};
struct SBlockInfo {
uint8_t flags;
int32_t nCols;
KEYINFO info;
int64_t offset;
int64_t size;
int32_t nSmaCols;
int64_t smaOffset;
int64_t smaSize;
};
typedef struct {
......@@ -308,11 +297,8 @@ typedef struct {
} SSubBlock;
struct SBlock {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int32_t nRows;
KEYINFO info;
int32_t nRow;
int8_t last;
int8_t hasDup;
int8_t nSubBlock;
......@@ -388,10 +374,7 @@ struct SDelIdx {
};
struct SDelFile {
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
KEYINFO info;
int64_t size;
int64_t offset;
};
......@@ -405,21 +388,6 @@ struct SMapData {
uint8_t *pBuf;
};
struct SColData {
int16_t cid;
uint8_t flags;
uint32_t nData;
uint8_t *pData;
};
struct SColDataBlock {
int32_t nRow;
int64_t *aVersion;
TSKEY *aTSKey;
int32_t nColData;
SColData *aColData;
};
typedef struct {
int16_t colId;
int16_t maxIndex;
......
......@@ -31,9 +31,11 @@ typedef struct {
SDataFReader *pReader;
SMapData oBlockIdx; // SMapData<SBlockIdx>, read from reader
SMapData oBlock; // SMapData<SBlock>, read from reader
SBlockData bDataO;
SDataFWriter *pWriter;
SMapData nBlockIdx; // SMapData<SBlockIdx>, build by committer
SMapData nBlock; // SMapData<SBlock>
SBlockData bDataN;
/* commit del */
SDelFReader *pDelFReader;
SMapData oDelIdxMap; // SMapData<SDelIdx>, old
......@@ -171,7 +173,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
delIdx.minKey = TSKEY_MAX;
delIdx.maxKey = TSKEY_MIN;
delIdx.minVersion = INT64_MAX;
delIdx.maxVersion = -1;
delIdx.maxVersion = INT64_MIN;
// start
tMapDataReset(&pCommitter->oDelDataMap);
......@@ -348,54 +350,118 @@ _err:
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
TSDBROW *pRow;
SBlock block = BLOCK_INIT_VAL;
SBlockData bData;
static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey,
bool toDataOnly) {
int32_t code = 0;
TSDBROW *pRow;
SBlock block; // TODO
if (pBlock == NULL) {
while (true) {
pRow = tsdbTbDataIterGet(pIter);
if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) {
if (bData.nRow == 0) {
break;
} else {
goto _write_block_data;
}
while (true) {
pRow = tsdbTbDataIterGet(pIter);
if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) {
if (pCommitter->bDataN.nRow == 0) {
break;
} else {
goto _write_block_data;
}
}
code = tsdbBlockDataAppendRow(&bData, pRow, NULL /*TODO*/);
if (code) goto _err;
code = tsdbBlockDataAppendRow(&pCommitter->bDataN, pRow, NULL /*TODO*/);
if (code) goto _err;
if (bData.nRow >= pCommitter->maxRow * 4 / 5) {
goto _write_block_data;
if (pCommitter->bDataN.nRow < pCommitter->maxRow * 4 / 5) {
continue;
}
_write_block_data:
if (!toDataOnly && pCommitter->bDataN.nRow < pCommitter->minKey) {
block.last = 1;
} else {
block.last = 0;
}
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->bDataN, NULL, pBlockIdx, &block);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock);
if (code) goto _err;
block = BLOCK_INIT_VAL;
tsdbBlockDataReset(&pCommitter->bDataN);
}
return code;
_err:
tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
int32_t nRow = 0;
SBlock block = BLOCK_INIT_VAL;
if (pBlock->last) {
// load last and merge until {pCommitter->maxKey, INT64_MAX}
} else {
// scan pIter, check how many rows in the block range
if (pBlock->nRow + nRow <= pCommitter->maxRow) {
if (pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
// add as a subblock
} else {
continue;
// load the block, merge until pBlock->maxKey
}
} else {
// load the block, merge until pBlock->maxKey
}
}
_write_block_data:
block.last = (bData.nRow > pCommitter->minRow) ? 0 : 1;
code = tsdbWriteBlockData(pCommitter->pWriter, &bData, NULL, pBlockIdx, &block);
if (code) goto _err;
return code;
}
code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock);
if (code) goto _err;
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
TSDBROW *pRow;
SBlock block = BLOCK_INIT_VAL;
SBlockData bDataN;
TSDBKEY key;
int32_t c;
// reset block and bdata
block = BLOCK_INIT_VAL;
tsdbBlockDataReset(&bData);
}
if (pBlock == NULL) {
key.ts = pCommitter->maxKey;
key.version = INT64_MAX;
code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0);
if (code) goto _err;
} else if (pBlock->last) {
// 1. read last block data
// 2. loop to merge memory data and last block data to write to .data file or .last file
// merge
code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock);
if (code) goto _err;
} else {
// while (true) {
// pRow = tsdbTbDataIterGet(pIter);
// memory
key.ts = pBlock->info.minKey.ts;
key.version = pBlock->info.minKey.version - 1;
code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1);
if (code) goto _err;
// merge or move block
pRow = tsdbTbDataIterGet(pIter);
key.ts = pRow->pTSRow->ts;
key.version = pRow->version;
// if (pRow == NULL) /* code */
// }
c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
if (c > 0) {
// move block
code = tMapDataPutItem(&pCommitter->nBlock, pBlock, tPutBlock);
if (code) goto _err;
} else if (c == 0) {
// merge
code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock);
if (code) goto _err;
} else {
ASSERT(0);
}
}
return code;
......@@ -701,7 +767,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
if (pMemTable->nRow == 0) goto _exit;
// loop
pCommitter->nextKey = pMemTable->minKey.ts;
pCommitter->nextKey = pMemTable->info.minKey.ts;
while (pCommitter->nextKey < TSKEY_MAX) {
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
......
......@@ -44,8 +44,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosInitRWLatch(&pMemTable->latch);
pMemTable->pTsdb = pTsdb;
pMemTable->nRef = 1;
pMemTable->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX};
pMemTable->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1};
pMemTable->info = KEYINFO_INIT_VAL;
pMemTable->nRow = 0;
pMemTable->nDel = 0;
pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *));
......@@ -321,8 +320,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
}
pTbData->suid = suid;
pTbData->uid = uid;
pTbData->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX};
pTbData->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1};
pTbData->info = KEYINFO_INIT_VAL;
pTbData->pHead = NULL;
pTbData->pTail = NULL;
pTbData->sl.seed = taosRand();
......@@ -512,12 +510,12 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
goto _err;
}
if (tsdbKeyCmprFn(&key, &pTbData->minKey) < 0) {
pTbData->minKey = key;
if (tsdbKeyCmprFn(&key, &pTbData->info.minKey) < 0) {
pTbData->info.minKey = key;
}
if (tsdbKeyCmprFn(&key, &pMemTable->minKey) < 0) {
pMemTable->minKey = key;
if (tsdbKeyCmprFn(&key, &pMemTable->info.minKey) < 0) {
pMemTable->info.minKey = key;
}
// forward put rest data
......@@ -539,13 +537,16 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
} while (row.pTSRow);
}
if (tsdbKeyCmprFn(&key, &pTbData->maxKey) > 0) {
pTbData->maxKey = key;
if (tsdbKeyCmprFn(&key, &pTbData->info.maxKey) > 0) {
pTbData->info.maxKey = key;
}
if (tsdbKeyCmprFn(&key, &pMemTable->maxKey) > 0) {
pMemTable->maxKey = key;
if (tsdbKeyCmprFn(&key, &pMemTable->info.maxKey) > 0) {
pMemTable->info.maxKey = key;
}
if (pTbData->info.minVerion > version) pTbData->info.minVerion = version;
if (pTbData->info.maxVersion < version) pTbData->info.maxVersion = version;
pMemTable->nRef++;
pRsp->numOfRows = nRow;
pRsp->affectedRows = nRow;
......
......@@ -65,10 +65,10 @@ enum {
};
typedef struct STableCheckInfo {
uint64_t suid;
uint64_t tableId;
TSKEY lastKey;
SBlockInfo* pCompInfo;
uint64_t suid;
uint64_t tableId;
TSKEY lastKey;
// SBlockInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks
uint8_t chosen : 2; // indicate which iterator should move forward
......
......@@ -543,7 +543,8 @@ _err:
return code;
}
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlock *pBlock, SColDataBlock *pBlockData, uint8_t **ppBuf) {
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
int16_t *aColId, int32_t nCol, uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0;
// TODO
return code;
......
......@@ -254,10 +254,7 @@ int32_t tPutBlockIdx(uint8_t *p, void *ph) {
n += tPutI64(p ? p + n : p, pBlockIdx->suid);
n += tPutI64(p ? p + n : p, pBlockIdx->uid);
n += tPutTSDBKEY(p ? p + n : p, &pBlockIdx->minKey);
n += tPutTSDBKEY(p ? p + n : p, &pBlockIdx->maxKey);
n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion);
n += tPutKEYINFO(p ? p + n : p, &pBlockIdx->info);
n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
n += tPutI64v(p ? p + n : p, pBlockIdx->size);
......@@ -270,10 +267,7 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
n += tGetI64(p + n, &pBlockIdx->suid);
n += tGetI64(p + n, &pBlockIdx->uid);
n += tGetTSDBKEY(p + n, &pBlockIdx->minKey);
n += tGetTSDBKEY(p + n, &pBlockIdx->maxKey);
n += tGetI64v(p + n, &pBlockIdx->minVersion);
n += tGetI64v(p + n, &pBlockIdx->maxVersion);
n += tGetKEYINFO(p + n, &pBlockIdx->info);
n += tGetI64v(p + n, &pBlockIdx->offset);
n += tGetI64v(p + n, &pBlockIdx->size);
......@@ -300,9 +294,9 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
SBlock *pBlock1 = (SBlock *)p1;
SBlock *pBlock2 = (SBlock *)p2;
if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) {
if (tsdbKeyCmprFn(&pBlock1->info.maxKey, &pBlock2->info.minKey) < 0) {
return -1;
} else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) {
} else if (tsdbKeyCmprFn(&pBlock1->info.minKey, &pBlock2->info.maxKey) > 0) {
return 1;
}
......@@ -368,10 +362,7 @@ int32_t tGetDelData(uint8_t *p, void *ph) {
int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tPutI64(p ? p + n : p, pDelFile->minKey);
n += tPutI64(p ? p + n : p, pDelFile->maxKey);
n += tPutI64v(p ? p + n : p, pDelFile->minVersion);
n += tPutI64v(p ? p + n : p, pDelFile->maxVersion);
n += tPutKEYINFO(p ? p + n : p, &pDelFile->info);
n += tPutI64v(p ? p + n : p, pDelFile->size);
n += tPutI64v(p ? p + n : p, pDelFile->offset);
......@@ -381,10 +372,7 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tGetI64(p + n, &pDelFile->minKey);
n += tGetI64(p + n, &pDelFile->maxKey);
n += tGetI64v(p + n, &pDelFile->minVersion);
n += tGetI64v(p + n, &pDelFile->maxVersion);
n += tGetKEYINFO(p + n, &pDelFile->info);
n += tGetI64v(p + n, &pDelFile->size);
n += tGetI64v(p + n, &pDelFile->offset);
......@@ -421,44 +409,6 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
// TODO
}
// SColDataBlock ======================================================
void tsdbColDataBlockReset(SColDataBlock *pColDataBlock) {
// TODO
}
int32_t tsdbColDataBlockAppend(SColDataBlock *pColDataBlock, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
int32_t nRow = pColDataBlock->nRow;
STColumn *pTColumn;
SColData *pColData;
SColVal colVal;
pColDataBlock->nRow++;
// version
pColDataBlock->aVersion[nRow] = pRow->version; // TODO
// ts
pColDataBlock->aTSKey[nRow] = pRow->pTSRow->ts; // TODO
// other rows
for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
pTColumn = &pTSchema->columns[iCol];
tsdbRowGetColVal(pRow, pTSchema, iCol, &colVal);
if (colVal.isNone) {
// TODO
} else if (colVal.isNull) {
// TODO
} else {
pColData->nData += tPutValue(pColData->pData + pColData->nData, &colVal.value, pTColumn->type);
}
}
return code;
}
// delete skyline ======================================================
static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
int32_t code = 0;
......@@ -570,4 +520,27 @@ int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *
int32_t code = 0;
// TODO
return code;
}
// KEYINFO ======================================================
int32_t tPutKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
int32_t n = 0;
n += tPutTSDBKEY(p ? p + n : p, &pKeyInfo->minKey);
n += tPutTSDBKEY(p ? p + n : p, &pKeyInfo->maxKey);
n += tPutI64v(p ? p + n : p, pKeyInfo->minVerion);
n += tPutI64v(p ? p + n : p, pKeyInfo->maxVersion);
return n;
}
int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
int32_t n = 0;
n += tGetTSDBKEY(p + n, &pKeyInfo->minKey);
n += tGetTSDBKEY(p + n, &pKeyInfo->maxKey);
n += tGetI64v(p + n, &pKeyInfo->minVerion);
n += tGetI64v(p + n, &pKeyInfo->maxVersion);
return n;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册