提交 b1466ad7 编写于 作者: H Hongze Cheng

make sma work

上级 a7cbb93a
......@@ -115,14 +115,13 @@ int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock);
// SBlockIdx
// #define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()})
void tBlockIdxReset(SBlockIdx *pBlockIdx);
int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
// SColdata
#define tColDataInit() ((SColData){0})
void tColDataReset(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
void tColDataReset(SColData *pColData);
void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
......@@ -134,6 +133,8 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA))
int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema);
void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
......@@ -362,6 +363,7 @@ struct SMapData {
typedef struct {
int16_t cid;
int8_t type;
int8_t smaOn;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int32_t offset;
int32_t szBitmap; // bitmap size
......
......@@ -401,6 +401,9 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
if (code) goto _err;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
// loop to merge
pRow1 = tsdbTbDataIterGet(pIter);
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
......@@ -410,7 +413,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
tBlockDataClearData(pBlockData);
while (true) {
if (pRow1 == NULL && pRow2 == NULL) {
if (pBlockData->nRow == 0) {
......@@ -477,7 +480,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
tBlockDataClearData(pBlockData);
}
return code;
......@@ -496,8 +499,11 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
int64_t suid = pIter->pTbData->suid;
int64_t uid = pIter->pTbData->uid;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
tBlockDataClearData(pBlockData);
pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
while (true) {
......@@ -529,7 +535,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataReset(pBlockData);
tBlockDataClearData(pBlockData);
}
return code;
......@@ -599,6 +605,7 @@ static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
if (c == 0) {
nRow++;
tsdbTbDataIterNext(pIter);
} else if (c > 0) {
break;
} else {
......@@ -616,7 +623,9 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
SBlock block;
TSDBROW *pRow;
tBlockDataReset(pBlockData);
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
......@@ -702,6 +711,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
pBlock = NULL;
}
if (pRow) {
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
if (code) goto _err;
}
// merge ===========
while (true) {
if (pRow == NULL && pBlock == NULL) break;
......
......@@ -895,7 +895,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _err;
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type, 0);
tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
......@@ -1052,7 +1052,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData);
if (code) goto _err;
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type, 0);
tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
......@@ -1839,6 +1839,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
pBlockCol->cid = pColData->cid;
pBlockCol->type = pColData->type;
pBlockCol->smaOn = pColData->smaOn;
pBlockCol->flag = pColData->flag;
if (pColData->flag != HAS_NULL) {
......
......@@ -370,6 +370,7 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) {
n += tPutI16v(p ? p + n : p, pBlockCol->cid);
n += tPutI8(p ? p + n : p, pBlockCol->type);
n += tPutI8(p ? p + n : p, pBlockCol->smaOn);
n += tPutI8(p ? p + n : p, pBlockCol->flag);
if (pBlockCol->flag != HAS_NULL) {
......@@ -389,6 +390,7 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
n += tGetI16v(p + n, &pBlockCol->cid);
n += tGetI8(p + n, &pBlockCol->type);
n += tGetI8(p + n, &pBlockCol->smaOn);
n += tGetI8(p + n, &pBlockCol->flag);
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
......@@ -778,10 +780,14 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
}
// SColData ========================================
void tColDataReset(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) {
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) {
pColData->cid = cid;
pColData->type = type;
pColData->smaOn = smaOn;
tColDataReset(pColData);
}
void tColDataReset(SColData *pColData) {
pColData->nVal = 0;
pColData->flag = 0;
pColData->nData = 0;
......@@ -970,6 +976,33 @@ void tBlockDataClear(SBlockData *pBlockData) {
taosArrayDestroyEx(pBlockData->aColData, tColDataClear);
}
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) {
int32_t code = 0;
SColData *pColData;
STColumn *pTColumn;
tBlockDataReset(pBlockData);
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
pTColumn = &pTSchema->columns[iColumn];
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) != 0);
}
_exit:
return code;
}
void tBlockDataClearData(SBlockData *pBlockData) {
pBlockData->nRow = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) {
SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData);
tColDataReset(pColData);
}
}
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData) {
int32_t code = 0;
SColData *pColData = NULL;
......@@ -1009,71 +1042,40 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
// OTHER
int32_t iColData = 0;
SRowIter *pIter = &((SRowIter){0});
int32_t nColData = taosArrayGetSize(pBlockData->aColDataP);
SRowIter iter = {0};
SRowIter *pIter = &iter;
SColData *pColData;
SColVal *pColVal;
ASSERT(nColData > 0);
tRowIterInit(pIter, pRow, pTSchema);
pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData);
pColVal = tRowIterNext(pIter);
pColData = (iColData < taosArrayGetSize(pBlockData->aColDataP))
? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData)
: NULL;
while (pColVal && pColData) {
if (pColVal->cid == pColData->cid) {
code = tColDataAppendValue(pColData, pColVal);
if (code) goto _err;
pColVal = tRowIterNext(pIter);
} else if (pColVal->cid > pColData->cid) {
code = tColDataAppendValue(pColData, &(COL_VAL_NONE(pColData->cid, pColData->type)));
if (code) goto _err;
} else {
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _err;
while (pColData) {
if (pColVal) {
if (pColData->cid == pColVal->cid) {
code = tColDataAppendValue(pColData, pColVal);
if (code) goto _err;
// append a NONE
tColDataReset(pColData, pColVal->cid, pColVal->type, 0);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type));
pColVal = tRowIterNext(pIter);
pColData = ((++iColData) < nColData) ? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData) : NULL;
} else if (pColData->cid < pColVal->cid) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err;
}
code = tColDataAppendValue(pColData, pColVal);
pColData = ((++iColData) < nColData) ? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData) : NULL;
} else {
pColVal = tRowIterNext(pIter);
}
} else {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err;
pColVal = tRowIterNext(pIter);
pColData = ((++iColData) < nColData) ? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData) : NULL;
}
pColData = ((++iColData) < taosArrayGetSize(pBlockData->aColDataP))
? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData)
: NULL;
}
while (pColData) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err;
pColData = ((++iColData) < taosArrayGetSize(pBlockData->aColDataP))
? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData)
: NULL;
}
while (pColVal) {
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _err;
tColDataReset(pColData, pColVal->cid, pColVal->type, 0);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type));
if (code) goto _err;
}
code = tColDataAppendValue(pColData, pColVal);
if (code) goto _err;
iColData++;
pColVal = tRowIterNext(pIter);
}
pBlockData->nRow++;
......@@ -1086,7 +1088,57 @@ _err:
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0;
// set target
int32_t iColData1 = 0;
int32_t nColData1 = taosArrayGetSize(pBlockData1->aColDataP);
int32_t iColData2 = 0;
int32_t nColData2 = taosArrayGetSize(pBlockData2->aColDataP);
SColData *pColData1;
SColData *pColData2;
SColData *pColData;
tBlockDataReset(pBlockData);
while (iColData1 < nColData1 && iColData2 < nColData2) {
pColData1 = (SColData *)taosArrayGetP(pBlockData1->aColDataP, iColData1);
pColData2 = (SColData *)taosArrayGetP(pBlockData2->aColDataP, iColData2);
if (pColData1->cid == pColData2->cid) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
iColData1++;
iColData2++;
} else if (pColData1->cid < pColData2->cid) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn);
iColData1++;
} else {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
iColData2++;
}
}
while (iColData1 < nColData1) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn);
iColData1++;
}
while (iColData2 < nColData2) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
iColData2++;
}
// loop to merge
int32_t iRow1 = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册