提交 2a41f1d3 编写于 作者: H Hongze Cheng

more work

上级 6140856e
......@@ -86,8 +86,8 @@ typedef struct STsdbFSState STsdbFSState;
#define TSDBROW_VERSION(ROW) (((ROW)->type == 0) ? (ROW)->version : (ROW)->pBlockData->aVersion[(ROW)->iRow])
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)});
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)});
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
......@@ -132,6 +132,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
void tColDataReset(SColData *pColData, int16_t cid, int8_t type);
void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataPCmprFn(const void *p1, const void *p2);
// SBlockData
......@@ -142,6 +143,8 @@ void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
// SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph);
......@@ -374,9 +377,9 @@ typedef struct {
int64_t nRow;
int8_t cmprAlg;
int64_t offset;
int64_t vsize; // VERSION size
int64_t ksize; // TSKEY size
int64_t bsize;
int64_t vsize; // VERSION size
int64_t ksize; // TSKEY size
int64_t bsize; // total block size
SMapData mBlockCol; // SMapData<SBlockCol>
} SSubBlock;
......@@ -412,7 +415,6 @@ struct SColData {
int32_t *aOffset;
int32_t nData;
uint8_t *pData;
uint8_t *pBuf;
};
struct SBlockData {
......
......@@ -607,7 +607,9 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
int64_t n;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SBlockCol *pBlockCol = &(SBlockCol){};
SBlockCol *pBlockCol = &(SBlockCol){0};
tBlockDataReset(pBlockData);
// realloc
code = tsdbRealloc(ppBuf1, pSubBlock->bsize);
......@@ -789,11 +791,42 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
if (code) goto _err;
// read remain block data and do merg
iSubBlock++;
for (; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
ASSERT(0);
if (pBlock->nSubBlock > 1) {
SBlockData *pBlockData1 = &(SBlockData){0};
SBlockData *pBlockData2 = &(SBlockData){0};
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
goto _err;
}
code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
goto _err;
}
// merge two block data
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
goto _err;
}
}
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
}
ASSERT(pBlock->nRow == pBlockData->nRow);
ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0);
ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0);
if (pBuf1) tsdbFree(pBuf1);
if (pBuf2) tsdbFree(pBuf2);
return code;
......
......@@ -939,6 +939,29 @@ _exit:
return code;
}
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
int32_t code = 0;
pColDataDest->cid = pColDataDest->cid;
pColDataDest->type = pColDataDest->type;
pColDataDest->offsetValid = 0;
pColDataDest->nVal = pColDataSrc->nVal;
pColDataDest->flag = pColDataSrc->flag;
if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && pColDataSrc->flag != HAS_VALUE) {
code = tsdbRealloc(&pColDataDest->pBitMap, BIT2_SIZE(pColDataDest->nVal));
if (code) goto _exit;
memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, BIT2_SIZE(pColDataSrc->nVal));
}
pColDataDest->nData = pColDataSrc->nData;
code = tsdbRealloc(&pColDataDest->pData, pColDataSrc->nData);
if (code) goto _exit;
memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataSrc->nData);
_exit:
return code;
}
static int32_t tColDataUpdateOffset(SColData *pColData) {
int32_t code = 0;
SValue value;
......@@ -1169,3 +1192,85 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
_err:
return code;
}
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0;
tBlockDataReset(pBlockData);
// loop to merge
int32_t iRow1 = 0;
int32_t nRow1 = pBlockData1->nRow;
int32_t iRow2 = 0;
int32_t nRow2 = pBlockData2->nRow;
TSDBROW row1;
TSDBROW row2;
int32_t c;
while (iRow1 < nRow1 && iRow2 < nRow2) {
row1 = tsdbRowFromBlockData(pBlockData1, iRow1);
row2 = tsdbRowFromBlockData(pBlockData2, iRow2);
c = tsdbKeyCmprFn(&TSDBROW_KEY(&row1), &TSDBROW_KEY(&row2));
if (c < 0) {
code = tBlockDataAppendRow(pBlockData, &row1, NULL);
if (code) goto _exit;
iRow1++;
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockData, &row2, NULL);
if (code) goto _exit;
iRow2++;
} else {
ASSERT(0);
}
}
while (iRow1 < nRow1) {
row1 = tsdbRowFromBlockData(pBlockData1, iRow1);
code = tBlockDataAppendRow(pBlockData, &row1, NULL);
if (code) goto _exit;
iRow1++;
}
while (iRow2 < nRow2) {
row2 = tsdbRowFromBlockData(pBlockData2, iRow2);
code = tBlockDataAppendRow(pBlockData, &row2, NULL);
if (code) goto _exit;
iRow2++;
}
_exit:
return code;
}
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) {
int32_t code = 0;
SColData *pColDataSrc;
SColData *pColDataDest;
ASSERT(pBlockDataSrc->nRow > 0);
tBlockDataReset(pBlockDataDest);
pBlockDataDest->nRow = pBlockDataSrc->nRow;
// TSDBKEY
code = tsdbRealloc((uint8_t **)&pBlockDataDest->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow);
if (code) goto _exit;
code = tsdbRealloc((uint8_t **)&pBlockDataDest->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
if (code) goto _exit;
memcpy(pBlockDataDest->aVersion, pBlockDataSrc->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow);
memcpy(pBlockDataDest->aTSKEY, pBlockDataSrc->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
// other
for (size_t iColData = 0; iColData < taosArrayGetSize(pBlockDataSrc->aColDataP); iColData++) {
pColDataSrc = (SColData *)taosArrayGetP(pBlockDataSrc->aColDataP, iColData);
code = tBlockDataAddColData(pBlockDataDest, iColData, &pColDataDest);
if (code) goto _exit;
code = tColDataCopy(pColDataSrc, pColDataDest);
if (code) goto _exit;
}
_exit:
return code;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册