未验证 提交 8be47d56 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13334 from taosdata/feat/tag_refact

feat:add new logic for new tag format
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "talgo.h" #include "talgo.h"
#include "tarray.h"
#include "tencode.h" #include "tencode.h"
#include "ttypes.h" #include "ttypes.h"
#include "tutil.h" #include "tutil.h"
...@@ -59,12 +60,14 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, u ...@@ -59,12 +60,14 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, u
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow); int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag // STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag); void tTagFree(STag *pTag);
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag); bool tTagGet(const STag *pTag, STagVal *pTagVal);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData); char* tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln);
// STRUCT ================= // STRUCT =================
struct STColumn { struct STColumn {
...@@ -117,12 +120,32 @@ struct SColVal { ...@@ -117,12 +120,32 @@ struct SColVal {
uint8_t *pData; uint8_t *pData;
}; };
#pragma pack(push, 1)
struct STagVal { struct STagVal {
int16_t cid; union {
int8_t type; int16_t cid;
uint32_t nData; char *pKey;
uint8_t *pData; };
int8_t type;
union {
int64_t i64;
struct {
uint32_t nData;
uint8_t *pData;
};
};
};
#define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit
#define TD_TAG_LARGE ((int8_t)0x20)
struct STag {
int8_t flags;
int16_t len;
int16_t nTag;
int32_t ver;
int8_t idx[];
}; };
#pragma pack(pop)
#if 1 //================================================================================================================================================ #if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. // Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
...@@ -366,109 +389,6 @@ SDataCols *tdFreeDataCols(SDataCols *pCols); ...@@ -366,109 +389,6 @@ SDataCols *tdFreeDataCols(SDataCols *pCols);
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update, int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update,
TDRowVerT maxVer); TDRowVerT maxVer);
// ----------------- K-V data row structure
/* |<-------------------------------------- len -------------------------------------------->|
* |<----- header ----->|<--------------------------- body -------------------------------->|
* +----------+----------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+
*/
typedef void *SKVRow;
typedef struct {
int16_t colId;
uint16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define kvRowLen(r) (*(uint16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r))
#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) taosMemoryFreeClear(r)
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#define kvRowValLen(r) (kvRowLen(r) - TD_KV_ROW_HEAD_SIZE - sizeof(SColIdx) * kvRowNCols(r))
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r)))
#define kvRowKey(r) tdGetKey(kvRowTKey(r))
#define kvRowKeys(r) POINTER_SHIFT(r, *(uint16_t *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(int16_t)))
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
SKVRow tdKVRowDup(SKVRow row);
int32_t tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
int32_t tdEncodeKVRow(void **buf, SKVRow row);
void *tdDecodeKVRow(void *buf, SKVRow *row);
void tdSortKVRowByColIdx(SKVRow row);
static FORCE_INLINE int32_t comparTagId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE void *tdGetKVRowValOfCol(const SKVRow row, int16_t colId) {
void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
if (ret == NULL) return NULL;
return kvRowColVal(row, (SColIdx *)ret);
}
static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
}
// ----------------- K-V data row builder
typedef struct {
int16_t tCols;
int16_t nCols;
SColIdx *pColIdx;
uint16_t alloc;
uint16_t size;
void *buf;
} SKVRowBuilder;
int32_t tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, const void *value, int32_t tlen) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx *pColIdx = (SColIdx *)taosMemoryRealloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pColIdx == NULL) return -1;
pBuilder->pColIdx = pColIdx;
}
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
pBuilder->nCols++;
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
void *buf = taosMemoryRealloc(pBuilder->buf, pBuilder->alloc);
if (buf == NULL) return -1;
pBuilder->buf = buf;
}
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
pBuilder->size += tlen;
return 0;
}
#endif #endif
#ifdef __cplusplus #ifdef __cplusplus
...@@ -476,3 +396,4 @@ static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t co ...@@ -476,3 +396,4 @@ static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t co
#endif #endif
#endif /*_TD_COMMON_DATA_FORMAT_H_*/ #endif /*_TD_COMMON_DATA_FORMAT_H_*/
...@@ -287,7 +287,7 @@ typedef struct SSchema { ...@@ -287,7 +287,7 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SSchema; } SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0) #define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL))) #define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON) #define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
...@@ -1778,6 +1778,15 @@ typedef struct SVCreateTbReq { ...@@ -1778,6 +1778,15 @@ typedef struct SVCreateTbReq {
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq); int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq); int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear(req->name);
if (req->type == TSDB_CHILD_TABLE) {
taosMemoryFreeClear(req->ctb.pTag);
} else if (req->type == TSDB_NORMAL_TABLE) {
taosMemoryFreeClear(req->ntb.schemaRow.pSchema);
}
}
typedef struct { typedef struct {
int32_t nReqs; int32_t nReqs;
union { union {
......
...@@ -461,133 +461,64 @@ static FORCE_INLINE void* tDecoderMalloc(SDecoder* pCoder, int32_t size) { ...@@ -461,133 +461,64 @@ static FORCE_INLINE void* tDecoderMalloc(SDecoder* pCoder, int32_t size) {
} }
// =========================================== // ===========================================
#define tPutV(p, v) \ #define tPutV(p, v) \
do { \ int32_t n = 0; \
int32_t n = 0; \ for (;;) { \
for (;;) { \ if (v <= 0x7f) { \
if (v <= 0x7f) { \ if (p) p[n] = v; \
if (p) p[n] = v; \ n++; \
n++; \ break; \
break; \ } \
} \ if (p) p[n] = (v & 0x7f) | 0x80; \
if (p) p[n] = (v & 0x7f) | 0x80; \ n++; \
n++; \ v >>= 7; \
v >>= 7; \ } \
} \ return n;
return n; \
} while (0)
#define tGetV(p, v) \ #define tGetV(p, v) \
do { \ int32_t n = 0; \
int32_t n = 0; \ if (v) *v = 0; \
if (v) *v = 0; \ for (;;) { \
for (;;) { \ if (p[n] <= 0x7f) { \
if (p[n] <= 0x7f) { \ if (v) (*v) |= (p[n] << (7 * n)); \
if (v) (*v) |= (p[n] << (7 * n)); \ n++; \
n++; \ break; \
break; \ } \
} \ if (v) (*v) |= ((p[n] & 0x7f) << (7 * n)); \
if (v) (*v) |= ((p[n] & 0x7f) << (7 * n)); \ n++; \
n++; \ } \
} \ return n;
return n; \
} while (0)
// PUT // PUT
static FORCE_INLINE int32_t tPutU8(uint8_t* p, uint8_t v) {
if (p) ((uint8_t*)p)[0] = v;
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tPutI8(uint8_t* p, int8_t v) { static FORCE_INLINE int32_t tPutI8(uint8_t* p, int8_t v) {
if (p) ((int8_t*)p)[0] = v; if (p) ((int8_t*)p)[0] = v;
return sizeof(int8_t); return sizeof(int8_t);
} }
static FORCE_INLINE int32_t tPutU16(uint8_t* p, uint16_t v) {
if (p) ((uint16_t*)p)[0] = v;
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tPutI16(uint8_t* p, int16_t v) {
if (p) ((int16_t*)p)[0] = v;
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tPutU32(uint8_t* p, uint32_t v) {
if (p) ((uint32_t*)p)[0] = v;
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tPutI32(uint8_t* p, int32_t v) {
if (p) ((int32_t*)p)[0] = v;
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tPutU64(uint8_t* p, uint64_t v) {
if (p) ((uint64_t*)p)[0] = v;
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) { static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
if (p) ((int64_t*)p)[0] = v; if (p) ((int64_t*)p)[0] = v;
return sizeof(int64_t); return sizeof(int64_t);
} }
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); } static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v) }
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); } static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
static FORCE_INLINE int32_t tPutU32v(uint8_t* p, uint32_t v) { tPutV(p, v); } static FORCE_INLINE int32_t tPutU32v(uint8_t* p, uint32_t v) { tPutV(p, v) }
static FORCE_INLINE int32_t tPutI32v(uint8_t* p, int32_t v) { return tPutU32v(p, ZIGZAGE(int32_t, v)); } static FORCE_INLINE int32_t tPutI32v(uint8_t* p, int32_t v) { return tPutU32v(p, ZIGZAGE(int32_t, v)); }
static FORCE_INLINE int32_t tPutU64v(uint8_t* p, uint64_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI64v(uint8_t* p, int64_t v) { return tPutU64v(p, ZIGZAGE(int64_t, v)); }
// GET
static FORCE_INLINE int32_t tGetU8(uint8_t* p, uint8_t* v) {
if (v) *v = ((uint8_t*)p)[0];
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tGetI8(uint8_t* p, int8_t* v) { static FORCE_INLINE int32_t tGetI8(uint8_t* p, int8_t* v) {
if (v) *v = ((int8_t*)p)[0]; if (v) *v = ((int8_t*)p)[0];
return sizeof(int8_t); return sizeof(int8_t);
} }
static FORCE_INLINE int32_t tGetU16(uint8_t* p, uint16_t* v) {
if (v) *v = ((uint16_t*)p)[0];
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tGetI16(uint8_t* p, int16_t* v) {
if (v) *v = ((int16_t*)p)[0];
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tGetU32(uint8_t* p, uint32_t* v) {
if (v) *v = ((uint32_t*)p)[0];
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tGetI32(uint8_t* p, int32_t* v) {
if (v) *v = ((int32_t*)p)[0];
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tGetU64(uint8_t* p, uint64_t* v) {
if (v) *v = ((uint64_t*)p)[0];
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tGetI64(uint8_t* p, int64_t* v) { static FORCE_INLINE int32_t tGetI64(uint8_t* p, int64_t* v) {
if (v) *v = ((int64_t*)p)[0]; if (v) *v = ((int64_t*)p)[0];
return sizeof(int64_t); return sizeof(int64_t);
} }
static FORCE_INLINE int32_t tGetU16v(uint8_t* p, uint16_t* v) { tGetV(p, v); } static FORCE_INLINE int32_t tGetU16v(uint8_t* p, uint16_t* v) { tGetV(p, v) }
static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) { static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) {
int32_t n; int32_t n;
...@@ -599,7 +530,7 @@ static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) { ...@@ -599,7 +530,7 @@ static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) {
return n; return n;
} }
static FORCE_INLINE int32_t tGetU32v(uint8_t* p, uint32_t* v) { tGetV(p, v); } static FORCE_INLINE int32_t tGetU32v(uint8_t* p, uint32_t* v) { tGetV(p, v) }
static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) { static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) {
int32_t n; int32_t n;
...@@ -611,18 +542,6 @@ static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) { ...@@ -611,18 +542,6 @@ static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) {
return n; return n;
} }
static FORCE_INLINE int32_t tGetU64v(uint8_t* p, uint64_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
int32_t n;
uint64_t tv;
n = tGetU64v(p, &tv);
if (v) *v = ZIGZAGD(int64_t, tv);
return n;
}
// ===================== // =====================
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) { static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
int n = 0; int n = 0;
...@@ -646,6 +565,11 @@ static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* n ...@@ -646,6 +565,11 @@ static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* n
return n; return n;
} }
static FORCE_INLINE int32_t tPutCStr(uint8_t* p, char* pData) {
return tPutBinary(p, (uint8_t*)pData, strlen(pData) + 1);
}
static FORCE_INLINE int32_t tGetCStr(uint8_t* p, char** ppData) { return tGetBinary(p, (uint8_t**)ppData, NULL); }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1033,27 +1033,20 @@ static char* parseTagDatatoJson(void* p) { ...@@ -1033,27 +1033,20 @@ static char* parseTagDatatoJson(void* p) {
goto end; goto end;
} }
int16_t nCols = kvRowNCols(p); SArray* pTagVals = NULL;
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
goto end;
}
int16_t nCols = taosArrayGetSize(pTagVals);
char tagJsonKey[256] = {0}; char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) { for (int j = 0; j < nCols; ++j) {
SColIdx* pColIdx = kvRowColIdxAt(p, j); STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0) {
if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8);
sprintf(string, "%s", TSDB_DATA_NULL_STR_L);
goto end;
}
continue;
}
// json key encode by binary // json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey)); memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val)); memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
// json value // json value
val += varDataTLen(val); char type = pTagVal->type;
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
if (type == TSDB_DATA_TYPE_NULL) { if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull(); cJSON* value = cJSON_CreateNull();
if (value == NULL) { if (value == NULL) {
...@@ -1062,11 +1055,11 @@ static char* parseTagDatatoJson(void* p) { ...@@ -1062,11 +1055,11 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_NCHAR) { } else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL; cJSON* value = NULL;
if (varDataLen(realData) > 0) { if (pTagVal->nData > 0) {
char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1); char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue); int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
if (length < 0) { if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val); tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, pTagVal->pData);
taosMemoryFree(tagJsonValue); taosMemoryFree(tagJsonValue);
goto end; goto end;
} }
...@@ -1075,7 +1068,7 @@ static char* parseTagDatatoJson(void* p) { ...@@ -1075,7 +1068,7 @@ static char* parseTagDatatoJson(void* p) {
if (value == NULL) { if (value == NULL) {
goto end; goto end;
} }
} else if (varDataLen(realData) == 0) { } else if (pTagVal->nData == 0) {
value = cJSON_CreateString(""); value = cJSON_CreateString("");
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -1083,22 +1076,14 @@ static char* parseTagDatatoJson(void* p) { ...@@ -1083,22 +1076,14 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData); double jsonVd = *(double*)(&pTagVal->i64);
cJSON* value = cJSON_CreateNumber(jsonVd); cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) { if (value == NULL) {
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_BOOL) { } else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData); char jsonVd = *(char*)(&pTagVal->i64);
cJSON* value = cJSON_CreateBool(jsonVd); cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL) { if (value == NULL) {
goto end; goto end;
...@@ -1163,7 +1148,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -1163,7 +1148,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (jsonInnerType == TSDB_DATA_TYPE_NULL) { if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) { } else if (jsonInnerType == TD_TAG_JSON) {
char* jsonString = parseTagDatatoJson(jsonInnerData); char* jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString); STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString); taosMemoryFree(jsonString);
...@@ -1182,10 +1167,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -1182,10 +1167,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
double jsonVd = *(double*)(jsonInnerData); double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd); sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(jsonInnerData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) { } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false"); sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
......
...@@ -116,22 +116,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con ...@@ -116,22 +116,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t type = pColumnInfoData->info.type; int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData); int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) { if (type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) { if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0; dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) { } else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES); dataLen = varDataTLen(pData + CHAR_BYTES) + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) { } else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES; dataLen = DOUBLE_BYTES + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) { } else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES; dataLen = CHAR_BYTES + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) { } else if (*pData == TD_TAG_JSON) { // json string
dataLen = kvRowLen(pData + CHAR_BYTES); dataLen = ((STag*)(pData))->len;
} else { } else {
ASSERT(0); ASSERT(0);
} }
dataLen += CHAR_BYTES; }else {
dataLen = varDataTLen(pData);
} }
SVarColAttr* pAttr = &pColumnInfoData->varmeta; SVarColAttr* pAttr = &pColumnInfoData->varmeta;
...@@ -1634,6 +1635,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1634,6 +1635,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) { const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if(!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// cal size // cal size
int32_t cap = sizeof(SSubmitReq); int32_t cap = sizeof(SSubmitReq);
...@@ -1655,18 +1661,33 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1655,18 +1661,33 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid; createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0); STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
STag* pTag = NULL;
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
} }
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); createTbReq.ctb.pTag = (uint8_t*)pTag;
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code; int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
taosMemoryFree(cname); tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
} }
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
...@@ -1709,22 +1730,42 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1709,22 +1730,42 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid; createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0}; STagVal tagVal = {.cid = 1,
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { .type = TSDB_DATA_TYPE_UBIGINT,
ASSERT(0); .pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
} }
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); createTbReq.ctb.pTag = (uint8_t*)pTag;
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code; int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL; if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen); tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL; code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
} }
blkHead->schemaLen = htonl(schemaLen); blkHead->schemaLen = htonl(schemaLen);
...@@ -1759,5 +1800,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1759,5 +1800,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
} }
ret->length = htonl(ret->length); ret->length = htonl(ret->length);
taosArrayDestroy(tagArray);
return ret; return ret;
} }
此差异已折叠。
...@@ -3909,7 +3909,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { ...@@ -3909,7 +3909,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (pReq->type == TSDB_CHILD_TABLE) { if (pReq->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) { } else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else { } else {
...@@ -3921,8 +3921,6 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { ...@@ -3921,8 +3921,6 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
} }
int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
uint32_t len;
if (tStartDecode(pCoder) < 0) return -1; if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1; if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
...@@ -3934,7 +3932,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { ...@@ -3934,7 +3932,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (pReq->type == TSDB_CHILD_TABLE) { if (pReq->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1; if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) { } else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else { } else {
......
...@@ -78,7 +78,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); ...@@ -78,7 +78,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader);
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReadNext(SMetaReader *pReader); int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid); const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *tagVal);
typedef struct SMetaFltParam { typedef struct SMetaFltParam {
tb_uid_t suid; tb_uid_t suid;
......
...@@ -117,7 +117,7 @@ typedef struct { ...@@ -117,7 +117,7 @@ typedef struct {
} SSmaIdxKey; } SSmaIdxKey;
// metaTable ================== // metaTable ==================
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int8_t type, tb_uid_t uid, int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey); STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey);
#ifndef META_REFACT #ifndef META_REFACT
......
...@@ -30,7 +30,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { ...@@ -30,7 +30,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
if (tEncodeBinary(pCoder, pME->ctbEntry.pTags, kvRowLen(pME->ctbEntry.pTags)) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
...@@ -47,7 +47,6 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { ...@@ -47,7 +47,6 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
} }
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
uint32_t len;
if (tStartDecode(pCoder) < 0) return -1; if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pME->version) < 0) return -1; if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
...@@ -62,7 +61,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -62,7 +61,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeBinary(pCoder, &pME->ctbEntry.pTags, &len) < 0) return -1; // (TODO) if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
......
...@@ -573,10 +573,23 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) { ...@@ -573,10 +573,23 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
#endif #endif
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid) { const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
ASSERT(pEntry->type == TSDB_CHILD_TABLE); ASSERT(pEntry->type == TSDB_CHILD_TABLE);
return tdGetKVRowValOfCol((const SKVRow)pEntry->ctbEntry.pTags, cid); STag *tag = (STag *)pEntry->ctbEntry.pTags;
if (type == TSDB_DATA_TYPE_JSON){
if(tag->nTag == 0){
return NULL;
}
return tag;
}
bool find = tTagGet(tag, val);
if(!find){
return NULL;
}
return val;
} }
typedef struct { typedef struct {
SMeta * pMeta; SMeta * pMeta;
TBC * pCur; TBC * pCur;
...@@ -609,7 +622,13 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { ...@@ -609,7 +622,13 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
STagIdxKey *pKey = NULL; STagIdxKey *pKey = NULL;
int32_t nKey = 0; int32_t nKey = 0;
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, param->val, pCursor->type, int32_t nTagData = 0;
if(IS_VAR_DATA_TYPE(param->type)){
nTagData = strlen(param->val);
}else{
nTagData = tDataTypes[param->type].bytes;
}
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, param->val, nTagData, pCursor->type,
param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey); param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
if (ret != 0) { if (ret != 0) {
goto END; goto END;
...@@ -651,4 +670,4 @@ END: ...@@ -651,4 +670,4 @@ END:
taosMemoryFree(pCursor); taosMemoryFree(pCursor);
return ret; return ret;
} }
\ No newline at end of file
...@@ -563,29 +563,39 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -563,29 +563,39 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
} }
memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
} else { } else {
SKVRowBuilder kvrb = {0}; const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags; STag *pNewTag = NULL;
SKVRow pNewTag = NULL; SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
if (!pTagArray) {
tdInitKVRowBuilder(&kvrb); terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
for (int32_t i = 0; i < pTagSchema->nCols; i++) { for (int32_t i = 0; i < pTagSchema->nCols; i++) {
SSchema *pCol = &pTagSchema->pSchema[i]; SSchema *pCol = &pTagSchema->pSchema[i];
if (iCol == i) { if (iCol == i) {
tdAddColToKVRow(&kvrb, pCol->colId, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); STagVal val = {0};
val.type = pCol->type;
val.cid = pCol->colId;
if (IS_VAR_DATA_TYPE(pCol->type)) {
val.pData = pAlterTbReq->pTagVal;
val.nData = pAlterTbReq->nTagVal;
}else{
memcpy(&val.i64, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
}
taosArrayPush(pTagArray, &val);
} else { } else {
void *p = tdGetKVRowValOfCol(pOldTag, pCol->colId); STagVal val = {0};
if (p) { if (tTagGet(pOldTag, &val)) {
if (IS_VAR_DATA_TYPE(pCol->type)) { taosArrayPush(pTagArray, &val);
tdAddColToKVRow(&kvrb, pCol->colId, p, varDataTLen(p));
} else {
tdAddColToKVRow(&kvrb, pCol->colId, p, pCol->bytes);
}
} }
} }
} }
if ((terrno = tTagNew(pTagArray, pTagSchema->version, false, &pNewTag)) < 0) {
ctbEntry.ctbEntry.pTags = tdGetKVRowFromBuilder(&kvrb); taosArrayDestroy(pTagArray);
tdDestroyKVRowBuilder(&kvrb); goto _err;
}
ctbEntry.ctbEntry.pTags = (uint8_t *)pNewTag;
taosArrayDestroy(pTagArray);
} }
// save to table.db // save to table.db
...@@ -721,17 +731,17 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -721,17 +731,17 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn); return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
} }
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid, int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) { STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
int32_t nTagData = 0; // int32_t nTagData = 0;
if (pTagData) { // if (pTagData) {
if (IS_VAR_DATA_TYPE(type)) { // if (IS_VAR_DATA_TYPE(type)) {
nTagData = varDataTLen(pTagData); // nTagData = varDataTLen(pTagData);
} else { // } else {
nTagData = tDataTypes[type].bytes; // nTagData = tDataTypes[type].bytes;
} // }
} // }
*nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t); *nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);
*ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey); *ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
...@@ -762,7 +772,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -762,7 +772,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
STagIdxKey * pTagIdxKey = NULL; STagIdxKey * pTagIdxKey = NULL;
int32_t nTagIdxKey; int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0]; const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void * pTagData = NULL; // const void *pTagData = NULL; //
int32_t nTagData = 0;
SDecoder dc = {0}; SDecoder dc = {0};
// get super table // get super table
...@@ -775,7 +786,22 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -775,7 +786,22 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
metaDecodeEntry(&dc, &stbEntry); metaDecodeEntry(&dc, &stbEntry);
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0]; pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId);
STagVal tagVal = {.cid = pTagColumn->colId};
if(pTagColumn->type != TSDB_DATA_TYPE_JSON){
tTagGet((const STag *)pCtbEntry->ctbEntry.pTags, &tagVal);
if(IS_VAR_DATA_TYPE(pTagColumn->type)){
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
}else{
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pTagColumn->type].bytes;
}
}else{
//pTagData = pCtbEntry->ctbEntry.pTags;
//nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
}
// update tag index // update tag index
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
...@@ -790,7 +816,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -790,7 +816,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid); int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid);
indexMultiTermDestroy(tmGroup); indexMultiTermDestroy(tmGroup);
#else #else
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid, if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, pCtbEntry->uid,
&pTagIdxKey, &nTagIdxKey) < 0) { &pTagIdxKey, &nTagIdxKey) < 0) {
return -1; return -1;
} }
......
...@@ -313,29 +313,23 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_ ...@@ -313,29 +313,23 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_
if (fmIsScanPseudoColumnFunc(functionId)) { if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId); setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId);
} else { // these are tags } else { // these are tags
const char* p = NULL; STagVal tagVal = {0};
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { tagVal.cid = pExpr->base.pParam[0].pCol->colId;
const uint8_t* tmp = mr.me.ctbEntry.pTags; const char *p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal);
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); char *data = NULL;
if (data == NULL) { if(pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL){
metaReaderClear(&mr); data = tTagValToData((const STagVal *)p, false);
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1); }else {
return; data = (char*)p;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data + 1, tmp, kvRowLen(tmp));
p = data;
} else {
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL)); colDataAppend(pColInfoData, i, data, (data == NULL));
} }
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { if(pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL &&
taosMemoryFree((void*)p); IS_VAR_DATA_TYPE(((const STagVal *)p)->type) && data){
taosMemoryFree(data);
} }
} }
} }
...@@ -1589,22 +1583,21 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1589,22 +1583,21 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR(str, mr.me.name); STR_TO_VARSTR(str, mr.me.name);
colDataAppend(pDst, count, str, false); colDataAppend(pDst, count, str, false);
} else { // it is a tag value } else { // it is a tag value
if (pDst->info.type == TSDB_DATA_TYPE_JSON) { STagVal val = {0};
const uint8_t* tmp = mr.me.ctbEntry.pTags; val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
// TODO opt perf by realloc memory const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if (data == NULL) { char *data = NULL;
qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), kvRowLen(tmp) + 1); if(pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL){
longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); data = tTagValToData((const STagVal *)p, false);
} }else {
data = (char*)p;
}
colDataAppend(pDst, count, data, (data == NULL));
*data = TSDB_DATA_TYPE_JSON; if(pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL
memcpy(data + 1, tmp, kvRowLen(tmp)); && IS_VAR_DATA_TYPE(((const STagVal *)p)->type) && data != NULL){
colDataAppend(pDst, count, data, false);
taosMemoryFree(data); taosMemoryFree(data);
} else {
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
} }
} }
} }
...@@ -1634,8 +1627,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1634,8 +1627,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableListInfo,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
......
...@@ -58,7 +58,7 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta); ...@@ -58,7 +58,7 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta); int32_t getNumOfTags(const STableMeta* pTableMeta);
STableComInfo getTableInfo(const STableMeta* pTableMeta); STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta); STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* errMsg, int16_t startColId); int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag **ppTag, SMsgBuf* pMsgBuf);
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
......
此差异已折叠。
...@@ -4162,8 +4162,8 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -4162,8 +4162,8 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
return code; return code;
} }
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt, SKVRow row, static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt,
uint64_t suid, SVgroupInfo* pVgInfo) { const STag* pTag, uint64_t suid, SVgroupInfo* pVgInfo) {
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId}; SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
strcpy(name.dbname, pStmt->dbName); strcpy(name.dbname, pStmt->dbName);
...@@ -4173,7 +4173,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S ...@@ -4173,7 +4173,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
req.type = TD_CHILD_TABLE; req.type = TD_CHILD_TABLE;
req.name = strdup(pStmt->tableName); req.name = strdup(pStmt->tableName);
req.ctb.suid = suid; req.ctb.suid = suid;
req.ctb.pTag = row; req.ctb.pTag = (uint8_t*)pTag;
if (pStmt->ignoreExists) { if (pStmt->ignoreExists) {
req.flags |= TD_CREATE_IF_NOT_EXISTS; req.flags |= TD_CREATE_IF_NOT_EXISTS;
} }
...@@ -4193,24 +4193,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S ...@@ -4193,24 +4193,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
} }
} }
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
SKVRowBuilder* pBuilder) {
if (pSchema->type == TSDB_DATA_TYPE_JSON) {
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
}
return parseJsontoTagData(pVal->literal, pBuilder, &pCxt->msgBuf, pSchema->colId);
}
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
tdAddColToKVRow(pBuilder, pSchema->colId, nodesGetValueFromNode(pVal),
IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
}
return TSDB_CODE_SUCCESS;
}
static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) { static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) {
int32_t code = getFuncInfo(pCxt, pFunc); int32_t code = getFuncInfo(pCxt, pFunc);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -4238,15 +4220,22 @@ static int32_t translateTagVal(STranslateContext* pCxt, uint8_t precision, SSche ...@@ -4238,15 +4220,22 @@ static int32_t translateTagVal(STranslateContext* pCxt, uint8_t precision, SSche
} }
static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta, static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta,
SKVRowBuilder* pBuilder) { STag** ppTag) {
int32_t numOfTags = getNumOfTags(pSuperTableMeta); int32_t numOfTags = getNumOfTags(pSuperTableMeta);
if (LIST_LENGTH(pStmt->pValsOfTags) != LIST_LENGTH(pStmt->pSpecificTags) || if (LIST_LENGTH(pStmt->pValsOfTags) != LIST_LENGTH(pStmt->pSpecificTags) ||
numOfTags < LIST_LENGTH(pStmt->pValsOfTags)) { numOfTags < LIST_LENGTH(pStmt->pValsOfTags)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TAGS_NOT_MATCHED); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
} }
SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal));
if (!pTagArray) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY);
}
int32_t code = TSDB_CODE_SUCCESS;
int16_t nTags = 0, nBufPos = 0;
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta); SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
SNode * pTag, *pNode; SNode * pTag = NULL, *pNode = NULL;
bool isJson = false;
FORBOTH(pTag, pStmt->pSpecificTags, pNode, pStmt->pValsOfTags) { FORBOTH(pTag, pStmt->pSpecificTags, pNode, pStmt->pValsOfTags) {
SColumnNode* pCol = (SColumnNode*)pTag; SColumnNode* pCol = (SColumnNode*)pTag;
SSchema* pSchema = NULL; SSchema* pSchema = NULL;
...@@ -4257,56 +4246,125 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla ...@@ -4257,56 +4246,125 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
} }
} }
if (NULL == pSchema) { if (NULL == pSchema) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
goto end;
} }
SValueNode* pVal = NULL; SValueNode* pVal = NULL;
int32_t code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pSchema, pNode, &pVal); code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pSchema, pNode, &pVal);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS != code) {
if (NULL == pVal) { goto end;
pVal = (SValueNode*)pNode;
} else {
REPLACE_LIST2_NODE(pVal);
}
} }
if (TSDB_CODE_SUCCESS == code) {
code = addValToKVRow(pCxt, pVal, pSchema, pBuilder); if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_LIST2_NODE(pVal);
} }
if (TSDB_CODE_SUCCESS != code) { if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
return code; if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
code = buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
goto end;
}
isJson = true;
code = parseJsontoTagData(pVal->literal, pTagArray, ppTag, &pCxt->msgBuf);
if(code != TSDB_CODE_SUCCESS){
goto end;
}
}else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
void* nodeVal = nodesGetValueFromNode(pVal);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
val.pData = varDataVal(nodeVal);
val.nData = varDataLen(nodeVal);
} else {
memcpy(&val.i64, nodeVal, pTagSchema->bytes);
}
taosArrayPush(pTagArray, &val);
} }
} }
if(!isJson) code = tTagNew(pTagArray, 1, false, ppTag);
end:
if(isJson){
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagArray, i);
if(IS_VAR_DATA_TYPE(p->type)){
taosMemoryFree(p->pData);
}
}
}
taosArrayDestroy(pTagArray);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta, static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta,
SKVRowBuilder* pBuilder) { STag** ppTag) {
if (getNumOfTags(pSuperTableMeta) != LIST_LENGTH(pStmt->pValsOfTags)) { if (getNumOfTags(pSuperTableMeta) != LIST_LENGTH(pStmt->pValsOfTags)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TAGS_NOT_MATCHED); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
} }
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta); SSchema* pTagSchemas = getTableTagSchema(pSuperTableMeta);
SNode* pNode; SNode* pNode;
int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0; int32_t index = 0;
SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal));
if (!pTagArray) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY);
}
bool isJson = false;
FOREACH(pNode, pStmt->pValsOfTags) { FOREACH(pNode, pStmt->pValsOfTags) {
SValueNode* pVal = NULL; SValueNode* pVal = NULL;
int32_t code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pTagSchema + index, pNode, &pVal); SSchema* pTagSchema = pTagSchemas + index;
if (TSDB_CODE_SUCCESS == code) { code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pTagSchema, pNode, &pVal);
if (NULL == pVal) { if (TSDB_CODE_SUCCESS != code) {
pVal = (SValueNode*)pNode; goto end;
}
if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_NODE(pVal);
}
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
code = buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
goto end;
}
isJson = true;
code = parseJsontoTagData(pVal->literal, pTagArray, ppTag, &pCxt->msgBuf);
if(code != TSDB_CODE_SUCCESS){
goto end;
}
}else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
char* tmpVal = nodesGetValueFromNode(pVal);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
val.pData = varDataVal(tmpVal);
val.nData = varDataLen(tmpVal);
} else { } else {
REPLACE_NODE(pVal); memcpy(&val.i64, tmpVal, pTagSchema->bytes);
} }
taosArrayPush(pTagArray, &val);
} }
if (TSDB_CODE_SUCCESS == code) { ++index;
code = addValToKVRow(pCxt, pVal, pTagSchema + index++, pBuilder); }
} if(!isJson) code = tTagNew(pTagArray, 1, false, ppTag);
if (TSDB_CODE_SUCCESS != code) {
return code; end:
if(isJson){
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagArray, i);
if(IS_VAR_DATA_TYPE(p->type)){
taosMemoryFree(p->pData);
}
} }
} }
return TSDB_CODE_SUCCESS; taosArrayDestroy(pTagArray);
return code;
} }
static int32_t checkCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt) { static int32_t checkCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt) {
...@@ -4323,26 +4381,13 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla ...@@ -4323,26 +4381,13 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta); code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
} }
SKVRowBuilder kvRowBuilder = {0}; STag* pTag = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = tdInitKVRowBuilder(&kvRowBuilder);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStmt->pSpecificTags) { if (NULL != pStmt->pSpecificTags) {
code = buildKVRowForBindTags(pCxt, pStmt, pSuperTableMeta, &kvRowBuilder); code = buildKVRowForBindTags(pCxt, pStmt, pSuperTableMeta, &pTag);
} else { } else {
code = buildKVRowForAllTags(pCxt, pStmt, pSuperTableMeta, &kvRowBuilder); code = buildKVRowForAllTags(pCxt, pStmt, pSuperTableMeta, &pTag);
}
}
SKVRow row = NULL;
if (TSDB_CODE_SUCCESS == code) {
row = tdGetKVRowFromBuilder(&kvRowBuilder);
if (NULL == row) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
tdSortKVRowByColIdx(row);
} }
} }
...@@ -4351,11 +4396,10 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla ...@@ -4351,11 +4396,10 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info); code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, row, pSuperTableMeta->uid, &info); addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid, &info);
} }
taosMemoryFreeClear(pSuperTableMeta); taosMemoryFreeClear(pSuperTableMeta);
tdDestroyKVRowBuilder(&kvRowBuilder);
return code; return code;
} }
...@@ -4577,37 +4621,41 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS ...@@ -4577,37 +4621,41 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) { if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) {
SKVRowBuilder kvRowBuilder = {0};
int32_t code = tdInitKVRowBuilder(&kvRowBuilder);
if (TSDB_CODE_SUCCESS != code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pStmt->pVal->literal && if (pStmt->pVal->literal &&
strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal); return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal);
} }
SArray *pTagVals = taosArrayInit(1, sizeof(STagVal));
code = parseJsontoTagData(pStmt->pVal->literal, &kvRowBuilder, &pCxt->msgBuf, pSchema->colId); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS != code) { STag* pTag = NULL;
return code; do{
code = parseJsontoTagData(pStmt->pVal->literal, pTagVals, &pTag, &pCxt->msgBuf);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}while(0);
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagVals, i);
if(IS_VAR_DATA_TYPE(p->type)){
taosMemoryFree(p->pData);
}
} }
taosArrayDestroy(pTagVals);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); if (code != TSDB_CODE_SUCCESS){
if (NULL == row) { return code;
tdDestroyKVRowBuilder(&kvRowBuilder);
return TSDB_CODE_OUT_OF_MEMORY;
} }
pReq->nTagVal = kvRowLen(row); pReq->nTagVal = pTag->len;
pReq->pTagVal = row; pReq->pTagVal = (uint8_t *)pTag;
pStmt->pVal->datum.p = row; // for free pStmt->pVal->datum.p = (char*)pTag; // for free
tdDestroyKVRowBuilder(&kvRowBuilder);
} else { } else {
pReq->nTagVal = pStmt->pVal->node.resType.bytes; pReq->nTagVal = pStmt->pVal->node.resType.bytes;
if (TSDB_DATA_TYPE_NCHAR == pStmt->pVal->node.resType.type) {
pReq->nTagVal = pReq->nTagVal * TSDB_NCHAR_SIZE;
}
pReq->pTagVal = nodesGetValueFromNode(pStmt->pVal); pReq->pTagVal = nodesGetValueFromNode(pStmt->pVal);
// data and length are seperated for new tag format STagVal
if (IS_VAR_DATA_TYPE(pStmt->pVal->node.resType.type)) {
pReq->nTagVal = varDataLen(pReq->pTagVal);
pReq->pTagVal = varDataVal(pReq->pTagVal);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -322,33 +322,35 @@ static bool isValidateTag(char* input) { ...@@ -322,33 +322,35 @@ static bool isValidateTag(char* input) {
return true; return true;
} }
int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* pMsgBuf, int16_t startColId) { int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag **ppTag, SMsgBuf* pMsgBuf) {
int32_t retCode = TSDB_CODE_SUCCESS;
cJSON* root = NULL;
SHashObj* keyHash = NULL;
int32_t size = 0;
// set json NULL data // set json NULL data
uint8_t jsonNULL = TSDB_DATA_TYPE_NULL;
int32_t jsonIndex = startColId + 1;
if (!json || strtrim((char*)json) == 0 || strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) { if (!json || strtrim((char*)json) == 0 || strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) {
tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES); retCode = TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS; goto end;
} }
// set json real data // set json real data
cJSON* root = cJSON_Parse(json); root = cJSON_Parse(json);
if (root == NULL) { if (root == NULL) {
return buildSyntaxErrMsg(pMsgBuf, "json parse error", json); retCode = buildSyntaxErrMsg(pMsgBuf, "json parse error", json);
goto end;
} }
int32_t size = cJSON_GetArraySize(root); size = cJSON_GetArraySize(root);
if (!cJSON_IsObject(root)) { if (!cJSON_IsObject(root)) {
return buildSyntaxErrMsg(pMsgBuf, "json error invalide value", json); retCode = buildSyntaxErrMsg(pMsgBuf, "json error invalide value", json);
goto end;
} }
int32_t retCode = 0; keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
char* tagKV = NULL;
SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i); cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) { if (!item) {
qError("json inner error:%d", i); uError("json inner error:%d", i);
retCode = buildSyntaxErrMsg(pMsgBuf, "json inner error", json); retCode = buildSyntaxErrMsg(pMsgBuf, "json inner error", json);
goto end; goto end;
} }
...@@ -360,85 +362,60 @@ int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBu ...@@ -360,85 +362,60 @@ int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBu
} }
size_t keyLen = strlen(jsonKey); size_t keyLen = strlen(jsonKey);
if (keyLen > TSDB_MAX_JSON_KEY_LEN) { if (keyLen > TSDB_MAX_JSON_KEY_LEN) {
qError("json key too long error"); uError("json key too long error");
retCode = buildSyntaxErrMsg(pMsgBuf, "json key too long, more than 256", jsonKey); retCode = buildSyntaxErrMsg(pMsgBuf, "json key too long, more than 256", jsonKey);
goto end; goto end;
} }
if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) { if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) {
continue; continue;
} }
// key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: DOUBLE_BYTES STagVal val = {0};
tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES, 1); val.pKey = jsonKey;
if (!tagKV) { taosHashPut(keyHash, jsonKey, keyLen, &keyLen, CHAR_BYTES); // add key to hash to remove dumplicate, value is useless
retCode = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
strncpy(varDataVal(tagKV), jsonKey, keyLen);
varDataSetLen(tagKV, keyLen);
if (taosHashGetSize(keyHash) == 0) {
uint8_t jsonNotNULL = TSDB_DATA_TYPE_JSON;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, &jsonNotNULL, CHAR_BYTES); // add json type
}
taosHashPut(keyHash, jsonKey, keyLen, &keyLen,
CHAR_BYTES); // add key to hash to remove dumplicate, value is useless
if (item->type == cJSON_String) { // add json value format: type|data if (item->type == cJSON_String) { // add json value format: type|data
char* jsonValue = item->valuestring; char* jsonValue = item->valuestring;
int32_t valLen = (int32_t)strlen(jsonValue); int32_t valLen = (int32_t)strlen(jsonValue);
int32_t totalLen = keyLen + VARSTR_HEADER_SIZE + valLen * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE + CHAR_BYTES; char* tmp = taosMemoryCalloc(1, valLen * TSDB_NCHAR_SIZE);
char* tmp = taosMemoryRealloc(tagKV, totalLen);
if (!tmp) { if (!tmp) {
retCode = TSDB_CODE_TSC_OUT_OF_MEMORY; retCode = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end; goto end;
} }
tagKV = tmp; val.type = TSDB_DATA_TYPE_NCHAR;
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); if (valLen > 0 && !taosMbsToUcs4(jsonValue, valLen, (TdUcs4*)tmp,
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
*valueType = TSDB_DATA_TYPE_NCHAR;
if (valLen > 0 && !taosMbsToUcs4(jsonValue, valLen, (TdUcs4*)varDataVal(valueData),
(int32_t)(valLen * TSDB_NCHAR_SIZE), &valLen)) { (int32_t)(valLen * TSDB_NCHAR_SIZE), &valLen)) {
qError("charset:%s to %s. val:%s, errno:%s, convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, jsonValue, uError("charset:%s to %s. val:%s, errno:%s, convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, jsonValue,
strerror(errno)); strerror(errno));
retCode = buildSyntaxErrMsg(pMsgBuf, "charset convert json error", jsonValue); retCode = buildSyntaxErrMsg(pMsgBuf, "charset convert json error", jsonValue);
goto end; goto end;
} }
val.nData = valLen;
varDataSetLen(valueData, valLen); val.pData = tmp;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, totalLen);
} else if (item->type == cJSON_Number) { } else if (item->type == cJSON_Number) {
if (!isfinite(item->valuedouble)) { if (!isfinite(item->valuedouble)) {
qError("json value is invalidate"); uError("json value is invalidate");
retCode = buildSyntaxErrMsg(pMsgBuf, "json value number is illegal", json); retCode = buildSyntaxErrMsg(pMsgBuf, "json value number is illegal", json);
goto end; goto end;
} }
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); val.type = TSDB_DATA_TYPE_DOUBLE;
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES); *((double*)&(val.i64)) = item->valuedouble;
*valueType = TSDB_DATA_TYPE_DOUBLE;
*((double*)valueData) = item->valuedouble;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES);
} else if (item->type == cJSON_True || item->type == cJSON_False) { } else if (item->type == cJSON_True || item->type == cJSON_False) {
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); val.type = TSDB_DATA_TYPE_BOOL;
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES); *((char*)&(val.i64)) = (char)(item->valueint);
*valueType = TSDB_DATA_TYPE_BOOL;
*valueData = (char)(item->valueint);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + CHAR_BYTES);
} else if (item->type == cJSON_NULL) { } else if (item->type == cJSON_NULL) {
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); val.type = TSDB_DATA_TYPE_NULL;
*valueType = TSDB_DATA_TYPE_NULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
} else { } else {
retCode = buildSyntaxErrMsg(pMsgBuf, "invalidate json value", json); retCode = buildSyntaxErrMsg(pMsgBuf, "invalidate json value", json);
goto end; goto end;
} }
} taosArrayPush(pTagVals, &val);
if (taosHashGetSize(keyHash) == 0) { // set json NULL true
tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES);
} }
end: end:
taosMemoryFree(tagKV);
taosHashCleanup(keyHash); taosHashCleanup(keyHash);
if(retCode == TSDB_CODE_SUCCESS){
tTagNew(pTagVals, 1, true, ppTag);
}
cJSON_Delete(root); cJSON_Delete(root);
return retCode; return retCode;
} }
......
...@@ -922,23 +922,13 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) { ...@@ -922,23 +922,13 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
} }
} }
char *getJsonValue(char *json, char *key){ //todo STagVal getJsonValue(char *json, char *key, bool *isExist) {
json++; // jump type STagVal val = {.pKey = key};
int16_t cols = kvRowNCols(json); bool find = tTagGet(((const STag *)json), &val); // json value is null and not exist is different
for (int i = 0; i < cols; ++i) { if(isExist){
SColIdx *pColIdx = kvRowColIdxAt(json, i); *isExist = find;
char *data = kvRowColVal(json, pColIdx);
if(i == 0){
if(*data == TSDB_DATA_TYPE_NULL) {
return NULL;
}
continue;
}
if(memcmp(key, data, varDataTLen(data)) == 0){
return data + varDataTLen(data);
}
} }
return NULL; return val;
} }
void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
...@@ -950,6 +940,8 @@ void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO ...@@ -950,6 +940,8 @@ void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows); pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
char *pRightData = colDataGetVarData(pRight->columnData, 0); char *pRightData = colDataGetVarData(pRight->columnData, 0);
char *jsonKey = taosMemoryCalloc(1, varDataLen(pRightData) + 1);
memcpy(jsonKey, varDataVal(pRightData), varDataLen(pRightData));
for (; i >= 0 && i < pLeft->numOfRows; i += step) { for (; i >= 0 && i < pLeft->numOfRows; i += step) {
if (colDataIsNull_var(pLeft->columnData, i)) { if (colDataIsNull_var(pLeft->columnData, i)) {
colDataSetNull_var(pOutputCol, i); colDataSetNull_var(pOutputCol, i);
...@@ -957,14 +949,15 @@ void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO ...@@ -957,14 +949,15 @@ void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO
continue; continue;
} }
char *pLeftData = colDataGetVarData(pLeft->columnData, i); char *pLeftData = colDataGetVarData(pLeft->columnData, i);
char *value = getJsonValue(pLeftData, pRightData); bool isExist = false;
if (!value) { STagVal value = getJsonValue(pLeftData, jsonKey, &isExist);
colDataSetNull_var(pOutputCol, i); char *data = isExist ? tTagValToData(&value, true) : NULL;
pOutputCol->hasNull = true; colDataAppend(pOutputCol, i, data, data == NULL);
continue; if(isExist && IS_VAR_DATA_TYPE(value.type) && data){
taosMemoryFree(data);
} }
colDataAppend(pOutputCol, i, value, false);
} }
taosMemoryFree(jsonKey);
} }
void vectorMathAdd(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorMathAdd(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
......
...@@ -217,7 +217,7 @@ void scltMakeOpNode(SNode **pNode, EOperatorType opType, int32_t resType, SNode ...@@ -217,7 +217,7 @@ void scltMakeOpNode(SNode **pNode, EOperatorType opType, int32_t resType, SNode
SOperatorNode *onode = (SOperatorNode *)node; SOperatorNode *onode = (SOperatorNode *)node;
onode->node.resType.type = resType; onode->node.resType.type = resType;
onode->node.resType.bytes = tDataTypes[resType].bytes; onode->node.resType.bytes = tDataTypes[resType].bytes;
onode->opType = opType; onode->opType = opType;
onode->pLeft = pLeft; onode->pLeft = pLeft;
onode->pRight = pRight; onode->pRight = pRight;
...@@ -1035,7 +1035,7 @@ void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){ ...@@ -1035,7 +1035,7 @@ void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){
SNode *pLeft = NULL, *pRight = NULL; SNode *pLeft = NULL, *pRight = NULL;
scltMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, keyVar); scltMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, keyVar);
scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, kvRowLen(json), 1, json); scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, ((STag*)json)->len, 1, json);
scltMakeOpNode(opNode, OP_TYPE_JSON_GET_VALUE, TSDB_DATA_TYPE_JSON, pLeft, pRight); scltMakeOpNode(opNode, OP_TYPE_JSON_GET_VALUE, TSDB_DATA_TYPE_JSON, pLeft, pRight);
} }
...@@ -1111,17 +1111,9 @@ TEST(columnTest, json_column_arith_op) { ...@@ -1111,17 +1111,9 @@ TEST(columnTest, json_column_arith_op) {
char rightv[256] = {0}; char rightv[256] = {0};
memcpy(rightv, rightvTmp, strlen(rightvTmp)); memcpy(rightv, rightvTmp, strlen(rightvTmp));
SKVRowBuilder kvRowBuilder; SArray *tags = taosArrayInit(1, sizeof(STagVal));
tdInitKVRowBuilder(&kvRowBuilder); STag* row = NULL;
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0); parseJsontoTagData(rightv, tags, &row, NULL);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
if(tmp == NULL){
ASSERT_TRUE(0);
}
memmove(tmp+1, tmp, kvRowLen(tmp));
*tmp = TSDB_DATA_TYPE_JSON;
row = tmp;
const int32_t len = 8; const int32_t len = 8;
EOperatorType op[len] = {OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV, EOperatorType op[len] = {OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV,
...@@ -1175,7 +1167,7 @@ TEST(columnTest, json_column_arith_op) { ...@@ -1175,7 +1167,7 @@ TEST(columnTest, json_column_arith_op) {
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]);
} }
tdDestroyKVRowBuilder(&kvRowBuilder); taosArrayDestroy(tags);
taosMemoryFree(row); taosMemoryFree(row);
} }
...@@ -1195,17 +1187,9 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1195,17 +1187,9 @@ TEST(columnTest, json_column_logic_op) {
char rightv[256] = {0}; char rightv[256] = {0};
memcpy(rightv, rightvTmp, strlen(rightvTmp)); memcpy(rightv, rightvTmp, strlen(rightvTmp));
SKVRowBuilder kvRowBuilder; SArray *tags = taosArrayInit(1, sizeof(STagVal));
tdInitKVRowBuilder(&kvRowBuilder); STag* row = NULL;
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0); parseJsontoTagData(rightv, tags, &row, NULL);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
if(tmp == NULL){
ASSERT_TRUE(0);
}
memmove(tmp+1, tmp, kvRowLen(tmp));
*tmp = TSDB_DATA_TYPE_JSON;
row = tmp;
const int32_t len = 9; const int32_t len = 9;
const int32_t len1 = 4; const int32_t len1 = 4;
...@@ -1305,7 +1289,7 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1305,7 +1289,7 @@ TEST(columnTest, json_column_logic_op) {
taosMemoryFree(rightData); taosMemoryFree(rightData);
} }
tdDestroyKVRowBuilder(&kvRowBuilder); taosArrayDestroy(tags);
taosMemoryFree(row); taosMemoryFree(row);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册