提交 793b49db 编写于 作者: H Hongze Cheng

feat: data format

上级 75bd81af
......@@ -33,9 +33,9 @@ typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
typedef struct SKVIdx SKVIdx;
// STSchema
// STSRow2
#define TSROW_SVER(r) (((r)->flags & TSROW_KV_ROW) ? -1 : (r)->sver)
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow);
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow);
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t cid, const uint8_t **ppData, uint32_t *nData,
......@@ -70,9 +70,13 @@ struct STSchema {
STColumn columns[];
};
#define TSROW_HAS_NONE ((uint8_t)0x1)
#define TSROW_HAS_NULL ((uint8_t)0x2U)
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_ROW ((uint8_t)0x10U)
struct STSRow2 {
TSKEY ts;
uint32_t flags;
TSKEY ts;
uint8_t flags;
union {
int32_t sver;
int32_t ncols;
......
......@@ -275,9 +275,8 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2)
#define COL_SET_VAL ((int8_t)0x10)
#define COL_SET_NONE ((int8_t)0x20)
#define COL_SET_NULL ((int8_t)0x40)
#define COL_SET_NULL ((int8_t)0x10)
#define COL_SET_VAL ((int8_t)0x20)
typedef struct SSchema {
int8_t type;
int8_t flags;
......@@ -286,7 +285,8 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
#define COL_IS_SET(s) ((s)->flags & (COL_SET_VAL | COL_SET_NONE | COL_SET_NULL) != 0)
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
......
......@@ -19,38 +19,66 @@
#include "tdatablock.h"
#include "tlog.h"
#define TD_HAS_NONE 0x1U
#define TD_HAS_NULL 0x2U
#define TD_HAS_VAL 0x4U
#define TD_KV_ROW 0x10U
struct SKVIdx {
int32_t cid;
int32_t offset;
};
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
// STSRow2
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow) {
if (tEncodeI64(pEncoder, pRow->ts) < 0) return -1;
if (tEncodeU32v(pEncoder, pRow->flags) < 0) return -1;
if (pRow->flags & TD_KV_ROW) {
if (tEncodeI32v(pEncoder, pRow->ncols) < 0) return -1;
} else {
if (tEncodeI32v(pEncoder, pRow->sver) < 0) return -1;
if (tEncodeU8(pEncoder, pRow->flags) < 0) return -1;
ASSERT(pRow->flags & 0xf != 0);
switch (pRow->flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
ASSERT(TSROW_IS_KV_ROW(pRow) && pRow->nData == 0 && pRow->pData == NULL);
return 0;
case TSROW_HAS_VAL:
ASSERT(!TSROW_IS_KV_ROW(pRow));
default:
ASSERT(pRow->nData && pRow->pData);
if (TSROW_IS_KV_ROW(pRow)) {
if (tEncodeI32v(pEncoder, pRow->ncols) < 0) return -1;
} else {
if (tEncodeI32v(pEncoder, pRow->sver) < 0) return -1;
}
if (tEncodeBinary(pEncoder, pRow->pData, pRow->nData)) return -1;
break;
}
if (tEncodeBinary(pEncoder, pRow->pData, pRow->nData) < 0) return -1;
return 0;
}
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow) {
if (tDecodeI64(pDecoder, &pRow->ts) < 0) return -1;
if (tDecodeU32v(pDecoder, &pRow->flags) < 0) return -1;
if (pRow->flags & TD_KV_ROW) {
if (tDecodeI32v(pDecoder, &pRow->ncols) < 0) return -1;
} else {
if (tDecodeI32v(pDecoder, &pRow->sver) < 0) return -1;
if (tDecodeU8(pDecoder, &pRow->flags) < 0) return -1;
ASSERT(pRow->flags & 0xf != 0);
switch (pRow->flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
ASSERT(TSROW_IS_KV_ROW(pRow));
pRow->nData = 0;
pRow->pData = NULL;
return 0;
case TSROW_HAS_VAL:
ASSERT(!TSROW_IS_KV_ROW(pRow));
default:
if (TSROW_IS_KV_ROW(pRow)) {
if (tDecodeI32v(pDecoder, &pRow->ncols) < 0) return -1;
} else {
if (tDecodeI32v(pDecoder, &pRow->sver) < 0) return -1;
}
if (tDecodeBinary(pDecoder, &pRow->pData, &pRow->nData)) return -1;
break;
}
if (tDecodeBinary(pDecoder, &pRow->pData, &pRow->nData) < 0) return -1;
return 0;
}
......@@ -66,17 +94,17 @@ int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t cid, const ui
*nData = 0;
switch (tflags) {
case TD_HAS_NONE:
case TSROW_HAS_NONE:
*flags = -1;
break;
case TD_HAS_NULL:
case TSROW_HAS_NULL:
*flags = 1;
break;
case TD_HAS_VAL:
case TSROW_HAS_VAL:
*flags = 0;
// find the row
break;
case TD_HAS_NULL | TD_HAS_NONE:
case TSROW_HAS_NULL | TSROW_HAS_NONE:
// read bit map (todo)
if (0) {
*flags = 1;
......@@ -84,11 +112,11 @@ int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t cid, const ui
*flags = -1;
}
break;
case TD_HAS_VAL | TD_HAS_NONE:
case TD_HAS_VAL | TD_HAS_NULL:
case TSROW_HAS_VAL | TSROW_HAS_NONE:
case TSROW_HAS_VAL | TSROW_HAS_NULL:
// read bitmap (todo)
if (0) {
if (tflags & TD_HAS_NONE) {
if (tflags & TSROW_HAS_NONE) {
*flags = -1;
} else {
*flags = 1;
......@@ -97,7 +125,7 @@ int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t cid, const ui
// get value (todo)
}
break;
case TD_HAS_VAL | TD_HAS_NULL | TD_HAS_NONE:
case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
break;
default:
return -1;
......@@ -199,18 +227,15 @@ void tTSRowBuilderReset(STSRowBuilder *pBuilder) {
}
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData) {
int32_t iCol;
uint8_t *p;
// search column (todo: make here faster)
// search column (TODO: bsearch with interp)
if (pBuilder->pTColumn->colId < cid) {
iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) + 1;
int32_t iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) + 1;
for (; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pBuilder->pTColumn->colId == cid) break;
}
} else if (pBuilder->pTColumn->colId > cid) {
iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) - 1;
int32_t iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) - 1;
for (; iCol >= 0; iCol--) {
pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pBuilder->pTColumn->colId == cid) break;
......@@ -218,19 +243,20 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD
}
// check
if (pBuilder->pTColumn->colId != cid || COL_IS_SET(pBuilder->pTColumn)) {
if (pBuilder->pTColumn->colId != cid || COL_IS_SET(pBuilder->pTColumn->flags)) {
return -1;
}
// set value
uint8_t *p;
if (cid == 0) {
ASSERT(pData && nData == sizeof(TSKEY));
pBuilder->row.ts = *(TSKEY *)pData;
pBuilder->pTColumn->flags |= COL_SET_VAL;
} else {
if (pData) {
pBuilder->row.flags |= TD_HAS_VAL;
if (pData) { // set val
pBuilder->row.flags |= TSROW_HAS_VAL;
pBuilder->pTColumn->flags |= COL_SET_VAL;
// set tuple data
......@@ -250,20 +276,20 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD
((SKVIdx *)p)->cid = cid;
((SKVIdx *)p)->offset = pBuilder->kvVLen;
p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->pTSchema->numOfCols + pBuilder->kvVLen;
p = pBuilder->pKVBuf + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1) + pBuilder->kvVLen;
if (IS_VAR_DATA_TYPE(pBuilder->pTColumn->type)) {
pBuilder->kvVLen += tPutBinary(p, pData, nData);
} else {
memcpy(p, pData, nData);
pBuilder->kvVLen += nData;
}
} else {
pBuilder->row.flags |= TD_HAS_NULL;
} else { // set NULL
pBuilder->row.flags |= TSROW_HAS_NULL;
pBuilder->pTColumn->flags |= COL_SET_NULL;
p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols;
((SKVIdx *)p)->cid = cid;
((SKVIdx *)p)->offset = -1;
((SKVIdx *)p)->offset = -1; // for TSROW_KV_ROW, offset -1 means NULL
}
pBuilder->nCols++;
......@@ -272,68 +298,76 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD
return 0;
}
static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2) {
SKVIdx *pKVIdx1 = (SKVIdx *)p1;
SKVIdx *pKVIdx2 = (SKVIdx *)p2;
if (pKVIdx1->cid > pKVIdx2->cid) {
return 1;
} else if (pKVIdx1->cid < pKVIdx2->cid) {
return -1;
}
return 0;
}
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
int32_t tpDataLen, kvDataLen;
uint32_t flags;
// error not set ts
if (!COL_IS_SET(pBuilder->pTSchema->columns)) {
if (!COL_IS_SET(pBuilder->pTSchema->columns->flags)) {
return -1;
}
ASSERT(pBuilder->nCols < pBuilder->pTSchema->numOfCols);
if (pBuilder->nCols < pBuilder->pTSchema->numOfCols - 1) {
pBuilder->row.flags |= TD_HAS_NONE;
pBuilder->row.flags |= TSROW_HAS_NONE;
}
flags = pBuilder->row.flags & 0xf;
switch (flags) {
case TD_HAS_NONE:
case TD_HAS_NULL:
pBuilder->row.sver = pBuilder->pTSchema->version;
ASSERT(pBuilder->row.flags & 0xf != 0);
*(ppRow) = &pBuilder->row;
switch (pBuilder->row.flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
pBuilder->row.flags |= TSROW_KV_ROW;
pBuilder->row.nData = 0;
pBuilder->row.pData = NULL;
return 0;
case TSROW_HAS_NULL | TSROW_HAS_NONE:
tpDataLen = (pBuilder->pTSchema->numOfCols - 1) / 8;
break;
case TD_HAS_VAL:
pBuilder->row.sver = pBuilder->pTSchema->version;
pBuilder->row.nData = pBuilder->pTSchema->flen + pBuilder->tpVLen;
pBuilder->row.pData = pBuilder->pTPBuf;
case TSROW_HAS_VAL:
tpDataLen = pBuilder->pTSchema->flen + pBuilder->tpVLen;
break;
case TD_HAS_NULL | TD_HAS_NONE:
pBuilder->row.sver = pBuilder->pTSchema->version;
// set bitmap (todo)
pBuilder->row.nData = ((pBuilder->pTSchema->numOfCols - 1) / 8) + 1;
pBuilder->row.pData = pBuilder->pBitBuf;
case TSROW_HAS_VAL | TSROW_HAS_NONE:
case TSROW_HAS_VAL | TSROW_HAS_NULL:
tpDataLen = pBuilder->pTSchema->flen + pBuilder->tpVLen + (pBuilder->pTSchema->numOfCols - 1) / 8;
break;
case TD_HAS_VAL | TD_HAS_NONE:
case TD_HAS_VAL | TD_HAS_NULL:
case TD_HAS_VAL | TD_HAS_NULL | TD_HAS_NONE:
if (flags == TD_HAS_VAL | TD_HAS_NULL | TD_HAS_NONE) {
tpDataLen = ((pBuilder->pTSchema->numOfCols - 1) / 4) + 1 + pBuilder->pTSchema->flen + pBuilder->tpVLen;
} else {
tpDataLen = ((pBuilder->pTSchema->numOfCols - 1) / 8) + 1 + pBuilder->pTSchema->flen + pBuilder->tpVLen;
}
kvDataLen = sizeof(SKVIdx) * pBuilder->nCols + pBuilder->kvVLen;
if (kvDataLen < tpDataLen) {
pBuilder->row.flags |= TD_KV_ROW;
pBuilder->row.ncols = pBuilder->nCols;
pBuilder->row.nData = kvDataLen;
pBuilder->row.pData = pBuilder->pKVBuf;
// memmove(); todo
// qsort
} else {
pBuilder->row.sver = pBuilder->pTSchema->numOfCols;
// set bitmap etc (todo)
pBuilder->row.nData = tpDataLen;
pBuilder->row.pData = pBuilder->pTPBuf;
}
case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
tpDataLen = pBuilder->pTSchema->flen + pBuilder->tpVLen + (pBuilder->pTSchema->numOfCols - 1) / 4;
break;
default:
ASSERT(0);
return -1;
// decide chose tuple or kv
kvDataLen = sizeof(SKVIdx) * pBuilder->nCols + pBuilder->kvVLen;
break;
}
if (kvDataLen < tpDataLen) {
pBuilder->row.flags |= TSROW_KV_ROW;
pBuilder->row.ncols = pBuilder->nCols;
pBuilder->row.nData = kvDataLen;
pBuilder->row.pData = pBuilder->pKVBuf;
qsort(pBuilder->pKVBuf, pBuilder->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
if (pBuilder->nCols < pBuilder->pTSchema->numOfCols - 1) {
memmove(pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols,
pBuilder->pKVBuf + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1), pBuilder->kvVLen);
}
} else {
pBuilder->row.sver = pBuilder->pTSchema->version;
pBuilder->row.nData = tpDataLen;
pBuilder->row.pData
}
*ppRow = &pBuilder->row;
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册