提交 d1c7e3a5 编写于 作者: H Hongze Cheng

more last refact

上级 f84f8837
...@@ -134,14 +134,15 @@ int32_t tGetColData(uint8_t *p, SColData *pColData); ...@@ -134,14 +134,15 @@ int32_t tGetColData(uint8_t *p, SColData *pColData);
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
#define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA)) #define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA))
#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA)) #define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA))
int32_t tBlockDataInit(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom); int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataClearData(SBlockData *pBlockData); void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
...@@ -468,13 +469,17 @@ struct SColData { ...@@ -468,13 +469,17 @@ struct SColData {
uint8_t *pData; uint8_t *pData;
}; };
// (SBlockData){.suid = 0, .uid = 0}: block data not initialized
// (SBlockData){.suid = suid, .uid = uid}: block data for ONE child table int .data file
// (SBlockData){.suid = suid, .uid = 0}: block data for N child tables int .last file
// (SBlockData){.suid = 0, .uid = uid}: block data for 1 normal table int .last/.data file
struct SBlockData { struct SBlockData {
int64_t suid; // 0 means normal table data block int64_t suid; // 0 means normal table block data, otherwise child table block data
int64_t uid; // 0 means block data in .last file, others in .data file int64_t uid; // 0 means block data in .last file, otherwise in .data file
int32_t nRow; int32_t nRow; // number of rows
int64_t *aUid; int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0)
int64_t *aVersion; int64_t *aVersion; // versions of each row
TSKEY *aTSKEY; TSKEY *aTSKEY; // timestamp of each row
SArray *aIdx; // SArray<int32_t> SArray *aIdx; // SArray<int32_t>
SArray *aColData; // SArray<SColData> SArray *aColData; // SArray<SColData>
}; };
......
...@@ -296,22 +296,28 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { ...@@ -296,22 +296,28 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) {
ASSERT(pCommitter->dReader.pReader); ASSERT(pCommitter->dReader.pReader);
ASSERT(pCommitter->dReader.pRowInfo); ASSERT(pCommitter->dReader.pRowInfo);
SBlockData *pBlockDatal = &pCommitter->dReader.bDatal;
pCommitter->dReader.iRow++; pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { if (pCommitter->dReader.iRow < pBlockDatal->nRow) {
pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow]; if (pBlockDatal->uid == 0) {
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow];
}
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow);
} else { } else {
pCommitter->dReader.iBlockL++; pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, &pCommitter->dReader.bDatal, code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, pBlockDatal, NULL, NULL);
NULL, NULL);
if (code) goto _exit; if (code) goto _exit;
pCommitter->dReader.iRow = 0; pCommitter->dReader.iRow = 0;
pCommitter->dReader.pRowInfo->suid = pCommitter->dReader.pBlockL->suid; pCommitter->dReader.pRowInfo->suid = pBlockDatal->suid;
pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow]; if (pBlockDatal->uid) {
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid;
} else {
pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[0];
}
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow);
} else { } else {
pCommitter->dReader.pRowInfo = NULL; pCommitter->dReader.pRowInfo = NULL;
} }
...@@ -354,15 +360,16 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -354,15 +360,16 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
} else { } else {
pCommitter->dReader.pBlockIdx = NULL; pCommitter->dReader.pBlockIdx = NULL;
} }
tBlockDataReset(&pCommitter->dReader.bData);
// last // last
code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL); code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL);
if (code) goto _err; if (code) goto _err;
pCommitter->dReader.iBlockL = -1; pCommitter->dReader.iBlockL = -1;
pCommitter->dReader.bDatal.nRow = 0;
pCommitter->dReader.iRow = -1; pCommitter->dReader.iRow = -1;
pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo; pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo;
tBlockDataReset(&pCommitter->dReader.bDatal);
code = tsdbCommitterNextLastRow(pCommitter); code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err; if (code) goto _err;
} else { } else {
......
...@@ -591,8 +591,8 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB ...@@ -591,8 +591,8 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
ASSERT(n + sizeof(TSCKSUM) == size); ASSERT(n + sizeof(TSCKSUM) == size);
_exit:
tFree(pBuf); tFree(pBuf);
_exit:
return code; return code;
_err: _err:
...@@ -658,8 +658,8 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) ...@@ -658,8 +658,8 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
ASSERT(n + sizeof(TSCKSUM) == size); ASSERT(n + sizeof(TSCKSUM) == size);
_exit:
tFree(pBuf); tFree(pBuf);
_exit:
return code; return code;
_err: _err:
......
...@@ -1121,6 +1121,8 @@ static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) { ...@@ -1121,6 +1121,8 @@ static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
int32_t tBlockDataInit(SBlockData *pBlockData) { int32_t tBlockDataInit(SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
pBlockData->suid = 0;
pBlockData->uid = 0;
pBlockData->nRow = 0; pBlockData->nRow = 0;
pBlockData->aUid = NULL; pBlockData->aUid = NULL;
pBlockData->aVersion = NULL; pBlockData->aVersion = NULL;
...@@ -1141,36 +1143,45 @@ _exit: ...@@ -1141,36 +1143,45 @@ _exit:
return code; return code;
} }
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
}
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aUid); tFree((uint8_t *)pBlockData->aUid);
tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY); tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx); taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
pBlockData->aColData = NULL; pBlockData->aUid = NULL;
pBlockData->aIdx = NULL;
pBlockData->aTSKEY = NULL;
pBlockData->aVersion = NULL; pBlockData->aVersion = NULL;
pBlockData->aTSKEY = NULL;
pBlockData->aIdx = NULL;
pBlockData->aColData = NULL;
} }
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { void tBlockDataReset(SBlockData *pBlockData) {
int32_t code = 0; pBlockData->suid = 0;
SColData *pColData; pBlockData->uid = 0;
STColumn *pTColumn; pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
}
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid) {
int32_t code = 0;
ASSERT(suid || uid);
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { pBlockData->suid = suid;
pTColumn = &pTSchema->columns[iColumn]; pBlockData->uid = uid;
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); if (pTSchema) {
if (code) goto _exit; for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
STColumn *pTColumn = &pTSchema->columns[iColumn];
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) != 0); tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) != 0);
}
} }
_exit: _exit:
...@@ -1211,52 +1222,47 @@ _err: ...@@ -1211,52 +1222,47 @@ _err:
return code; return code;
} }
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
int32_t code = 0; int32_t code = 0;
// TSDBKEY ASSERT(pBlockData->suid || pBlockData->uid);
// uid
if (pBlockData->uid == 0) {
ASSERT(uid);
code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1));
if (code) goto _err;
pBlockData->aUid[pBlockData->nRow] = uid;
}
// version
code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1)); code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1));
if (code) goto _err; if (code) goto _err;
pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow);
// timestamp
code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1)); code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1));
if (code) goto _err; if (code) goto _err;
pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow);
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
// OTHER // OTHER
int32_t iColData = 0; SRowIter rIter = {0};
int32_t nColData = taosArrayGetSize(pBlockData->aIdx); SColVal *pColVal;
SRowIter iter = {0};
SRowIter *pIter = &iter;
SColData *pColData;
SColVal *pColVal;
if (nColData == 0) goto _exit; tRowIterInit(&rIter, pRow, pTSchema);
pColVal = tRowIterNext(&rIter);
tRowIterInit(pIter, pRow, pTSchema); for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
pColVal = tRowIterNext(pIter);
while (pColData) {
if (pColVal) {
if (pColData->cid == pColVal->cid) {
code = tColDataAppendValue(pColData, pColVal);
if (code) goto _err;
pColVal = tRowIterNext(pIter); while (pColVal && pColVal->cid < pColData->cid) {
pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL; pColVal = tRowIterNext(&rIter);
} else if (pColData->cid < pColVal->cid) { }
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err;
pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL; if (pColVal == NULL || pColVal->cid > pColData->cid) {
} else {
pColVal = tRowIterNext(pIter);
}
} else {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err; if (code) goto _err;
} else {
pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL; code = tColDataAppendValue(pColData, pColVal);
if (code) goto _err;
pColVal = tRowIterNext(&rIter);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册