提交 49fde0b1 编写于 作者: H Hongze Cheng

more work

上级 6baae7d5
......@@ -141,17 +141,17 @@ int32_t tGetColData(uint8_t *p, SColData *pColData);
int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema);
int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId);
int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
#if 1
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
#endif
......@@ -261,10 +261,8 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, int16_t *aColId,
int32_t nColId);
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, int16_t *aColId,
int32_t nColId);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData);
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData);
// SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
......
......@@ -835,583 +835,72 @@ _err:
return code;
}
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, int16_t *aColId,
int32_t nColId) {
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData) {
int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[0], 0, pBlockData);
if (code) goto _err;
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
// TODO
ASSERT(0);
}
return code;
_err:
tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, int16_t *aColId,
int32_t nColId) {
int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
#if 0
static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf,
uint8_t **ppBuf) {
int32_t code = 0;
#if 0
int64_t size;
int64_t n;
if (!taosCheckChecksumWhole(pBuf, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
pColData->nVal = pSubBlock->nRow;
pColData->flag = pBlockCol->flag;
// BITMAP
if (pBlockCol->flag != HAS_VALUE) {
ASSERT(pBlockCol->szBitmap);
size = BIT2_SIZE(pColData->nVal);
code = tRealloc(&pColData->pBitMap, size);
if (code) goto _err;
code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
if (code) goto _err;
n = tsDecompressTinyint(pBuf, pBlockCol->szBitmap, size, pColData->pBitMap, size, TWO_STAGE_COMP, *ppBuf,
size + COMP_OVERFLOW_BYTES);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
ASSERT(n == size);
} else {
ASSERT(pBlockCol->szBitmap == 0);
}
pBuf = pBuf + pBlockCol->szBitmap;
// OFFSET
if (IS_VAR_DATA_TYPE(pColData->type)) {
ASSERT(pBlockCol->szOffset);
size = sizeof(int32_t) * pColData->nVal;
code = tRealloc((uint8_t **)&pColData->aOffset, size);
if (code) goto _err;
code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
if (code) goto _err;
n = tsDecompressInt(pBuf, pBlockCol->szOffset, pColData->nVal, (char *)pColData->aOffset, size, TWO_STAGE_COMP,
*ppBuf, size + COMP_OVERFLOW_BYTES);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
ASSERT(n == size);
} else {
ASSERT(pBlockCol->szOffset == 0);
}
pBuf = pBuf + pBlockCol->szOffset;
// VALUE
pColData->nData = pBlockCol->szOrigin;
code = tRealloc(&pColData->pData, pColData->nData);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
memcpy(pColData->pData, pBuf, pColData->nData);
} else {
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tRealloc(ppBuf, pColData->nData + COMP_OVERFLOW_BYTES);
if (code) goto _err;
}
n = tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->szValue, pSubBlock->nRow, pColData->pData,
pColData->nData, pSubBlock->cmprAlg, *ppBuf,
pColData->nData + COMP_OVERFLOW_BYTES);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
ASSERT(n == pColData->nData);
}
return code;
_err:
#endif
return code;
}
static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
int32_t code = 0;
#if 0
TdFilePtr pFD = pReader->pDataFD;
SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SArray *aBlockCol = NULL;
int64_t offset;
int64_t size;
int64_t n;
tBlockDataReset(pBlockData);
pBlockData->nRow = pSubBlock->nRow;
// TSDBKEY and SBlockCol
if (nCol == 1) {
offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM);
size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
} else {
offset = pSubBlock->offset;
size = pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
}
code = tRealloc(ppBuf1, size);
if (code) goto _err;
n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosReadFile(pFD, *ppBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
if (nCol == 1) {
code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
if (code) goto _err;
goto _exit;
} else {
aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (pBlock->nSubBlock > 1) {
SBlockData bData1;
SBlockData bData2;
code = tsdbReadBlockCol(*ppBuf1, pSubBlock->szBlock, NULL /*todo*/, aBlockCol);
// create
code = tBlockDataCreate(&bData1);
if (code) goto _err;
code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM), ppBuf2);
code = tBlockDataCreate(&bData2);
if (code) goto _err;
}
for (int32_t iCol = 1; iCol < nCol; iCol++) {
void *p = taosArraySearch(aBlockCol, &(SBlockCol){.cid = aColId[iCol]}, tBlockColCmprFn, TD_EQ);
if (p) {
SBlockCol *pBlockCol = (SBlockCol *)p;
SColData *pColData;
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _err;
tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion +
pSubBlock->szTSKEY + sizeof(TSCKSUM) + pBlockCol->offset;
size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
code = tRealloc(ppBuf1, size);
if (code) goto _err;
// seek
n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pFD, *ppBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
if (code) goto _err;
}
}
}
_exit:
taosArrayDestroy(aBlockCol);
return code;
_err:
taosArrayDestroy(aBlockCol);
#endif
return code;
}
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0;
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
ASSERT(aColId[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
// init
tBlockDataInitEx(&bData1, pBlockData);
tBlockDataInitEx(&bData2, pBlockData);
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2);
if (code) goto _err;
if (pBlock->nSubBlock > 1) {
SBlockData *pBlockData1 = &(SBlockData){0};
SBlockData *pBlockData2 = &(SBlockData){0};
tBlockDataCreate(pBlockData1);
tBlockDataCreate(pBlockData2);
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2);
if (code) goto _err;
code = tBlockDataCopy(pBlockData, pBlockData2);
code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[iSubBlock], 0, &bData1);
if (code) {
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
goto _err;
}
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
code = tBlockDataCopy(pBlockData, &bData2);
if (code) {
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
goto _err;
}
}
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
}
tFree(pBuf1);
tFree(pBuf2);
return code;
_err:
tsdbError("vgId:%d, tsdb read col data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf1);
tFree(pBuf2);
return code;
}
static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32_t iSubBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0;
#if 0
uint8_t *p;
int64_t size;
int64_t n;
TdFilePtr pFD = pReader->pDataFD;
SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SArray *aBlockCol = NULL;
tBlockDataReset(pBlockData);
// realloc
code = tRealloc(ppBuf1, pSubBlock->szBlock);
if (code) goto _err;
// seek
n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pFD, *ppBuf1, pSubBlock->szBlock);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pSubBlock->szBlock) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
pBlockData->nRow = pSubBlock->nRow;
// TSDBKEY
p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM);
code = tsdbReadBlockDataKey(pBlockData, pSubBlock, p, ppBuf2);
if (code) goto _err;
// COLUMNS
aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol);
if (code) goto _err;
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) {
SColData *pColData;
SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData);
if (code) goto _err;
tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY +
sizeof(TSCKSUM) + pBlockCol->offset;
code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, p, ppBuf2);
if (code) goto _err;
}
}
taosArrayDestroy(aBlockCol);
return code;
_err:
tsdbError("vgId:%d, tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
taosArrayDestroy(aBlockCol);
#endif
return code;
}
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
int32_t code = 0;
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
// read the first sub-block
int32_t iSubBlock = 0;
code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2);
if (code) goto _err;
// read remain block data and do merg
if (pBlock->nSubBlock > 1) {
SBlockData *pBlockData1 = &(SBlockData){0};
SBlockData *pBlockData2 = &(SBlockData){0};
tBlockDataCreate(pBlockData1);
tBlockDataCreate(pBlockData2);
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
code = tBlockDataMerge(&bData1, &bData2, pBlockData);
if (code) {
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
goto _err;
}
code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) {
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
goto _err;
}
// merge two block data
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) {
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
goto _err;
}
}
tBlockDataDestroy(pBlockData1, 1);
tBlockDataDestroy(pBlockData2, 1);
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
}
ASSERT(pBlock->nRow == pBlockData->nRow);
ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0);
ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0);
tFree(pBuf1);
tFree(pBuf2);
return code;
_err:
tsdbError("vgId:%d, tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf1);
tFree(pBuf2);
tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData) {
int32_t code = 0;
#if 0
tBlockDataReset(pBlockData);
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
// realloc
code = tRealloc(ppBuf1, pBlockL->szBlock);
if (code) goto _err;
// seek
int64_t n = taosLSeekFile(pReader->pLastFD, pBlockL->offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pReader->pLastFD, *ppBuf1, pBlockL->szBlock);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pBlockL->szBlock) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// decode block col
SDiskDataHdr hdr;
SArray *aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
uint8_t *p = *ppBuf1;
if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbReadBlockCol(p, pBlockL->szBlockCol, &hdr, aBlockCol);
if (code) goto _err;
p += pBlockL->szBlockCol + sizeof(TSCKSUM);
// checksum
if (!taosCheckChecksumWhole(p, pBlockL->szUid + pBlockL->szVer + pBlockL->szTSKEY + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// UID
if (hdr.uid == 0) {
code = tsdbReadDataArray(p, pBlockL->szUid, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg,
(uint8_t **)&pBlockData->aUid, ppBuf2);
if (code) goto _err;
} else {
ASSERT(pBlockL->szUid == 0);
}
p += pBlockL->szUid;
// VERSION
code = tsdbReadDataArray(p, pBlockL->szVer, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg,
(uint8_t **)&pBlockData->aVersion, ppBuf2);
if (code) goto _err;
p += pBlockL->szVer;
// TSKEY
code = tsdbReadDataArray(p, pBlockL->szTSKEY, pBlockL->nRow, TSDB_DATA_TYPE_TIMESTAMP, pBlockL->cmprAlg,
(uint8_t **)&pBlockData->aTSKEY, ppBuf2);
if (code) goto _err;
p += pBlockL->szTSKEY;
p += sizeof(TSCKSUM);
// COLUMN
code = tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid);
code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData);
if (code) goto _err;
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) {
SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol);
SColData *pColData;
// checksum
if (!taosCheckChecksumWhole(p, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// add SColData
code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData);
if (code) goto _err;
tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn);
pColData->nVal = pBlockL->nRow;
pColData->flag = pBlockCol->flag;
// bitmap
if (pBlockCol->szBitmap) {
code = tsdbReadDataArray(p, pBlockCol->szBitmap, );
if (code) goto _err;
}
p += pBlockCol->szBitmap;
// offset
if (pBlockCol->szOffset) {
code = tsdbReadDataArray(p, pBlockCol->szOffset, );
if (code) goto _err;
}
p += pBlockCol->szOffset;
// value
pColData->nData = pBlockCol->szOrigin;
if (pColData->nData) {
// TODO
}
}
taosArrayDestroy(aBlockCol);
return code;
_err:
tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
#endif
return code;
}
#endif
// SDataFWriter ====================================================
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
......
......@@ -949,13 +949,14 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
int32_t size;
ASSERT(pColDataSrc->nVal > 0);
ASSERT(pColDataDest->cid = pColDataSrc->cid);
ASSERT(pColDataDest->type = pColDataSrc->type);
pColDataDest->cid = pColDataSrc->cid;
pColDataDest->type = pColDataSrc->type;
pColDataDest->smaOn = pColDataSrc->smaOn;
pColDataDest->nVal = pColDataSrc->nVal;
pColDataDest->flag = pColDataSrc->flag;
// bitmap
if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && pColDataSrc->flag != HAS_VALUE) {
size = BIT2_SIZE(pColDataSrc->nVal);
code = tRealloc(&pColDataDest->pBitMap, size);
......@@ -963,6 +964,7 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, size);
}
// offset
if (IS_VAR_DATA_TYPE(pColDataDest->type)) {
size = sizeof(int32_t) * pColDataSrc->nVal;
......@@ -972,9 +974,10 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
memcpy(pColDataDest->aOffset, pColDataSrc->aOffset, size);
}
// value
pColDataDest->nData = pColDataSrc->nData;
code = tRealloc(&pColDataDest->pData, pColDataSrc->nData);
if (code) goto _exit;
pColDataDest->nData = pColDataSrc->nData;
memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataDest->nData);
_exit:
......@@ -1169,32 +1172,25 @@ _exit:
return code;
}
int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId) {
int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom) {
int32_t code = 0;
ASSERT(0);
ASSERT(suid || uid);
ASSERT(pBlockDataFrom->suid || pBlockDataFrom->uid);
pBlockData->suid = suid;
pBlockData->uid = uid;
pBlockData->suid = pBlockDataFrom->suid;
pBlockData->uid = pBlockDataFrom->uid;
pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
if (aColId) {
int16_t lcid = -1;
for (int32_t iColId = 0; iColId < taosArrayGetSize(aColId); iColId++) {
int16_t cid = *(int16_t *)taosArrayGet(aColId, iColId);
ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(cid > lcid);
lcid = cid;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockDataFrom->aIdx); iColData++) {
SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColData);
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColId, &pColData);
if (code) goto _exit;
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, cid, TSDB_DATA_TYPE_NULL, -1);
}
tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn);
}
_exit:
......@@ -1339,128 +1335,112 @@ _exit:
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0;
// set target
int32_t iColData1 = 0;
int32_t nColData1 = taosArrayGetSize(pBlockData1->aIdx);
int32_t iColData2 = 0;
int32_t nColData2 = taosArrayGetSize(pBlockData2->aIdx);
SColData *pColData1;
SColData *pColData2;
SColData *pColData;
ASSERT(pBlockData->suid == pBlockData1->suid);
ASSERT(pBlockData->uid == pBlockData1->uid);
ASSERT(pBlockData1->nRow > 0);
ASSERT(pBlockData2->nRow > 0);
tBlockDataReset(pBlockData);
while (iColData1 < nColData1 && iColData2 < nColData2) {
pColData1 = tBlockDataGetColDataByIdx(pBlockData1, iColData1);
pColData2 = tBlockDataGetColDataByIdx(pBlockData2, iColData2);
tBlockDataClear(pBlockData);
if (pColData1->cid == pColData2->cid) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
TSDBROW row1 = tsdbRowFromBlockData(pBlockData1, 0);
TSDBROW row2 = tsdbRowFromBlockData(pBlockData2, 0);
TSDBROW *pRow1 = &row1;
TSDBROW *pRow2 = &row2;
iColData1++;
iColData2++;
} else if (pColData1->cid < pColData2->cid) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
while (pRow1 && pRow2) {
int32_t c = tsdbRowCmprFn(pRow1, pRow2);
if (c < 0) {
code = tBlockDataAppendRow(pBlockData, pRow1, NULL,
pBlockData1->uid ? pBlockData1->uid : pBlockData1->aUid[pRow1->iRow]);
if (code) goto _exit;
tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn);
iColData1++;
} else {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
pRow1->iRow++;
if (pRow1->iRow < pBlockData1->nRow) {
*pRow1 = tsdbRowFromBlockData(pBlockData1, pRow1->iRow);
} else {
pRow1 = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockData, pRow2, NULL,
pBlockData2->uid ? pBlockData2->uid : pBlockData2->aUid[pRow2->iRow]);
if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
iColData2++;
pRow2->iRow++;
if (pRow2->iRow < pBlockData2->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockData2, pRow2->iRow);
} else {
pRow2 = NULL;
}
} else {
ASSERT(0);
}
}
while (iColData1 < nColData1) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn);
iColData1++;
}
while (iColData2 < nColData2) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
while (pRow1) {
code = tBlockDataAppendRow(pBlockData, pRow1, NULL,
pBlockData1->uid ? pBlockData1->uid : pBlockData1->aUid[pRow1->iRow]);
if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
iColData2++;
}
// loop to merge
int32_t iRow1 = 0;
int32_t nRow1 = pBlockData1->nRow;
int32_t iRow2 = 0;
int32_t nRow2 = pBlockData2->nRow;
TSDBROW row1;
TSDBROW row2;
int32_t c;
while (iRow1 < nRow1 && iRow2 < nRow2) {
row1 = tsdbRowFromBlockData(pBlockData1, iRow1);
row2 = tsdbRowFromBlockData(pBlockData2, iRow2);
c = tsdbKeyCmprFn(&TSDBROW_KEY(&row1), &TSDBROW_KEY(&row2));
if (c < 0) {
// code = tBlockDataAppendRow(pBlockData, &row1, NULL);
if (code) goto _exit;
iRow1++;
} else if (c > 0) {
// code = tBlockDataAppendRow(pBlockData, &row2, NULL);
if (code) goto _exit;
iRow2++;
pRow1->iRow++;
if (pRow1->iRow < pBlockData1->nRow) {
*pRow1 = tsdbRowFromBlockData(pBlockData1, pRow1->iRow);
} else {
ASSERT(0);
pRow1 = NULL;
}
}
while (iRow1 < nRow1) {
row1 = tsdbRowFromBlockData(pBlockData1, iRow1);
// code = tBlockDataAppendRow(pBlockData, &row1, NULL);
while (pRow2) {
code = tBlockDataAppendRow(pBlockData, pRow2, NULL,
pBlockData2->uid ? pBlockData2->uid : pBlockData2->aUid[pRow2->iRow]);
if (code) goto _exit;
iRow1++;
}
while (iRow2 < nRow2) {
row2 = tsdbRowFromBlockData(pBlockData2, iRow2);
// code = tBlockDataAppendRow(pBlockData, &row2, NULL);
if (code) goto _exit;
iRow2++;
pRow2->iRow++;
if (pRow2->iRow < pBlockData2->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockData2, pRow2->iRow);
} else {
pRow2 = NULL;
}
}
_exit:
return code;
}
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) {
int32_t code = 0;
SColData *pColDataSrc;
SColData *pColDataDest;
int32_t tBlockDataCopy(SBlockData *pSrc, SBlockData *pDest) {
int32_t code = 0;
tBlockDataClear(pDest);
ASSERT(pBlockDataSrc->nRow > 0);
ASSERT(pDest->suid == pSrc->suid);
ASSERT(pDest->uid == pSrc->uid);
ASSERT(pSrc->nRow == pDest->nRow);
ASSERT(taosArrayGetSize(pSrc->aIdx) == taosArrayGetSize(pDest->aIdx));
tBlockDataReset(pBlockDataDest);
pDest->nRow = pSrc->nRow;
pBlockDataDest->nRow = pBlockDataSrc->nRow;
// TSDBKEY
code = tRealloc((uint8_t **)&pBlockDataDest->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow);
if (pSrc->uid == 0) {
code = tRealloc((uint8_t **)&pDest->aUid, sizeof(int64_t) * pDest->nRow);
if (code) goto _exit;
memcpy(pDest->aUid, pSrc->aUid, sizeof(int64_t) * pDest->nRow);
}
code = tRealloc((uint8_t **)&pDest->aVersion, sizeof(int64_t) * pDest->nRow);
if (code) goto _exit;
code = tRealloc((uint8_t **)&pBlockDataDest->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
memcpy(pDest->aVersion, pSrc->aVersion, sizeof(int64_t) * pDest->nRow);
code = tRealloc((uint8_t **)&pDest->aTSKEY, sizeof(TSKEY) * pDest->nRow);
if (code) goto _exit;
memcpy(pBlockDataDest->aVersion, pBlockDataSrc->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow);
memcpy(pBlockDataDest->aTSKEY, pBlockDataSrc->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
memcpy(pDest->aTSKEY, pSrc->aTSKEY, sizeof(TSKEY) * pDest->nRow);
// other
for (size_t iColData = 0; iColData < taosArrayGetSize(pBlockDataSrc->aIdx); iColData++) {
pColDataSrc = tBlockDataGetColDataByIdx(pBlockDataSrc, iColData);
code = tBlockDataAddColData(pBlockDataDest, iColData, &pColDataDest);
if (code) goto _exit;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pSrc->aIdx); iColData++) {
SColData *pColSrc = tBlockDataGetColDataByIdx(pSrc, iColData);
SColData *pColDest = tBlockDataGetColDataByIdx(pDest, iColData);
ASSERT(pColSrc->cid == pColDest->cid);
ASSERT(pColSrc->type == pColDest->type);
code = tColDataCopy(pColDataSrc, pColDataDest);
code = tColDataCopy(pColSrc, pColDest);
if (code) goto _exit;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册