提交 0525b4fa 编写于 作者: H Hongze Cheng

more code

上级 f9870c36
...@@ -94,7 +94,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); ...@@ -94,7 +94,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
void tRowIterClose(SRowIter **ppIter); void tRowIterClose(SRowIter **ppIter);
SColVal *tRowIterNext(SRowIter *pIter); SColVal *tRowIterNext(SRowIter *pIter);
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SArray *aColData, int32_t nColData); int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData);
// STag ================================ // STag ================================
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
......
...@@ -52,11 +52,13 @@ _exit: ...@@ -52,11 +52,13 @@ _exit:
return code; return code;
} }
static FORCE_INLINE void tFree(uint8_t *pBuf) { #define tFree(BUF) \
if (pBuf) { do { \
taosMemoryFree(pBuf - sizeof(int64_t)); if (BUF) { \
} taosMemoryFree((uint8_t *)(BUF) - sizeof(int64_t)); \
} (BUF) = NULL; \
} \
} while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -841,11 +841,11 @@ _exit: ...@@ -841,11 +841,11 @@ _exit:
return &pIter->cv; return &pIter->cv;
} }
static int32_t tRowAppendNoneToColData(SArray *aColData, int32_t nColData) { static int32_t tRowAppendNoneToColData(SColData *aColData, int32_t nColData) {
int32_t code = 0; int32_t code = 0;
for (int32_t iColData = 0; iColData < nColData; iColData++) { for (int32_t iColData = 0; iColData < nColData; iColData++) {
SColData *pColData = taosArrayGet(aColData, iColData); SColData *pColData = &aColData[iColData];
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
} }
...@@ -853,11 +853,11 @@ static int32_t tRowAppendNoneToColData(SArray *aColData, int32_t nColData) { ...@@ -853,11 +853,11 @@ static int32_t tRowAppendNoneToColData(SArray *aColData, int32_t nColData) {
_exit: _exit:
return code; return code;
} }
static int32_t tRowAppendNullToColData(SArray *aColData, int32_t nColData, STSchema *pSchema) { static int32_t tRowAppendNullToColData(SColData *aColData, int32_t nColData, STSchema *pSchema) {
int32_t code = 0; int32_t code = 0;
int32_t iColData = 0; int32_t iColData = 0;
SColData *pColData = taosArrayGet(aColData, iColData); SColData *pColData = &aColData[iColData];
int32_t iTColumn = 1; int32_t iTColumn = 1;
STColumn *pTColumn = &pSchema->columns[iTColumn]; STColumn *pTColumn = &pSchema->columns[iTColumn];
...@@ -866,12 +866,12 @@ static int32_t tRowAppendNullToColData(SArray *aColData, int32_t nColData, STSch ...@@ -866,12 +866,12 @@ static int32_t tRowAppendNullToColData(SArray *aColData, int32_t nColData, STSch
if (pTColumn->colId == pColData->cid) { // NULL if (pTColumn->colId == pColData->cid) { // NULL
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
pTColumn = (++iTColumn < pSchema->numOfCols) ? &pSchema->columns[iTColumn] : NULL; pTColumn = (++iTColumn < pSchema->numOfCols) ? &pSchema->columns[iTColumn] : NULL;
} else if (pTColumn->colId > pColData->cid) { // NONE } else if (pTColumn->colId > pColData->cid) { // NONE
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
} else { } else {
pTColumn = (++iTColumn < pSchema->numOfCols) ? &pSchema->columns[iTColumn] : NULL; pTColumn = (++iTColumn < pSchema->numOfCols) ? &pSchema->columns[iTColumn] : NULL;
} }
...@@ -879,18 +879,18 @@ static int32_t tRowAppendNullToColData(SArray *aColData, int32_t nColData, STSch ...@@ -879,18 +879,18 @@ static int32_t tRowAppendNullToColData(SArray *aColData, int32_t nColData, STSch
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
} }
} }
_exit: _exit:
return code; return code;
} }
static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SArray *aColData, int32_t nColData) { static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
int32_t code = 0; int32_t code = 0;
int32_t iColData = 0; int32_t iColData = 0;
SColData *pColData = taosArrayGet(aColData, iColData); SColData *pColData = &aColData[iColData];
int32_t iTColumn = 1; int32_t iTColumn = 1;
STColumn *pTColumn = &pTSchema->columns[iTColumn]; STColumn *pTColumn = &pTSchema->columns[iTColumn];
...@@ -971,13 +971,13 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SArray * ...@@ -971,13 +971,13 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SArray *
} }
_continue: _continue:
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
} else if (pTColumn->colId > pColData->cid) { // NONE } else if (pTColumn->colId > pColData->cid) { // NONE
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
} else { } else {
pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
} }
...@@ -985,20 +985,20 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SArray * ...@@ -985,20 +985,20 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SArray *
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
} }
} }
_exit: _exit:
return code; return code;
} }
static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SArray *aColData, int32_t nColData) { static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
int32_t code = 0; int32_t code = 0;
SKVIdx *pKVIdx = (SKVIdx *)pRow->data; SKVIdx *pKVIdx = (SKVIdx *)pRow->data;
uint8_t *pv = NULL; uint8_t *pv = NULL;
int32_t iColData = 0; int32_t iColData = 0;
SColData *pColData = taosArrayGet(aColData, iColData); SColData *pColData = &aColData[iColData];
int32_t iTColumn = 1; int32_t iTColumn = 1;
STColumn *pTColumn = &pTSchema->columns[iTColumn]; STColumn *pTColumn = &pTSchema->columns[iTColumn];
int32_t iCol = 0; int32_t iCol = 0;
...@@ -1054,26 +1054,26 @@ static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SArray *aCo ...@@ -1054,26 +1054,26 @@ static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SArray *aCo
if (code) goto _exit; if (code) goto _exit;
_continue: _continue:
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
} else if (pTColumn->colId > pColData->cid) { } else if (pTColumn->colId > pColData->cid) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
} else { } else {
pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
} }
} else { } else {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
pColData = (++iColData < nColData) ? taosArrayGet(aColData, iColData) : NULL; pColData = (++iColData < nColData) ? &aColData[iColData] : NULL;
} }
} }
_exit: _exit:
return code; return code;
} }
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SArray *aColData, int32_t nColData) { int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
ASSERT(pRow->sver == pTSchema->version); ASSERT(pRow->sver == pTSchema->version);
ASSERT(nColData > 0); ASSERT(nColData > 0);
...@@ -1529,7 +1529,7 @@ void tColDataDestroy(void *ph) { ...@@ -1529,7 +1529,7 @@ void tColDataDestroy(void *ph) {
SColData *pColData = (SColData *)ph; SColData *pColData = (SColData *)ph;
tFree(pColData->pBitMap); tFree(pColData->pBitMap);
tFree((uint8_t *)pColData->aOffset); tFree(pColData->aOffset);
tFree(pColData->pData); tFree(pColData->pData);
} }
......
...@@ -154,6 +154,7 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs); ...@@ -154,6 +154,7 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs);
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
#define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA)) #define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA))
#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA)) #define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA))
#define tBlockDataGetColDataByIdx(PBLOCKDATA, IDX) (&(PBLOCKDATA)->aColData[IDX])
int32_t tBlockDataCreate(SBlockData *pBlockData); int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData); void tBlockDataDestroy(SBlockData *pBlockData);
...@@ -161,10 +162,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchem ...@@ -161,10 +162,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchem
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, SColData **ppColData);
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[], int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
int32_t aBufN[]); int32_t aBufN[]);
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]); int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
...@@ -480,7 +478,7 @@ struct SBlockData { ...@@ -480,7 +478,7 @@ struct SBlockData {
int64_t *aVersion; // versions of each row int64_t *aVersion; // versions of each row
TSKEY *aTSKEY; // timestamp of each row TSKEY *aTSKEY; // timestamp of each row
int32_t nColData; int32_t nColData;
SArray *aColData; // SArray<SColData> SColData *aColData;
}; };
struct TABLEID { struct TABLEID {
......
...@@ -23,7 +23,7 @@ void tMapDataReset(SMapData *pMapData) { ...@@ -23,7 +23,7 @@ void tMapDataReset(SMapData *pMapData) {
} }
void tMapDataClear(SMapData *pMapData) { void tMapDataClear(SMapData *pMapData) {
tFree((uint8_t *)pMapData->aOffset); tFree(pMapData->aOffset);
tFree(pMapData->pData); tFree(pMapData->pData);
pMapData->pData = NULL; pMapData->pData = NULL;
pMapData->aOffset = NULL; pMapData->aOffset = NULL;
...@@ -928,27 +928,49 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) { ...@@ -928,27 +928,49 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) {
pBlockData->aVersion = NULL; pBlockData->aVersion = NULL;
pBlockData->aTSKEY = NULL; pBlockData->aTSKEY = NULL;
pBlockData->nColData = 0; pBlockData->nColData = 0;
pBlockData->aColData = taosArrayInit(0, sizeof(SColData)); pBlockData->aColData = NULL;
if (pBlockData->aColData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit: _exit:
return code; return code;
} }
void tBlockDataDestroy(SBlockData *pBlockData) { void tBlockDataDestroy(SBlockData *pBlockData) {
tFree((uint8_t *)pBlockData->aUid); tFree(pBlockData->aUid);
tFree((uint8_t *)pBlockData->aVersion); tFree(pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY); tFree(pBlockData->aTSKEY);
taosArrayDestroyEx(pBlockData->aColData, tColDataDestroy);
pBlockData->aUid = NULL; for (int32_t i = 0; i < pBlockData->nColData; i++) {
pBlockData->aVersion = NULL; tColDataDestroy(&pBlockData->aColData[i]);
pBlockData->aTSKEY = NULL; }
if (pBlockData->aColData) {
taosMemoryFree(pBlockData->aColData);
pBlockData->aColData = NULL; pBlockData->aColData = NULL;
}
} }
static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) {
int32_t code = 0;
if (pBlockData->nColData > nColData) {
for (int32_t i = nColData; i < pBlockData->nColData; i++) {
tColDataDestroy(&pBlockData->aColData[i]);
}
} else if (pBlockData->nColData < nColData) {
SColData *aColData = taosMemoryRealloc(pBlockData->aColData, sizeof(SBlockData) * nColData);
if (aColData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pBlockData->aColData = aColData;
memset(&pBlockData->aColData[pBlockData->nColData], 0, sizeof(SBlockData) * (nColData - pBlockData->nColData));
}
pBlockData->nColData = nColData;
_exit:
return code;
}
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) { int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) {
int32_t code = 0; int32_t code = 0;
...@@ -958,37 +980,35 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, ...@@ -958,37 +980,35 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema,
pBlockData->uid = pId->uid; pBlockData->uid = pId->uid;
pBlockData->nRow = 0; pBlockData->nRow = 0;
pBlockData->nColData = 0;
if (aCid) { if (aCid) {
code = tBlockDataAdjustColData(pBlockData, nCid);
if (code) goto _exit;
int32_t iColumn = 1; int32_t iColumn = 1;
STColumn *pTColumn = &pTSchema->columns[iColumn]; STColumn *pTColumn = &pTSchema->columns[iColumn];
for (int32_t iCid = 0; iCid < nCid; iCid++) { for (int32_t iCid = 0; iCid < nCid; iCid++) {
while (pTColumn && pTColumn->colId < aCid[iCid]) { ASSERT(pTColumn);
while (pTColumn->colId < aCid[iCid]) {
iColumn++; iColumn++;
pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; ASSERT(iColumn < pTSchema->numOfCols);
pTColumn = &pTSchema->columns[iColumn];
} }
if (pTColumn == NULL) { ASSERT(pTColumn->colId == aCid[iCid]);
break; tColDataInit(&pBlockData->aColData[iCid], pTColumn->colId, pTColumn->type,
} else if (pTColumn->colId == aCid[iCid]) { (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
SColData *pColData;
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
iColumn++; iColumn++;
pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
} }
}
} else { } else {
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { code = tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1);
STColumn *pTColumn = &pTSchema->columns[iColumn];
SColData *pColData;
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit; if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
STColumn *pTColumn = &pTSchema->columns[iColData + 1];
tColDataInit(&pBlockData->aColData[iColData], pTColumn->colId, pTColumn->type,
(pTColumn->flags & COL_SMA_ON) ? 1 : 0);
} }
} }
...@@ -999,8 +1019,6 @@ _exit: ...@@ -999,8 +1019,6 @@ _exit:
void tBlockDataReset(SBlockData *pBlockData) { void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->suid = 0; pBlockData->suid = 0;
pBlockData->uid = 0; pBlockData->uid = 0;
pBlockData->nRow = 0;
pBlockData->nColData = 0;
} }
void tBlockDataClear(SBlockData *pBlockData) { void tBlockDataClear(SBlockData *pBlockData) {
...@@ -1008,33 +1026,10 @@ void tBlockDataClear(SBlockData *pBlockData) { ...@@ -1008,33 +1026,10 @@ void tBlockDataClear(SBlockData *pBlockData) {
pBlockData->nRow = 0; pBlockData->nRow = 0;
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); tColDataClear(tBlockDataGetColDataByIdx(pBlockData, iColData));
tColDataClear(pColData);
} }
} }
int32_t tBlockDataAddColData(SBlockData *pBlockData, SColData **ppColData) {
int32_t code = 0;
SColData *pColData = NULL;
if (pBlockData->nColData >= taosArrayGetSize(pBlockData->aColData)) {
if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
pColData = (SColData *)taosArrayGet(pBlockData->aColData, pBlockData->nColData);
pBlockData->nColData++;
*ppColData = pColData;
return code;
_err:
*ppColData = NULL;
return code;
}
static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) { static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) {
int32_t code = 0; int32_t code = 0;
...@@ -1095,12 +1090,14 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1095,12 +1090,14 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
SColVal cv = {0}; SColVal cv = {0};
if (pRow->type == 0) { if (pRow->type == TSDBROW_ROW_FMT) {
code = tRowAppendToColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData); code = tRowAppendToColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData);
if (code) goto _err; if (code) goto _err;
} else { } else if (pRow->type == TSDBROW_COL_FMT) {
code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow); code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow);
if (code) goto _err; if (code) goto _err;
} else {
ASSERT(0);
} }
pBlockData->nRow++; pBlockData->nRow++;
...@@ -1110,133 +1107,13 @@ _err: ...@@ -1110,133 +1107,13 @@ _err:
return code; return code;
} }
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom) {
int32_t code = 0;
int32_t iColData = 0;
for (int32_t iColDataFrom = 0; iColDataFrom < pBlockDataFrom->nColData; iColDataFrom++) {
SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom);
while (true) {
SColData *pColData;
if (iColData < pBlockData->nColData) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
} else {
pColData = NULL;
}
if (pColData == NULL || pColData->cid > pColDataFrom->cid) {
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
}
iColData++;
break;
} else if (pColData->cid == pColDataFrom->cid) {
iColData++;
break;
} else {
iColData++;
}
}
}
_exit:
return code;
}
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0;
ASSERT(pBlockData->suid == pBlockData1->suid);
ASSERT(pBlockData->uid == pBlockData1->uid);
ASSERT(pBlockData1->nRow > 0);
ASSERT(pBlockData2->nRow > 0);
tBlockDataClear(pBlockData);
TSDBROW row1 = tsdbRowFromBlockData(pBlockData1, 0);
TSDBROW row2 = tsdbRowFromBlockData(pBlockData2, 0);
TSDBROW *pRow1 = &row1;
TSDBROW *pRow2 = &row2;
while (pRow1 && pRow2) {
int32_t c = tsdbRowCmprFn(pRow1, pRow2);
if (c < 0) {
code = tBlockDataAppendRow(pBlockData, pRow1, NULL,
pBlockData1->uid ? pBlockData1->uid : pBlockData1->aUid[pRow1->iRow]);
if (code) goto _exit;
pRow1->iRow++;
if (pRow1->iRow < pBlockData1->nRow) {
*pRow1 = tsdbRowFromBlockData(pBlockData1, pRow1->iRow);
} else {
pRow1 = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockData, pRow2, NULL,
pBlockData2->uid ? pBlockData2->uid : pBlockData2->aUid[pRow2->iRow]);
if (code) goto _exit;
pRow2->iRow++;
if (pRow2->iRow < pBlockData2->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockData2, pRow2->iRow);
} else {
pRow2 = NULL;
}
} else {
ASSERT(0);
}
}
while (pRow1) {
code = tBlockDataAppendRow(pBlockData, pRow1, NULL,
pBlockData1->uid ? pBlockData1->uid : pBlockData1->aUid[pRow1->iRow]);
if (code) goto _exit;
pRow1->iRow++;
if (pRow1->iRow < pBlockData1->nRow) {
*pRow1 = tsdbRowFromBlockData(pBlockData1, pRow1->iRow);
} else {
pRow1 = NULL;
}
}
while (pRow2) {
code = tBlockDataAppendRow(pBlockData, pRow2, NULL,
pBlockData2->uid ? pBlockData2->uid : pBlockData2->aUid[pRow2->iRow]);
if (code) goto _exit;
pRow2->iRow++;
if (pRow2->iRow < pBlockData2->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockData2, pRow2->iRow);
} else {
pRow2 = NULL;
}
}
_exit:
return code;
}
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx) {
ASSERT(idx >= 0 && idx < pBlockData->nColData);
return (SColData *)taosArrayGet(pBlockData->aColData, idx);
}
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) { void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {
ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID);
int32_t lidx = 0; int32_t lidx = 0;
int32_t ridx = pBlockData->nColData - 1; int32_t ridx = pBlockData->nColData - 1;
while (lidx <= ridx) { while (lidx <= ridx) {
int32_t midx = (lidx + ridx) / 2; int32_t midx = (lidx + ridx) >> 2;
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, midx); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, midx);
int32_t c = (pColData->cid == cid) ? 0 : ((pColData->cid > cid) ? 1 : -1); int32_t c = (pColData->cid == cid) ? 0 : ((pColData->cid > cid) ? 1 : -1);
...@@ -1383,16 +1260,26 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin ...@@ -1383,16 +1260,26 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
// loop to decode each column data // loop to decode each column data
if (hdr.szBlkCol == 0) goto _exit; if (hdr.szBlkCol == 0) goto _exit;
int32_t nColData = 0;
int32_t nt = 0; int32_t nt = 0;
while (nt < hdr.szBlkCol) { while (nt < hdr.szBlkCol) {
SBlockCol blockCol = {0}; SBlockCol blockCol = {0};
nt += tGetBlockCol(pIn + n + nt, &blockCol); nt += tGetBlockCol(pIn + n + nt, &blockCol);
ASSERT(nt <= hdr.szBlkCol); ++nColData;
}
ASSERT(nt == hdr.szBlkCol);
SColData *pColData; code = tBlockDataAdjustColData(pBlockData, nColData);
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit; if (code) goto _exit;
nt = 0;
int32_t iColData = 0;
while (nt < hdr.szBlkCol) {
SBlockCol blockCol = {0};
nt += tGetBlockCol(pIn + n + nt, &blockCol);
SColData *pColData = &pBlockData->aColData[iColData++];
tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn); tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn);
if (blockCol.flag == HAS_NULL) { if (blockCol.flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册