提交 e9ffb086 编写于 作者: H Hongze Cheng

more data format

上级 cb3f868e
......@@ -34,8 +34,6 @@ typedef struct STSRowBuilder STSRowBuilder;
typedef struct SKVIdx SKVIdx;
// STSRow2
#define TSROW_SVER(r) (((r)->flags & TSROW_KV_ROW) ? -1 : (r)->sver)
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow);
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow);
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t cid, const uint8_t **ppData, uint32_t *nData,
......@@ -75,12 +73,9 @@ struct STSchema {
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_ROW ((uint8_t)0x10U)
struct STSRow2 {
TSKEY ts;
uint8_t flags;
union {
int32_t sver;
int32_t ncols;
};
TSKEY ts;
uint8_t flags;
int32_t sver;
uint32_t nData;
const uint8_t *pData;
};
......@@ -94,7 +89,6 @@ struct STSRowBuilder {
int32_t szTPBuf;
uint8_t *pTPBuf;
int32_t iCol;
int32_t nCols;
int32_t vlenKV;
int32_t vlenTP;
STSRow2 row;
......
......@@ -24,31 +24,33 @@ struct SKVIdx {
int32_t offset;
};
#pragma pack(push, 1)
typedef struct {
int16_t nCols;
SKVIdx idx[];
} STSKVRow;
#pragma pack(pop)
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
#define SET_BIT1(p, i, v)
#define SET_BIT2(p, i, v)
#define SET_BIT1(p, i, v) ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8)))
#define SET_BIT2(p, i, v) ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4)))
// STSRow2
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow) {
if (tEncodeI64(pEncoder, pRow->ts) < 0) return -1;
if (tEncodeU8(pEncoder, pRow->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pRow->sver) < 0) return -1;
ASSERT(pRow->flags & 0xf != 0);
switch (pRow->flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
ASSERT(TSROW_IS_KV_ROW(pRow) && pRow->nData == 0 && pRow->pData == NULL);
return 0;
case TSROW_HAS_VAL:
ASSERT(!TSROW_IS_KV_ROW(pRow));
default:
ASSERT(pRow->nData && pRow->pData);
if (TSROW_IS_KV_ROW(pRow)) {
if (tEncodeI32v(pEncoder, pRow->ncols) < 0) return -1;
} else {
if (tEncodeI32v(pEncoder, pRow->sver) < 0) return -1;
}
if (tEncodeBinary(pEncoder, pRow->pData, pRow->nData)) return -1;
break;
}
......@@ -59,24 +61,19 @@ int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow) {
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow) {
if (tDecodeI64(pDecoder, &pRow->ts) < 0) return -1;
if (tDecodeU8(pDecoder, &pRow->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pRow->sver) < 0) return -1;
ASSERT(pRow->flags & 0xf != 0);
switch (pRow->flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
ASSERT(TSROW_IS_KV_ROW(pRow));
pRow->nData = 0;
pRow->pData = NULL;
return 0;
case TSROW_HAS_VAL:
ASSERT(!TSROW_IS_KV_ROW(pRow));
default:
if (TSROW_IS_KV_ROW(pRow)) {
if (tDecodeI32v(pDecoder, &pRow->ncols) < 0) return -1;
} else {
if (tDecodeI32v(pDecoder, &pRow->sver) < 0) return -1;
}
if (tDecodeBinary(pDecoder, &pRow->pData, &pRow->nData)) return -1;
break;
}
......@@ -86,6 +83,7 @@ int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow) {
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t cid, const uint8_t **ppData, uint32_t *nData,
int8_t *flags) {
#if 0
if (cid == 0) {
*ppData = (uint8_t *)&pRow->ts;
*nData = sizeof(TSKEY);
......@@ -133,6 +131,7 @@ int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t cid, const ui
return -1;
}
}
#endif
return 0;
}
......@@ -177,24 +176,25 @@ void tTSchemaDestroy(STSchema *pTSchema) {
}
// STSRowBuilder
static int32_t tTSRowBitMapLen1(int32_t nCols) { return nCols / 8 + ((nCols % 8) == 0 ? 0 : 1); }
static int32_t tTSRowBitMapLen2(int32_t nCols) { return nCols / 4 + ((nCols % 4) == 0 ? 0 : 1); }
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols) {
if (tTSchemaCreate(sver, pSchema, nCols, &pBuilder->pTSchema) < 0) return -1;
pBuilder->szBitMap1 = tTSRowBitMapLen1(nCols - 1);
pBuilder->szBitMap2 = tTSRowBitMapLen2(nCols - 1);
pBuilder->szKVBuf = sizeof(SKVIdx) * (nCols - 1) + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
pBuilder->szBitMap1 = (nCols - 2) / 8 + 1;
pBuilder->szBitMap2 = (nCols - 2) / 4 + 1;
pBuilder->szKVBuf =
sizeof(STSKVRow) + sizeof(SKVIdx) * (nCols - 1) + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
pBuilder->szTPBuf = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
pBuilder->pKVBuf = taosMemoryMalloc(pBuilder->szKVBuf);
if (pBuilder->pKVBuf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
tTSchemaDestroy(pBuilder->pTSchema);
return -1;
}
pBuilder->pTPBuf = taosMemoryMalloc(pBuilder->szTPBuf);
if (pBuilder->pTPBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pBuilder->pKVBuf);
tTSchemaDestroy(pBuilder->pTSchema);
return -1;
}
......@@ -202,16 +202,16 @@ int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchem
}
void tTSRowBuilderClear(STSRowBuilder *pBuilder) {
tTSchemaDestroy(pBuilder->pTSchema);
pBuilder->pTSchema = NULL;
if (pBuilder->pKVBuf) {
taosMemoryFree(pBuilder->pKVBuf);
pBuilder->pKVBuf = NULL;
}
if (pBuilder->pTPBuf) {
taosMemoryFree(pBuilder->pTPBuf);
pBuilder->pTPBuf = NULL;
}
if (pBuilder->pKVBuf) {
taosMemoryFree(pBuilder->pKVBuf);
pBuilder->pKVBuf = NULL;
}
tTSchemaDestroy(pBuilder->pTSchema);
pBuilder->pTSchema = NULL;
}
void tTSRowBuilderReset(STSRowBuilder *pBuilder) {
......@@ -221,7 +221,7 @@ void tTSRowBuilderReset(STSRowBuilder *pBuilder) {
}
pBuilder->iCol = 0;
pBuilder->nCols = 0;
((STSKVRow *)pBuilder->pKVBuf)->nCols = 0;
pBuilder->vlenKV = 0;
pBuilder->vlenTP = 0;
pBuilder->row.flags = 0;
......@@ -231,6 +231,7 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD
STColumn *pTColumn = &pBuilder->pTSchema->columns[pBuilder->iCol];
uint8_t *p;
int32_t iCol;
STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
// use interp search (todo)
if (pTColumn->colId > cid) {
......@@ -265,11 +266,11 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD
/* KV */
if (1) { // avoid KV at some threshold (todo)
p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols;
((SKVIdx *)p)->cid = cid;
((SKVIdx *)p)->offset = pBuilder->vlenKV;
pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
pTSKVRow->idx[pTSKVRow->nCols].offset = pBuilder->vlenKV;
p = pBuilder->pKVBuf + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1) + pBuilder->vlenKV;
p = pBuilder->pKVBuf + sizeof(STSKVRow) + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1) +
pBuilder->vlenKV;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
ASSERT(nData <= pTColumn->bytes);
pBuilder->vlenKV += tPutBinary(p, pData, nData);
......@@ -286,7 +287,7 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD
ASSERT(nData <= pTColumn->bytes);
*(int32_t *)p = pBuilder->vlenTP;
p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pBuilder->pTSchema->flen;
p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
pBuilder->vlenTP += tPutBinary(p, pData, nData);
} else {
ASSERT(nData == pTColumn->bytes);
......@@ -298,12 +299,11 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD
pBuilder->row.flags |= TSROW_HAS_NULL;
pTColumn->flags |= COL_SET_NULL;
p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols;
((SKVIdx *)p)->cid = cid;
((SKVIdx *)p)->offset = -1;
pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
pTSKVRow->idx[pTSKVRow->nCols].offset = -1;
}
pBuilder->nCols++;
pTSKVRow->nCols++;
}
return 0;
......@@ -319,17 +319,54 @@ static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2) {
}
return 0;
}
static void setBitMap(uint8_t *p, STSchema *pTSchema, uint8_t flags) {
int32_t bidx;
STColumn *pTColumn;
for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
pTColumn = &pTSchema->columns[iCol];
bidx = iCol - 1;
switch (flags) {
case TSROW_HAS_NULL | TSROW_HAS_NONE:
if (pTColumn->flags & COL_SET_NULL) {
SET_BIT1(p, bidx, (uint8_t)1);
} else {
SET_BIT1(p, bidx, (uint8_t)0);
}
break;
case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
if (pTColumn->flags & COL_SET_NULL) {
SET_BIT2(p, bidx, (uint8_t)1);
} else if (pTColumn->flags & COL_SET_NULL) {
SET_BIT2(p, bidx, (uint8_t)2);
} else {
SET_BIT2(p, bidx, (uint8_t)0);
}
break;
default:
if (pTColumn->flags & COL_SET_VAL) {
SET_BIT1(p, bidx, (uint8_t)1);
} else {
SET_BIT1(p, bidx, (uint8_t)0);
}
break;
}
}
}
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
int32_t nDataTP, nDataKV;
uint32_t flags;
int32_t nDataTP, nDataKV;
uint32_t flags;
STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
// error not set ts
if (!COL_IS_SET(pBuilder->pTSchema->columns->flags)) {
return -1;
}
ASSERT(pBuilder->nCols < pBuilder->pTSchema->numOfCols);
if (pBuilder->nCols < pBuilder->pTSchema->numOfCols - 1) {
ASSERT(pTSKVRow->nCols < pBuilder->pTSchema->numOfCols);
if (pTSKVRow->nCols < pBuilder->pTSchema->numOfCols - 1) {
pBuilder->row.flags |= TSROW_HAS_NONE;
}
......@@ -338,7 +375,6 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
switch (pBuilder->row.flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
pBuilder->row.flags |= TSROW_KV_ROW;
pBuilder->row.nData = 0;
pBuilder->row.pData = NULL;
return 0;
......@@ -359,58 +395,37 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
ASSERT(0);
}
nDataKV = sizeof(SKVIdx) * pBuilder->nCols + pBuilder->vlenKV;
ASSERT(pBuilder->row.flags & TSROW_KV_ROW == 0);
nDataKV = sizeof(STSKVRow) + sizeof(SKVIdx) * pTSKVRow->nCols + pBuilder->vlenKV;
pBuilder->row.sver = pBuilder->pTSchema->version;
if (nDataKV < nDataTP) {
// generate KV row
pBuilder->row.flags |= TSROW_KV_ROW;
pBuilder->row.ncols = pBuilder->nCols;
pBuilder->row.nData = nDataKV;
pBuilder->row.pData = pBuilder->pKVBuf;
qsort(pBuilder->pKVBuf, pBuilder->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
if (pBuilder->nCols < pBuilder->pTSchema->numOfCols - 1) {
memmove(pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols,
pBuilder->pKVBuf + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1), pBuilder->vlenKV);
qsort(pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
if (pTSKVRow->nCols < pBuilder->pTSchema->numOfCols - 1) {
memmove(&pTSKVRow->idx[pTSKVRow->nCols], &pTSKVRow->idx[pBuilder->pTSchema->numOfCols - 1], pBuilder->vlenKV);
}
} else {
// generate TUPLE row
uint8_t *p;
STColumn *pTColumn;
pBuilder->row.sver = pBuilder->pTSchema->version;
pBuilder->row.nData = nDataTP;
if (pBuilder->row.flags & 0xf == TSROW_HAS_VAL) {
// no bitmap
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2;
} else if (pBuilder->row.flags & 0xf == TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE) {
// bitmap2
p = pBuilder->pTPBuf;
for (int32_t iCol = 1; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pTColumn->flags & COL_SET_VAL) {
SET_BIT2(p, iCol - 1, 0x2);
} else if (pTColumn->flags & COL_SET_VAL) {
SET_BIT2(p, iCol - 1, 0x1);
}
}
uint8_t *p;
uint8_t flags = pBuilder->row.flags & 0xf;
pBuilder->row.pData = p;
if (flags == TSROW_HAS_VAL) {
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2;
} else {
// bitmap1
p = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1;
for (int32_t iCol = 1; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
pTColumn = &pBuilder->pTSchema->columns[iCol];
if (1) {
SET_BIT1(p, iCol - 1, 0x1);
}
if (flags == TSROW_HAS_VAL) {
p = pBuilder->pTPBuf;
} else {
p = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1;
}
setBitMap(p, pBuilder->pTSchema, flags);
pBuilder->row.pData = p;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册