提交 b20d7d56 编写于 作者: H Hongze Cheng

more work

上级 23c838c9
......@@ -55,9 +55,9 @@ int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
// STSRow2
#define COL_VAL_NONE(CID) ((SColVal){.cid = (CID), .isNone = 1})
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1})
#define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)})
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
......
......@@ -633,15 +633,15 @@ void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal
}
_return_none:
*pColVal = COL_VAL_NONE(pTColumn->colId);
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return;
_return_null:
*pColVal = COL_VAL_NULL(pTColumn->colId);
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return;
_return_value:
*pColVal = COL_VAL_VALUE(pTColumn->colId, value);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
return;
}
......
......@@ -114,7 +114,7 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph);
void tColDataReset(SColData *pColData, int16_t cid, int8_t type);
void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
void tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataCmprFn(const void *p1, const void *p2);
// SBlockData
int32_t tBlockDataInit(SBlockData *pBlockData);
......@@ -365,10 +365,12 @@ struct SAggrBlkCol {
struct SColData {
int16_t cid;
int8_t type;
uint8_t flags;
int8_t offsetValid;
int32_t nVal;
uint8_t flag;
uint8_t *pBitMap;
int32_t *pOfst;
uint32_t nData;
int32_t *aOffset;
int32_t nData;
uint8_t *pData;
};
......
......@@ -925,21 +925,21 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aColDataP); iCol++) {
SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iCol);
ASSERT(pColData->flags);
ASSERT(pColData->flag);
if (pColData->flags == HAS_NONE) continue;
if (pColData->flag == HAS_NONE) continue;
bCol.cid = pColData->cid;
bCol.type = pColData->type;
bCol.flag = pColData->flags;
bCol.flag = pColData->flag;
if (pColData->flags != HAS_NULL) {
if (pColData->flag != HAS_NULL) {
cksm = 0;
bCol.offset = offset;
bCol.size = 0;
// bitmap
if (pColData->flags != HAS_VALUE) {
if (pColData->flag != HAS_VALUE) {
// TODO: optimize bitmap part
n = taosWriteFile(pFileFD, pColData->pBitMap, BIT2_SIZE(pBlockData->nRow));
if (n < 0) {
......
......@@ -520,11 +520,11 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
// sizeof(SBlockCol), tColDataCmprFn, TD_EQ);
if (p) {
pColData = (SColData *)p;
ASSERT(pColData->flags);
ASSERT(pColData->flag);
if (pColData->flags == HAS_NONE) {
if (pColData->flag == HAS_NONE) {
goto _return_none;
} else if (pColData->flags == HAS_NULL) {
} else if (pColData->flag == HAS_NULL) {
goto _return_null;
} else {
uint8_t v = GET_BIT2(pColData->pBitMap, pRow->iRow);
......@@ -551,15 +551,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
}
_return_none:
*pColVal = COL_VAL_NONE(pTColumn->colId);
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return;
_return_null:
*pColVal = COL_VAL_NULL(pTColumn->colId);
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return;
_return_value:
*pColVal = COL_VAL_VALUE(pTColumn->colId, value);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
return;
}
......@@ -753,7 +753,9 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
void tColDataReset(SColData *pColData, int16_t cid, int8_t type) {
pColData->cid = cid;
pColData->type = type;
pColData->flags = 0;
pColData->nVal = 0;
pColData->flag = 0;
pColData->offsetValid = 0;
pColData->nData = 0;
}
......@@ -761,23 +763,121 @@ void tColDataClear(void *ph) {
SColData *pColData = (SColData *)ph;
tsdbFree(pColData->pBitMap);
tsdbFree((uint8_t *)pColData->pOfst);
tsdbFree((uint8_t *)pColData->aOffset);
tsdbFree(pColData->pData);
}
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
int32_t code = 0;
int64_t size;
SValue value = {0};
SValue *pValue = &value;
ASSERT(pColVal->cid == pColData->cid);
ASSERT(pColVal->type == pColData->type);
// realloc bitmap
size = BIT2_SIZE(pColData->nVal + 1);
code = tsdbRealloc(&pColData->pBitMap, size);
if (code) goto _exit;
// put value
if (pColVal->isNone) {
pColData->flag |= HAS_NONE;
SET_BIT2(pColData->pBitMap, pColData->nVal, 0);
if (IS_VAR_DATA_TYPE(pColData->type)) pValue = NULL;
} else if (pColVal->isNull) {
pColData->flag |= HAS_NULL;
SET_BIT2(pColData->pBitMap, pColData->nVal, 1);
if (IS_VAR_DATA_TYPE(pColData->type)) pValue = NULL;
} else {
pColData->flag |= HAS_VALUE;
SET_BIT2(pColData->pBitMap, pColData->nVal, 2);
pValue = &pColVal->value;
}
if (pValue) {
code = tsdbRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, &pColVal->value, pColVal->type));
if (code) goto _exit;
pColData->nData += tPutValue(pColData->pData + pColData->nData, &pColVal->value, pColVal->type);
}
pColData->nVal++;
pColData->offsetValid = 0;
_exit:
return code;
}
void tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal) {
// TODO
static int32_t tColDataUpdateOffset(SColData *pColData) {
int32_t code = 0;
SValue value;
ASSERT(pColData->nVal > 0);
ASSERT(pColData->flag);
if (IS_VAR_DATA_TYPE(pColData->type) && (pColData->flag & HAS_VALUE)) {
code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * pColData->nVal);
if (code) goto _exit;
int32_t offset = 0;
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
uint8_t v = GET_BIT2(pColData->pBitMap, iVal);
if (v == 0 || v == 1) {
pColData->aOffset[iVal] = -1;
} else {
pColData->aOffset[iVal] = offset;
offset += tGetValue(pColData->pData + offset, &value, pColData->type);
}
}
ASSERT(offset == pColData->nData);
pColData->offsetValid = 1;
}
_exit:
return code;
}
int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
int32_t code = 0;
ASSERT(iVal < pColData->nVal);
ASSERT(pColData->flag);
if (pColData->flag == HAS_NONE) {
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
goto _exit;
} else if (pColData->flag == HAS_NULL) {
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
goto _exit;
} else if (pColData->flag != HAS_VALUE) {
uint8_t v = GET_BIT2(pColData->pBitMap, iVal);
if (v == 0) {
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
goto _exit;
} else if (v == 1) {
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
goto _exit;
}
}
// get value
SValue value;
if (IS_VAR_DATA_TYPE(pColData->type)) {
if (!pColData->offsetValid) {
code = tColDataUpdateOffset(pColData);
if (code) goto _exit;
}
tGetValue(pColData->pData + pColData->aOffset[iVal], &value, pColData->type);
} else {
tGetValue(pColData->pData + tDataTypes[pColData->type].bytes * iVal, &value, pColData->type);
}
*pColVal = COL_VAL_VALUE(pColData->cid, pColData->type, value);
_exit:
return code;
}
int32_t tColDataCmprFn(const void *p1, const void *p2) {
......@@ -839,7 +939,7 @@ static SColData *tBlockDataAddBlockCol(SBlockData *pBlockData, int32_t iColData,
// append NONE
for (int32_t i = 0; i < pBlockData->nRow; i++) {
if (tColDataAppendValue(pColData, &COL_VAL_NONE(cid)) != 0) return NULL;
if (tColDataAppendValue(pColData, &COL_VAL_NONE(cid, type)) != 0) return NULL;
}
return pColData;
......@@ -876,7 +976,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
pColVal = tRowIterNext(pIter);
} else if (pColVal->cid > pColData->cid) {
code = tColDataAppendValue(pColData, &(COL_VAL_NONE(pColData->cid)));
code = tColDataAppendValue(pColData, &(COL_VAL_NONE(pColData->cid, pColData->type)));
if (code) goto _err;
} else {
pColData = tBlockDataAddBlockCol(pBlockData, iColData, pColVal->cid, pColVal->type);
......@@ -897,7 +997,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
}
while (pColData) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid));
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err;
pColData = ((++iColData) < taosArrayGetSize(pBlockData->aColDataP))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册