提交 65bdd340 编写于 作者: H Hongze Cheng

feat: tag refact

上级 675fe706
...@@ -59,12 +59,12 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, u ...@@ -59,12 +59,12 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, u
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow); int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag // STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag); int32_t tTagNew(STagVal *pTagVals, int16_t nTag, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag); void tTagFree(STag *pTag);
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag); void tTagGet(STag *pTag, STagVal *pTagVal);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(STag *pTag, STagVal **ppTagVals, int16_t *nTag);
// STRUCT ================= // STRUCT =================
struct STColumn { struct STColumn {
...@@ -118,7 +118,10 @@ struct SColVal { ...@@ -118,7 +118,10 @@ struct SColVal {
}; };
struct STagVal { struct STagVal {
int16_t cid; union {
int16_t cid;
char *pKey;
};
int8_t type; int8_t type;
uint32_t nData; uint32_t nData;
uint8_t *pData; uint8_t *pData;
......
...@@ -642,6 +642,11 @@ static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* n ...@@ -642,6 +642,11 @@ static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* n
return n; return n;
} }
static FORCE_INLINE int32_t tPutCStr(uint8_t* p, char* pData) {
return tPutBinary(p, (uint8_t*)pData, strlen(pData) + 1);
}
static FORCE_INLINE int32_t tGetCStr(uint8_t* p, char** ppData) { return tGetBinary(p, (uint8_t**)ppData, NULL); }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -31,16 +31,13 @@ typedef struct { ...@@ -31,16 +31,13 @@ typedef struct {
} STSKVRow; } STSKVRow;
#pragma pack(pop) #pragma pack(pop)
typedef struct STagIdx {
int16_t cid;
uint16_t offset;
} STagIdx;
#pragma pack(push, 1) #pragma pack(push, 1)
struct STag { struct STag {
uint16_t len; int8_t isJson;
uint16_t nTag; int16_t len;
STagIdx idx[]; int16_t nTag;
int32_t ver;
int16_t idx[];
}; };
#pragma pack(pop) #pragma pack(pop)
...@@ -521,123 +518,149 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) { ...@@ -521,123 +518,149 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
return 0; return 0;
} }
static FORCE_INLINE int tTagIdxCmprFn(const void *p1, const void *p2) { static int tTagValCmprFn(const void *p1, const void *p2) {
STagIdx *pTagIdx1 = (STagIdx *)p1; if (((STagVal *)p1)->cid < ((STagVal *)p2)->cid) {
STagIdx *pTagIdx2 = (STagIdx *)p2;
if (pTagIdx1->cid < pTagIdx1->cid) {
return -1; return -1;
} else if (pTagIdx1->cid > pTagIdx1->cid) { } else if (((STagVal *)p1)->cid > ((STagVal *)p2)->cid) {
return 1; return 1;
} }
ASSERT(0);
return 0; return 0;
} }
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag) { static int tTagValJsonCmprFn(const void *p1, const void *p2) {
STagVal *pTagVal; return strcmp(((STagVal *)p1)[0].pKey, ((STagVal *)p2)[0].pKey);
uint8_t *p; }
int32_t n; static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
uint16_t tsize = sizeof(STag) + sizeof(STagIdx) * nTag; int32_t n = 0;
for (int16_t iTag = 0; iTag < nTag; iTag++) { // key
pTagVal = &pTagVals[iTag]; if (isJson) {
n += tPutCStr(p ? p + n : p, pTagVal->pKey);
} else {
n += tPutI16v(p ? p + n : p, pTagVal->cid);
}
if (IS_VAR_DATA_TYPE(pTagVal->type)) { // type
tsize += tPutBinary(NULL, pTagVal->pData, pTagVal->nData); n += tPutI8(p ? p + n : p, pTagVal->type);
} else {
ASSERT(pTagVal->nData == TYPE_BYTES[pTagVal->type]); // value
tsize += pTagVal->nData; if (IS_VAR_DATA_TYPE(pTagVal->type)) {
} n += tPutBinary(p ? p + n : p, pTagVal->pData, pTagVal->nData);
} else {
ASSERT(pTagVal->nData == TYPE_BYTES[pTagVal->type]);
if (p) memcpy(p + n, pTagVal->pData, pTagVal->nData);
n += pTagVal->nData;
} }
(*ppTag) = (STag *)taosMemoryMalloc(tsize); return n;
if (*ppTag == NULL) { }
terrno = TSDB_CODE_OUT_OF_MEMORY; static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
return -1; int32_t n = 0;
// key
if (isJson) {
n += tGetCStr(p + n, &pTagVal->pKey);
} else {
n += tGetI16v(p + n, &pTagVal->cid);
} }
p = (uint8_t *)&((*ppTag)->idx[nTag]); // type
n = 0; n += tGetI8(p + n, &pTagVal->type);
(*ppTag)->len = tsize; // value
(*ppTag)->nTag = nTag; if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData);
} else {
pTagVal->pData = p + n;
pTagVal->nData = TYPE_BYTES[pTagVal->type];
n += pTagVal->nData;
}
return n;
}
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, int32_t version, int8_t isJson, STag **ppTag) {
int32_t code = 0;
uint8_t *p = NULL;
int16_t n = 0;
int32_t szTag = sizeof(STag) + sizeof(int16_t) * nTag;
// sort
if (isJson) {
qsort(pTagVals, nTag, sizeof(STagVal), tTagValJsonCmprFn);
} else {
qsort(pTagVals, nTag, sizeof(STagVal), tTagValCmprFn);
}
// get size
for (int16_t iTag = 0; iTag < nTag; iTag++) { for (int16_t iTag = 0; iTag < nTag; iTag++) {
pTagVal = &pTagVals[iTag]; szTag += tPutTagVal(NULL, &pTagVals[iTag], isJson);
}
(*ppTag)->idx[iTag].cid = pTagVal->cid; // TODO
(*ppTag)->idx[iTag].offset = n; // if (szTag >= 16 * 1024) {
// code = TSDB_CODE_IVLD_TAG;
// goto _err;
// }
// build tag
(*ppTag) = (STag *)taosMemoryMalloc(szTag);
if ((*ppTag) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
(*ppTag)->isJson = isJson ? 1 : 0;
(*ppTag)->len = szTag;
(*ppTag)->nTag = nTag;
(*ppTag)->ver = version;
if (IS_VAR_DATA_TYPE(pTagVal->type)) { p = (uint8_t *)&(*ppTag)->idx[nTag];
n += tPutBinary(p + n, pTagVal->pData, pTagVal->nData); n = 0;
} else { for (int16_t iTag = 0; iTag < nTag; iTag++) {
memcpy(p + n, pTagVal->pData, pTagVal->nData); (*ppTag)->idx[iTag] = n;
n += pTagVal->nData; n += tPutTagVal(p + n, &pTagVals[iTag], isJson);
}
} }
qsort((*ppTag)->idx, (*ppTag)->nTag, sizeof(STagIdx), tTagIdxCmprFn); return code;
return 0;
_err:
return code;
} }
void tTagFree(STag *pTag) { void tTagFree(STag *pTag) {
if (pTag) taosMemoryFree(pTag); if (pTag) taosMemoryFree(pTag);
} }
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag) { void tTagGet(STag *pTag, STagVal *pTagVal) {
STagVal *pTagVals; int16_t lidx = 0;
int16_t nTags = 0; int16_t ridx = pTag->nTag - 1;
SSchema *pColumn; int16_t midx;
uint8_t *p; uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag];
uint32_t n; STagVal tv;
int c;
pTagVals = (STagVal *)taosMemoryMalloc(sizeof(*pTagVals) * nCols);
if (pTagVals == NULL) { pTagVal->type = TSDB_DATA_TYPE_NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY; pTagVal->pData = NULL;
return -1; pTagVal->nData = 0;
} while (lidx <= ridx) {
midx = (lidx + ridx) / 2;
for (int32_t i = 0; i < nCols; i++) {
pColumn = &pSchema[i]; tGetTagVal(p + pTag->idx[midx], &tv, pTag->isJson);
if (pTag->isJson) {
if (i == iCol) { c = tTagValJsonCmprFn(pTagVal, &tv);
p = pData;
n = nData;
} else { } else {
tTagGet(pTag, pColumn->colId, pColumn->type, &p, &n); c = tTagValCmprFn(pTagVal, &tv);
} }
if (p == NULL) continue; if (c < 0) {
ridx = midx - 1;
ASSERT(IS_VAR_DATA_TYPE(pColumn->type) || n == pColumn->bytes); } else if (c > 0) {
lidx = midx + 1;
pTagVals[nTags].cid = pColumn->colId;
pTagVals[nTags].type = pColumn->type;
pTagVals[nTags].nData = n;
pTagVals[nTags].pData = p;
nTags++;
}
// create new tag
if (tTagNew(pTagVals, nTags, ppTag) < 0) {
taosMemoryFree(pTagVals);
return -1;
}
taosMemoryFree(pTagVals);
return 0;
}
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData) {
STagIdx *pTagIdx = bsearch(&((STagIdx){.cid = cid}), pTag->idx, pTag->nTag, sizeof(STagIdx), tTagIdxCmprFn);
if (pTagIdx == NULL) {
*ppData = NULL;
*nData = 0;
} else {
uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag] + pTagIdx->offset;
if (IS_VAR_DATA_TYPE(type)) {
tGetBinary(p, ppData, nData);
} else { } else {
*ppData = p; pTagVal->type = tv.type;
*nData = TYPE_BYTES[type]; pTagVal->nData = tv.nData;
pTagVal->pData = tv.pData;
break;
} }
} }
} }
...@@ -648,6 +671,27 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag) { ...@@ -648,6 +671,27 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag) {
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) { return tDecodeBinary(pDecoder, (uint8_t **)ppTag, NULL); } int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) { return tDecodeBinary(pDecoder, (uint8_t **)ppTag, NULL); }
int32_t tTagToValArray(STag *pTag, STagVal **ppTagVals, int16_t *nTag) {
int32_t code = 0;
uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag];
*nTag = pTag->nTag;
(*ppTagVals) = (STagVal *)taosMemoryMalloc(sizeof(STagVal) * pTag->nTag);
if (*ppTagVals == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
for (int16_t iTag = 0; iTag < pTag->nTag; iTag++) {
tGetTagVal(p + pTag->idx[iTag], &(*ppTagVals)[iTag], pTag->isJson);
}
return code;
_err:
return code;
}
#if 1 // =================================================================================================================== #if 1 // ===================================================================================================================
static void dataColSetNEleNull(SDataCol *pCol, int nEle); static void dataColSetNEleNull(SDataCol *pCol, int nEle);
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册