diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index b74d0b34943af7e5a61e3c0fd8d4bc481736c9d6..d5f1cf512c8e9fb3631ab55b1b72ee101449cc09 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -69,6 +69,7 @@ extern "C" { #define VALUE "_value" #define VALUE_LEN 6 +#define OTD_JSON_FIELDS_NUM 4 #define MAX_RETRY_TIMES 5 typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; @@ -177,9 +178,10 @@ typedef struct { int32_t lineNum; SSmlMsgBuf msgBuf; -// cJSON *root; // for parse json - int8_t offset[4]; + cJSON *root; // for parse json + int8_t offset[OTD_JSON_FIELDS_NUM]; SSmlLineInfo *lines; // element is SSmlLineInfo + bool parseJsonByLib; // SArray *preLineTagKV; @@ -206,9 +208,9 @@ typedef int32_t (*_equal_fn_sml)(const void *, const void *); SSmlHandle *smlBuildSmlInfo(TAOS *taos); void smlDestroyInfo(SSmlHandle *info); -void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset); -void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset); -SArray *smlJsonParseTags(char *start, char *end); +int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset); +int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset); +//SArray *smlJsonParseTags(char *start, char *end); bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg); void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn); int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn); @@ -226,6 +228,7 @@ int32_t is_same_child_table_telnet(const void *a, const void *b); int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len); int32_t smlClearForRerun(SSmlHandle *info); int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg); +uint8_t smlGetTimestampLen(int64_t num); int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements); int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ba154c95bde3e6c8c3b2b9201b2b2c65a96b077c..4d091e4b61376856669babcde732bc23efdfcdb1 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1008,12 +1008,16 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols return TSDB_CODE_SUCCESS; } -static void smlDestroyTableInfo(SSmlTableInfo *tag) { +static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) { for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); taosHashCleanup(kvHash); } + if(info->parseJsonByLib){ + SSmlLineInfo *key = (SSmlLineInfo *)(tag->key); + if(key != NULL) taosMemoryFree(key->tags); + } taosMemoryFree(tag->key); taosArrayDestroy(tag->cols); taosArrayDestroy(tag->tags); @@ -1028,7 +1032,7 @@ void smlDestroyInfo(SSmlHandle *info) { NodeList *tmp = info->childTables; while (tmp) { if (tmp->data.used) { - smlDestroyTableInfo((SSmlTableInfo *)tmp->data.value); + smlDestroyTableInfo(info, (SSmlTableInfo *)tmp->data.value); } NodeList *t = tmp->next; taosMemoryFree(tmp); @@ -1055,6 +1059,9 @@ void smlDestroyInfo(SSmlHandle *info) { if (!info->dataFormat) { for (int i = 0; i < info->lineNum; i++) { taosArrayDestroy(info->lines[i].colArray); + if(info->parseJsonByLib){ + taosMemoryFree(info->lines[i].tags); + } } taosMemoryFree(info->lines); } @@ -1251,7 +1258,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { NodeList *pList = info->childTables; while (pList) { if (pList->data.used) { - smlDestroyTableInfo((SSmlTableInfo *)pList->data.value); + smlDestroyTableInfo(info, (SSmlTableInfo *)pList->data.value); pList->data.used = false; } pList = pList->next; @@ -1267,11 +1274,13 @@ int32_t smlClearForRerun(SSmlHandle *info) { pList = pList->next; } - if (unlikely(info->lines != NULL)) { - uError("SML:0x%" PRIx64 " info->lines != NULL", info->id); - return TSDB_CODE_SML_INVALID_DATA; + if (!info->dataFormat){ + if (unlikely(info->lines != NULL)) { + uError("SML:0x%" PRIx64 " info->lines != NULL", info->id); + return TSDB_CODE_SML_INVALID_DATA; + } + info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo)); } - info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo)); memset(&info->preLine, 0, sizeof(SSmlLineInfo)); info->currSTableMeta = NULL; diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index 3de1397f72533c2ee35df832116e5e81e45686e2..c4cd558418face3b3bc2680e929c661dd958a173 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -19,6 +19,8 @@ #include #include "clientSml.h" +#define OTD_JSON_SUB_FIELDS_NUM 2 + #define JUMP_JSON_SPACE(start) \ while(*(start)){\ if(unlikely(*(start) > 32))\ @@ -27,188 +29,193 @@ while(*(start)){\ (start)++;\ } -SArray *smlJsonParseTags(char *start, char *end){ - SArray *tags = taosArrayInit(4, sizeof(SSmlKv)); - while(start < end){ - SSmlKv kv = {0}; - kv.type = TSDB_DATA_TYPE_NCHAR; - bool isInQuote = false; - while(start < end){ - if(unlikely(!isInQuote && *start == '"')){ - start++; - kv.key = start; - isInQuote = true; - continue; - } - if(unlikely(isInQuote && *start == '"')){ - kv.keyLen = start - kv.key; - start++; - break; - } - start++; - } - bool hasColon = false; - while(start < end){ - if(unlikely(!hasColon && *start == ':')){ - start++; - hasColon = true; - continue; - } - if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){ - kv.value = start; - start++; - continue; - } - - if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){ - kv.length = start - kv.value; - taosArrayPush(tags, &kv); - start++; - break; - } - start++; - } - } - return tags; -} - -static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) { - int32_t ret = TSDB_CODE_SUCCESS; - - if(is_same_child_table_telnet(elements, &info->preLine) == 0){ - return TSDB_CODE_SUCCESS; - } - - bool isSameMeasure = IS_SAME_SUPER_TABLE; - - int cnt = 0; - SArray *preLineKV = info->preLineTagKV; - bool isSuperKVInit = true; - SArray *superKV = NULL; - if(info->dataFormat){ - if(unlikely(!isSameMeasure)){ - SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); - - if(unlikely(sMeta == NULL)){ - sMeta = smlBuildSTableMeta(info->dataFormat); - STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); - sMeta->tableMeta = pTableMeta; - if(pTableMeta == NULL){ - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL); - } - info->currSTableMeta = sMeta->tableMeta; - superKV = sMeta->tags; - - if(unlikely(taosArrayGetSize(superKV) == 0)){ - isSuperKVInit = false; - } - taosArraySetSize(preLineKV, 0); - } - }else{ - taosArraySetSize(preLineKV, 0); - } - - SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen); - int32_t tagNum = taosArrayGetSize(tags); - for (int32_t i = 0; i < tagNum; ++i) { - SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i); - - if(info->dataFormat){ - if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){ - info->dataFormat = false; - info->reRun = true; - taosArrayDestroy(tags); - return TSDB_CODE_SUCCESS; - } - - if(isSameMeasure){ - if(unlikely(cnt >= taosArrayGetSize(preLineKV))) { - info->dataFormat = false; - info->reRun = true; - taosArrayDestroy(tags); - return TSDB_CODE_SUCCESS; - } - SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt); - if(unlikely(kv.length > preKV->length)){ - preKV->length = kv.length; - SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); - ASSERT(tableMeta != NULL); - - SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt); - oldKV->length = kv.length; - info->needModifySchema = true; - } - if(unlikely(!IS_SAME_KEY)){ - info->dataFormat = false; - info->reRun = true; - taosArrayDestroy(tags); - return TSDB_CODE_SUCCESS; - } - }else{ - if(isSuperKVInit){ - if(unlikely(cnt >= taosArrayGetSize(superKV))) { - info->dataFormat = false; - info->reRun = true; - taosArrayDestroy(tags); - return TSDB_CODE_SUCCESS; - } - SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt); - if(unlikely(kv.length > preKV->length)) { - preKV->length = kv.length; - }else{ - kv.length = preKV->length; - } - info->needModifySchema = true; - - if(unlikely(!IS_SAME_KEY)){ - info->dataFormat = false; - info->reRun = true; - taosArrayDestroy(tags); - return TSDB_CODE_SUCCESS; - } - }else{ - taosArrayPush(superKV, &kv); - } - taosArrayPush(preLineKV, &kv); - } - }else{ - taosArrayPush(preLineKV, &kv); - } - cnt++; - } - taosArrayDestroy(tags); - - SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet); - if (unlikely(tinfo == NULL)) { - tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen); - if (unlikely(!tinfo)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - tinfo->tags = taosArrayDup(preLineKV, NULL); - - smlSetCTableName(tinfo); - if (info->dataFormat) { - info->currSTableMeta->uid = tinfo->uid; - tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); - if (tinfo->tableDataCtx == NULL) { - smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL); - return TSDB_CODE_SML_INVALID_DATA; - } - } - - SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo)); - *key = *elements; - tinfo->key = key; - nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet); - } - if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; +//SArray *smlJsonParseTags(char *start, char *end){ +// SArray *tags = taosArrayInit(4, sizeof(SSmlKv)); +// while(start < end){ +// SSmlKv kv = {0}; +// kv.type = TSDB_DATA_TYPE_NCHAR; +// bool isInQuote = false; +// while(start < end){ +// if(unlikely(!isInQuote && *start == '"')){ +// start++; +// kv.key = start; +// isInQuote = true; +// continue; +// } +// if(unlikely(isInQuote && *start == '"')){ +// kv.keyLen = start - kv.key; +// start++; +// break; +// } +// start++; +// } +// bool hasColon = false; +// while(start < end){ +// if(unlikely(!hasColon && *start == ':')){ +// start++; +// hasColon = true; +// continue; +// } +// if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){ +// kv.value = start; +// start++; +// continue; +// } +// +// if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){ +// kv.length = start - kv.value; +// taosArrayPush(tags, &kv); +// start++; +// break; +// } +// start++; +// } +// } +// return tags; +//} - return ret; -} +//static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) { +// int32_t ret = TSDB_CODE_SUCCESS; +// +// if(is_same_child_table_telnet(elements, &info->preLine) == 0){ +// return TSDB_CODE_SUCCESS; +// } +// +// bool isSameMeasure = IS_SAME_SUPER_TABLE; +// +// int cnt = 0; +// SArray *preLineKV = info->preLineTagKV; +// bool isSuperKVInit = true; +// SArray *superKV = NULL; +// if(info->dataFormat){ +// if(unlikely(!isSameMeasure)){ +// SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); +// +// if(unlikely(sMeta == NULL)){ +// sMeta = smlBuildSTableMeta(info->dataFormat); +// STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); +// sMeta->tableMeta = pTableMeta; +// if(pTableMeta == NULL){ +// info->dataFormat = false; +// info->reRun = true; +// return TSDB_CODE_SUCCESS; +// } +// nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL); +// } +// info->currSTableMeta = sMeta->tableMeta; +// superKV = sMeta->tags; +// +// if(unlikely(taosArrayGetSize(superKV) == 0)){ +// isSuperKVInit = false; +// } +// taosArraySetSize(preLineKV, 0); +// } +// }else{ +// taosArraySetSize(preLineKV, 0); +// } +// +// SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen); +// int32_t tagNum = taosArrayGetSize(tags); +// if (tagNum == 0) { +// uError("SML:tag is empty:%s", elements->tags) +// taosArrayDestroy(tags); +// return TSDB_CODE_SML_INVALID_DATA; +// } +// for (int32_t i = 0; i < tagNum; ++i) { +// SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i); +// +// if(info->dataFormat){ +// if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){ +// info->dataFormat = false; +// info->reRun = true; +// taosArrayDestroy(tags); +// return TSDB_CODE_SUCCESS; +// } +// +// if(isSameMeasure){ +// if(unlikely(cnt >= taosArrayGetSize(preLineKV))) { +// info->dataFormat = false; +// info->reRun = true; +// taosArrayDestroy(tags); +// return TSDB_CODE_SUCCESS; +// } +// SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt); +// if(unlikely(kv.length > preKV->length)){ +// preKV->length = kv.length; +// SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); +// ASSERT(tableMeta != NULL); +// +// SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt); +// oldKV->length = kv.length; +// info->needModifySchema = true; +// } +// if(unlikely(!IS_SAME_KEY)){ +// info->dataFormat = false; +// info->reRun = true; +// taosArrayDestroy(tags); +// return TSDB_CODE_SUCCESS; +// } +// }else{ +// if(isSuperKVInit){ +// if(unlikely(cnt >= taosArrayGetSize(superKV))) { +// info->dataFormat = false; +// info->reRun = true; +// taosArrayDestroy(tags); +// return TSDB_CODE_SUCCESS; +// } +// SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt); +// if(unlikely(kv.length > preKV->length)) { +// preKV->length = kv.length; +// }else{ +// kv.length = preKV->length; +// } +// info->needModifySchema = true; +// +// if(unlikely(!IS_SAME_KEY)){ +// info->dataFormat = false; +// info->reRun = true; +// taosArrayDestroy(tags); +// return TSDB_CODE_SUCCESS; +// } +// }else{ +// taosArrayPush(superKV, &kv); +// } +// taosArrayPush(preLineKV, &kv); +// } +// }else{ +// taosArrayPush(preLineKV, &kv); +// } +// cnt++; +// } +// taosArrayDestroy(tags); +// +// SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet); +// if (unlikely(tinfo == NULL)) { +// tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen); +// if (unlikely(!tinfo)) { +// return TSDB_CODE_OUT_OF_MEMORY; +// } +// tinfo->tags = taosArrayDup(preLineKV, NULL); +// +// smlSetCTableName(tinfo); +// if (info->dataFormat) { +// info->currSTableMeta->uid = tinfo->uid; +// tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); +// if (tinfo->tableDataCtx == NULL) { +// smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL); +// return TSDB_CODE_SML_INVALID_DATA; +// } +// } +// +// SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo)); +// *key = *elements; +// tinfo->key = key; +// nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet); +// } +// if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; +// +// return ret; +//} static char* smlJsonGetObj(char *payload){ int leftBracketCnt = 0; @@ -233,7 +240,7 @@ static char* smlJsonGetObj(char *payload){ return NULL; } -void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){ +int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){ int index = 0; while(*(*start)){ if((*start)[0] != '"'){ @@ -241,10 +248,11 @@ void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){ continue; } - if(unlikely(index >= 4)) { - uError("index >= 4, %s", *start) + if(unlikely(index >= OTD_JSON_FIELDS_NUM)) { + uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) break; } + char *sTmp = *start; if((*start)[1] == 'm' && (*start)[2] == 'e' && (*start)[3] == 't' && (*start)[4] == 'r' && (*start)[5] == 'i' && (*start)[6] == 'c' && (*start)[7] == '"'){ @@ -333,9 +341,15 @@ void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){ } (*start)++; } + + if(unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL || element->measure == NULL || element->timestamp == NULL) { + uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM) + return -1; + } + return 0; } -void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ +int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ int index = 0; while(*(*start)){ if((*start)[0] != '"'){ @@ -343,10 +357,11 @@ void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ continue; } - if(unlikely(index >= 4)) { - uError("index >= 4, %s", *start) + if(unlikely(index >= OTD_JSON_FIELDS_NUM)) { + uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) break; } + if((*start)[1] == 'm'){ (*start) += offset[index++]; element->measure = *start; @@ -393,30 +408,711 @@ void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ } (*start)++; } + + if(unlikely(index != OTD_JSON_FIELDS_NUM)) { + uError("elements != %d", OTD_JSON_FIELDS_NUM) + return -1; + } + return 0; +} + +static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) { + elements->measureLen = strlen(metric->valuestring); + if (IS_INVALID_TABLE_LEN(elements->measureLen)) { + uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id); + return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; + } + + elements->measure = metric->valuestring; + return TSDB_CODE_SUCCESS; +} + +const char *jsonName[OTD_JSON_FIELDS_NUM] = {"metric", "timestamp", "value", "tags"}; +static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks){ + cJSON *child = root->child; + + for (int i = 0; i < OTD_JSON_FIELDS_NUM; ++i) { + while(child != NULL) + { + if(strcasecmp(child->string, jsonName[i]) == 0){ + *marks[i] = child; + break; + } + child = child->next; + } + if(*marks[i] == NULL){ + uError("smlGetJsonElements error, not find mark:%s", jsonName[i]); + return -1; + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) { + if (strcasecmp(typeStr, "bool") != 0) { + uError("OTD:invalid type(%s) for JSON Bool", typeStr); + return TSDB_CODE_TSC_INVALID_JSON_TYPE; + } + pVal->type = TSDB_DATA_TYPE_BOOL; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->i = value->valueint; + + return TSDB_CODE_SUCCESS; +} + +static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) { + // tinyint + if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) { + if (!IS_VALID_TINYINT(value->valuedouble)) { + uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + pVal->type = TSDB_DATA_TYPE_TINYINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->i = value->valuedouble; + return TSDB_CODE_SUCCESS; + } + // smallint + if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) { + if (!IS_VALID_SMALLINT(value->valuedouble)) { + uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + pVal->type = TSDB_DATA_TYPE_SMALLINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->i = value->valuedouble; + return TSDB_CODE_SUCCESS; + } + // int + if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) { + if (!IS_VALID_INT(value->valuedouble)) { + uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + pVal->type = TSDB_DATA_TYPE_INT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->i = value->valuedouble; + return TSDB_CODE_SUCCESS; + } + // bigint + if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) { + pVal->type = TSDB_DATA_TYPE_BIGINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + if (value->valuedouble >= (double)INT64_MAX) { + pVal->i = INT64_MAX; + } else if (value->valuedouble <= (double)INT64_MIN) { + pVal->i = INT64_MIN; + } else { + pVal->i = value->valuedouble; + } + return TSDB_CODE_SUCCESS; + } + // float + if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) { + if (!IS_VALID_FLOAT(value->valuedouble)) { + uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + pVal->type = TSDB_DATA_TYPE_FLOAT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->f = value->valuedouble; + return TSDB_CODE_SUCCESS; + } + // double + if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) { + pVal->type = TSDB_DATA_TYPE_DOUBLE; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + pVal->d = value->valuedouble; + return TSDB_CODE_SUCCESS; + } + + // if reach here means type is unsupported + uError("OTD:invalid type(%s) for JSON Number", typeStr); + return TSDB_CODE_TSC_INVALID_JSON_TYPE; +} + +static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) { + if (strcasecmp(typeStr, "binary") == 0) { + pVal->type = TSDB_DATA_TYPE_BINARY; + } else if (strcasecmp(typeStr, "nchar") == 0) { + pVal->type = TSDB_DATA_TYPE_NCHAR; + } else { + uError("OTD:invalid type(%s) for JSON String", typeStr); + return TSDB_CODE_TSC_INVALID_JSON_TYPE; + } + pVal->length = (int16_t)strlen(value->valuestring); + + if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) { + return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; + } + if (pVal->type == TSDB_DATA_TYPE_NCHAR && + pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { + return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; + } + + pVal->value = value->valuestring; + return TSDB_CODE_SUCCESS; +} + +static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) { + int32_t ret = TSDB_CODE_SUCCESS; + int32_t size = cJSON_GetArraySize(root); + + if (size != OTD_JSON_SUB_FIELDS_NUM) { + return TSDB_CODE_TSC_INVALID_JSON; + } + + cJSON *value = cJSON_GetObjectItem(root, "value"); + if (value == NULL) { + return TSDB_CODE_TSC_INVALID_JSON; + } + + cJSON *type = cJSON_GetObjectItem(root, "type"); + if (!cJSON_IsString(type)) { + return TSDB_CODE_TSC_INVALID_JSON; + } + + switch (value->type) { + case cJSON_True: + case cJSON_False: { + ret = smlConvertJSONBool(kv, type->valuestring, value); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + break; + } + case cJSON_Number: { + ret = smlConvertJSONNumber(kv, type->valuestring, value); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + break; + } + case cJSON_String: { + ret = smlConvertJSONString(kv, type->valuestring, value); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + break; + } + default: + return TSDB_CODE_TSC_INVALID_JSON_TYPE; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) { + switch (root->type) { + case cJSON_True: + case cJSON_False: { + kv->type = TSDB_DATA_TYPE_BOOL; + kv->length = (int16_t)tDataTypes[kv->type].bytes; + kv->i = root->valueint; + break; + } + case cJSON_Number: { + kv->type = TSDB_DATA_TYPE_DOUBLE; + kv->length = (int16_t)tDataTypes[kv->type].bytes; + kv->d = root->valuedouble; + break; + } + case cJSON_String: { + /* set default JSON type to binary/nchar according to + * user configured parameter tsDefaultJSONStrType + */ + + char *tsDefaultJSONStrType = "nchar"; // todo + smlConvertJSONString(kv, tsDefaultJSONStrType, root); + break; + } + case cJSON_Object: { + int32_t ret = smlParseValueFromJSONObj(root, kv); + if (ret != TSDB_CODE_SUCCESS) { + uError("OTD:Failed to parse value from JSON Obj"); + return ret; + } + break; + } + default: + return TSDB_CODE_TSC_INVALID_JSON; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) { + int32_t ret = TSDB_CODE_SUCCESS; + + bool isSameMeasure = IS_SAME_SUPER_TABLE; + + int cnt = 0; + SArray *preLineKV = info->preLineTagKV; + bool isSuperKVInit = true; + SArray *superKV = NULL; + if(info->dataFormat){ + if(unlikely(!isSameMeasure)){ + SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); + + if(unlikely(sMeta == NULL)){ + sMeta = smlBuildSTableMeta(info->dataFormat); + STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); + sMeta->tableMeta = pTableMeta; + if(pTableMeta == NULL){ + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL); + } + info->currSTableMeta = sMeta->tableMeta; + superKV = sMeta->tags; + + if(unlikely(taosArrayGetSize(superKV) == 0)){ + isSuperKVInit = false; + } + taosArraySetSize(preLineKV, 0); + } + }else{ + taosArraySetSize(preLineKV, 0); + } + + int32_t tagNum = cJSON_GetArraySize(tags); + if(unlikely(tagNum == 0)){ + uError("SML:Tag should not be empty"); + return TSDB_CODE_TSC_INVALID_JSON; + } + for (int32_t i = 0; i < tagNum; ++i) { + cJSON *tag = cJSON_GetArrayItem(tags, i); + if (unlikely(tag == NULL)) { + return TSDB_CODE_TSC_INVALID_JSON; + } +// if(unlikely(tag == cMeasure)) continue; + size_t keyLen = strlen(tag->string); + if (unlikely(IS_INVALID_COL_LEN(keyLen))) { + uError("OTD:Tag key length is 0 or too large than 64"); + return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; + } + + // add kv to SSmlKv + SSmlKv kv ={.key = tag->string, .keyLen = keyLen}; + // value + ret = smlParseValueFromJSON(tag, &kv); + if (unlikely(ret != TSDB_CODE_SUCCESS)) { + return ret; + } + + if(info->dataFormat){ + if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){ + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + + if(isSameMeasure){ + if(unlikely(cnt >= taosArrayGetSize(preLineKV))) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt); + if(unlikely(kv.length > preKV->length)){ + preKV->length = kv.length; + SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); + ASSERT(tableMeta != NULL); + + SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt); + oldKV->length = kv.length; + info->needModifySchema = true; + } + if(unlikely(!IS_SAME_KEY)){ + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + }else{ + if(isSuperKVInit){ + if(unlikely(cnt >= taosArrayGetSize(superKV))) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt); + if(unlikely(kv.length > preKV->length)) { + preKV->length = kv.length; + }else{ + kv.length = preKV->length; + } + info->needModifySchema = true; + + if(unlikely(!IS_SAME_KEY)){ + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + }else{ + taosArrayPush(superKV, &kv); + } + taosArrayPush(preLineKV, &kv); + } + }else{ + taosArrayPush(preLineKV, &kv); + } + cnt++; + } + + SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet); + if (unlikely(tinfo == NULL)) { + tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen); + if (unlikely(!tinfo)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tinfo->tags = taosArrayDup(preLineKV, NULL); + + smlSetCTableName(tinfo); + if (info->dataFormat) { + info->currSTableMeta->uid = tinfo->uid; + tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); + if (tinfo->tableDataCtx == NULL) { + smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL); + return TSDB_CODE_SML_INVALID_DATA; + } + } + + SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo)); + *key = *elements; + if(info->parseJsonByLib){ + key->tags = taosMemoryMalloc(elements->tagsLen + 1); + memcpy(key->tags, elements->tags, elements->tagsLen); + key->tags[elements->tagsLen] = 0; + } + tinfo->key = key; + nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet); + } + if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; + + return ret; +} + +static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) { + int32_t size = cJSON_GetArraySize(root); + if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); + return -1; + } + + cJSON *value = cJSON_GetObjectItem(root, "value"); + if (unlikely(!cJSON_IsNumber(value))) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); + return -1; + } + + cJSON *type = cJSON_GetObjectItem(root, "type"); + if (unlikely(!cJSON_IsString(type))) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL); + return -1; + } + + double timeDouble = value->valuedouble; + if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) { + smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); + return -1; + } + + if (timeDouble == 0) { + return taosGetTimestampNs()/smlFactorNS[toPrecision]; + } + + if (timeDouble < 0) { + return timeDouble; + } + + int64_t tsInt64 = timeDouble; + size_t typeLen = strlen(type->valuestring); + if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) { + // seconds + int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS; + if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){ + return tsInt64 * smlFactorS[toPrecision]; + } + return -1; + } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) { + switch (type->valuestring[0]) { + case 'm': + case 'M': + // milliseconds + return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision); + break; + case 'u': + case 'U': + // microseconds + return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision); + break; + case 'n': + case 'N': + return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision); + break; + default: + return -1; + } + } else { + return -1; + } +} + +uint8_t smlGetTimestampLen(int64_t num) { + uint8_t len = 0; + while ((num /= 10) != 0) { + len++; + } + len++; + return len; +} + +static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { + // Timestamp must be the first KV to parse + int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO; + if (cJSON_IsNumber(timestamp)) { + // timestamp value 0 indicates current system time + double timeDouble = timestamp->valuedouble; + if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) { + smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); + return -1; + } + + if (unlikely(timeDouble < 0)) { + smlBuildInvalidDataMsg(&info->msgBuf, + "timestamp is negative", NULL); + return timeDouble; + }else if (unlikely(timeDouble == 0)) { + return taosGetTimestampNs()/smlFactorNS[toPrecision]; + } + + uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble); + + int8_t fromPrecision = smlGetTsTypeByLen(tsLen); + if (unlikely(fromPrecision == -1)) { + smlBuildInvalidDataMsg(&info->msgBuf, + "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL); + return -1; + } + int64_t tsInt64 = timeDouble; + if(fromPrecision == TSDB_TIME_PRECISION_SECONDS){ + if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){ + return tsInt64 * smlFactorS[toPrecision]; + } + return -1; + }else{ + return convertTimePrecision(timeDouble, fromPrecision, toPrecision); + } + } else if (cJSON_IsObject(timestamp)) { + return smlParseTSFromJSONObj(info, timestamp, toPrecision); + } else { + smlBuildInvalidDataMsg(&info->msgBuf, + "invalidate json", NULL); + return -1; + } +} + +static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) { + int32_t ret = TSDB_CODE_SUCCESS; + + cJSON *metricJson = NULL; + cJSON *tsJson = NULL; + cJSON *valueJson = NULL; + cJSON *tagsJson = NULL; + + int32_t size = cJSON_GetArraySize(root); + // outmost json fields has to be exactly 4 + if (size != OTD_JSON_FIELDS_NUM) { + uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size); + return TSDB_CODE_TSC_INVALID_JSON; + } + + cJSON **marks[OTD_JSON_FIELDS_NUM] = {&metricJson, &tsJson, &valueJson, &tagsJson}; + ret = smlGetJsonElements(root, marks); + if (unlikely(ret != TSDB_CODE_SUCCESS)) { + return ret; + } + + // Parse metric + ret = smlParseMetricFromJSON(info, metricJson, elements); + if (unlikely(ret != TSDB_CODE_SUCCESS)) { + uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id); + return ret; + } + + // Parse metric value + SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN}; + ret = smlParseValueFromJSON(valueJson, &kv); + if (unlikely(ret)) { + uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id); + return ret; + } + + // Parse tags + elements->tags = cJSON_PrintUnformatted(tagsJson); + elements->tagsLen = strlen(elements->tags); + if(is_same_child_table_telnet(elements, &info->preLine) != 0) { + ret = smlParseTagsFromJSON(info, tagsJson, elements); + if (unlikely(ret)) { + uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); + taosMemoryFree(elements->tags); + elements->tags = NULL; + return ret; + } + } + + if(info->dataFormat){ + taosMemoryFree(elements->tags); + elements->tags = NULL; + } + + if(unlikely(info->reRun)){ + return TSDB_CODE_SUCCESS; + } + + // Parse timestamp + // notice!!! put ts back to tag to ensure get meta->precision + int64_t ts = smlParseTSFromJSON(info, tsJson); + if (unlikely(ts < 0)) { + uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id); + return TSDB_CODE_INVALID_TIMESTAMP; + } + SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; + + if(info->dataFormat){ + ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0); + if(ret == TSDB_CODE_SUCCESS){ + ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1); + } + if(ret == TSDB_CODE_SUCCESS){ + ret = smlBuildRow(info->currTableDataCtx); + } + if (unlikely(ret != TSDB_CODE_SUCCESS)) { + smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL); + return ret; + } + }else{ + if(elements->colArray == NULL){ + elements->colArray = taosArrayInit(16, sizeof(SSmlKv)); + } + taosArrayPush(elements->colArray, &kvTs); + taosArrayPush(elements->colArray, &kv); + } + info->preLine = *elements; + + return TSDB_CODE_SUCCESS; +} + +static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) { + int32_t payloadNum = 0; + int32_t ret = TSDB_CODE_SUCCESS; + + if (unlikely(payload == NULL)) { + uError("SML:0x%" PRIx64 " empty JSON Payload", info->id); + return TSDB_CODE_TSC_INVALID_JSON; + } + + info->root = cJSON_Parse(payload); + if (unlikely(info->root == NULL)) { + uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload); + return TSDB_CODE_TSC_INVALID_JSON; + } + + // multiple data points must be sent in JSON array + if (cJSON_IsArray(info->root)) { + payloadNum = cJSON_GetArraySize(info->root); + } else if (cJSON_IsObject(info->root)) { + payloadNum = 1; + } else { + uError("SML:0x%" PRIx64 " Invalid JSON Payload 3:%s", info->id, payload); + return TSDB_CODE_TSC_INVALID_JSON; + } + + info->lineNum = payloadNum; + info->dataFormat = true; + if (unlikely(info->lines != NULL)) { + taosMemoryFree(info->lines); + info->lines = NULL; + } + ret = smlClearForRerun(info); + if(ret != TSDB_CODE_SUCCESS){ + return ret; + } + + info->parseJsonByLib = true; + cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child; + + int cnt = 0; + cJSON *dataPoint = head; + while (dataPoint) { + if(info->dataFormat) { + SSmlLineInfo element = {0}; + ret = smlParseJSONStringExt(info, dataPoint, &element); + }else{ + ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt); + } + if (unlikely(ret != TSDB_CODE_SUCCESS)) { + uError("SML:0x%" PRIx64 " Invalid JSON Payload 2:%s", info->id, payload); + return ret; + } + + if(unlikely(info->reRun)){ + cnt = 0; + dataPoint = head; + info->lineNum = payloadNum; + ret = smlClearForRerun(info); + if(ret != TSDB_CODE_SUCCESS){ + return ret; + } + continue; + } + cnt++; + dataPoint = dataPoint->next; + } + + return TSDB_CODE_SUCCESS; } static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) { int32_t ret = TSDB_CODE_SUCCESS; if(info->offset[0] == 0){ - smlJsonParseObjFirst(start, elements, info->offset); + ret = smlJsonParseObjFirst(start, elements, info->offset); }else{ - smlJsonParseObj(start, elements, info->offset); + ret = smlJsonParseObj(start, elements, info->offset); } - if(**start == '\0') return TSDB_CODE_SUCCESS; + + if (ret != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if(unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS; SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen}; - if (smlParseNumber(&kv, &info->msgBuf)) { - kv.length = (int16_t)tDataTypes[kv.type].bytes; - }else{ + if (elements->colsLen == 0 || smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) { + uError("SML:cols invalidate:%s", elements->cols); return TSDB_CODE_TSC_INVALID_VALUE; } // Parse tags - ret = smlParseTagsFromJSON(info, elements); - if (unlikely(ret)) { - uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); - return ret; + if(is_same_child_table_telnet(elements, &info->preLine) != 0){ + char tmp = *(elements->tags + elements->tagsLen); + *(elements->tags + elements->tagsLen) = 0; + cJSON* tagsJson = cJSON_Parse(elements->tags); + *(elements->tags + elements->tagsLen) = tmp; + if (unlikely(tagsJson == NULL)) { + uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, elements->tags); + return TSDB_CODE_TSC_INVALID_JSON; + } + + ret = smlParseTagsFromJSON(info, tagsJson, elements); + cJSON_free(tagsJson); + if (unlikely(ret)) { + uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); + return ret; + } } if(unlikely(info->reRun)){ @@ -472,17 +1168,16 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) { void* tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo)); if(tmp != NULL){ info->lines = (SSmlLineInfo*)tmp; + memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo)); } } ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt); } if (unlikely(ret != TSDB_CODE_SUCCESS)) { - uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id); - return ret; + uError("SML:0x%" PRIx64 " Invalid JSON Payload 1:%s", info->id, payload); + return smlParseJSONExt(info, payload); } - if(*dataPointStart == '\0') break; - if(unlikely(info->reRun)){ cnt = 0; dataPointStart = payload; @@ -493,6 +1188,8 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) { } continue; } + + if(*dataPointStart == '\0') break; cnt++; } info->lineNum = cnt; diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index d608447506778cad1e964cdd79183d87be0a88e8..4ff638b802723fae0ea93f9ca02f4d2bb5179751 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -411,28 +411,28 @@ TEST(testCase, smlParseCols_Test) { smlDestroyInfo(info); } -//TEST(testCase, smlGetTimestampLen_Test) { -// uint8_t len = smlGetTimestampLen(0); -// ASSERT_EQ(len, 1); -// -// len = smlGetTimestampLen(1); -// ASSERT_EQ(len, 1); -// -// len = smlGetTimestampLen(10); -// ASSERT_EQ(len, 2); -// -// len = smlGetTimestampLen(390); -// ASSERT_EQ(len, 3); -// -// len = smlGetTimestampLen(-1); -// ASSERT_EQ(len, 1); -// -// len = smlGetTimestampLen(-10); -// ASSERT_EQ(len, 2); -// -// len = smlGetTimestampLen(-390); -// ASSERT_EQ(len, 3); -//} +TEST(testCase, smlGetTimestampLen_Test) { + uint8_t len = smlGetTimestampLen(0); + ASSERT_EQ(len, 1); + + len = smlGetTimestampLen(1); + ASSERT_EQ(len, 1); + + len = smlGetTimestampLen(10); + ASSERT_EQ(len, 2); + + len = smlGetTimestampLen(390); + ASSERT_EQ(len, 3); + + len = smlGetTimestampLen(-1); + ASSERT_EQ(len, 1); + + len = smlGetTimestampLen(-10); + ASSERT_EQ(len, 2); + + len = smlGetTimestampLen(-390); + ASSERT_EQ(len, 3); +} TEST(testCase, smlParseNumber_Test) { SSmlKv kv = {0}; diff --git a/tests/system-test/1-insert/opentsdb_json_taosc_insert.py b/tests/system-test/1-insert/opentsdb_json_taosc_insert.py index 93f3c7410df5bfc0ffef836d8353249f8de0c965..da3c659e178bbe2bb9a0d38103abad718f058669 100644 --- a/tests/system-test/1-insert/opentsdb_json_taosc_insert.py +++ b/tests/system-test/1-insert/opentsdb_json_taosc_insert.py @@ -1719,7 +1719,6 @@ class TDTestCase: print(err.errno) def runAll(self): - """ for value_type in ["obj", "default"]: self.initCheckCase(value_type) self.symbolsCheckCase(value_type) @@ -1759,7 +1758,7 @@ class TDTestCase: self.batchErrorInsertCheckCase() self.chineseCheckCase() # self.spellCheckCase() - self.tbnameTagsColsNameCheckCase() + # self.tbnameTagsColsNameCheckCase() # # MultiThreads # self.sStbStbDdataInsertMultiThreadCheckCase() # self.sStbStbDdataAtInsertMultiThreadCheckCase() @@ -1772,7 +1771,7 @@ class TDTestCase: # self.sStbStbDdataDtsMtInsertMultiThreadCheckCase() # self.sStbDtbDdataDtsMtInsertMultiThreadCheckCase() # self.lengthIcreaseCrashCheckCase() - """ + def run(self): print("running {}".format(__file__)) self.createDb() diff --git a/tests/system-test/1-insert/opentsdb_telnet_line_taosc_insert.py b/tests/system-test/1-insert/opentsdb_telnet_line_taosc_insert.py index a17a2d9514a50a06f594fca2990048b1876ef441..351cf49e3a217a44e93bbaf9c8c69ce2fa76c190 100644 --- a/tests/system-test/1-insert/opentsdb_telnet_line_taosc_insert.py +++ b/tests/system-test/1-insert/opentsdb_telnet_line_taosc_insert.py @@ -243,7 +243,7 @@ class TDTestCase: if t_add_tag is not None: sql_seq = f'{stb_name} {ts} {value} t0={t0} t1={t1} t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8} t9={t8}' if id_change_tag is not None: - sql_seq = f'{stb_name} {ts} {value} t0={t0} {id}={tb_name} t1={t1} t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8}' + sql_seq = f'{stb_name} {ts} {value} {id}={tb_name} t0={t0} t1={t1} t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8}' if id_double_tag is not None: sql_seq = f'{stb_name} {ts} {value} {id}=\"{tb_name}_1\" t0={t0} t1={t1} {id}=\"{tb_name}_2\" t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8}' if t_add_tag is not None: @@ -1126,9 +1126,9 @@ class TDTestCase: self._conn.schemaless_insert([input_sql], TDSmlProtocolType.TELNET.value, None) query_sql = 'select * from `rFa$sta`' query_res = tdSql.query(query_sql, True) - tdSql.checkEqual(query_res, [(datetime.datetime(2021, 7, 11, 20, 33, 54), 9.223372036854776e+18, '2147483647i32', 'L"ncharTagValue"', '32767i16', '9223372036854775807i64', '22.123456789f64', '"ddzhiksj"', '11.12345f32', 'true', '127Ii8')]) + tdSql.checkEqual(query_res, [(datetime.datetime(2021, 7, 11, 20, 33, 54), 9.223372036854776e+18, 'true', '127Ii8', '32767i16', '2147483647i32', '9223372036854775807i64', '11.12345f32', '22.123456789f64', '"ddzhiksj"', 'L"ncharTagValue"')]) col_tag_res = tdSql.getColNameList(query_sql) - tdSql.checkEqual(col_tag_res, ['_ts', '_value', '"t$3"', 't!@#$%^&*()_+[];:<>?,9', 't#2', 't%4', 't&6', 't*7', 't^5', 'Tt!0', 'tT@1']) + tdSql.checkEqual(col_tag_res, ['_ts', '_value', 'Tt!0', 'tT@1', 't#2', '"t$3"', 't%4', 't^5', 't&6', 't*7', 't!@#$%^&*()_+[];:<>?,9']) tdSql.execute('drop table `rFa$sta`') def tcpKeywordsCheckCase(self, protocol="telnet-tcp"): @@ -1207,7 +1207,6 @@ class TDTestCase: tdLog.info(f'{sys._getframe().f_code.co_name}() function is running') tdCom.cleanTb(dbname="test") input_sql = self.genSqlList()[0] - print(input_sql) self.multiThreadRun(self.genMultiThreadSeq(input_sql)) tdSql.query(f"show tables;") tdSql.checkRows(5) @@ -1416,8 +1415,8 @@ class TDTestCase: self.symbolsCheckCase() self.tsCheckCase() self.openTstbTelnetTsCheckCase() - #self.idSeqCheckCase() - #self.idLetterCheckCase() + # self.idSeqCheckCase() + self.idLetterCheckCase() self.noIdCheckCase() self.maxColTagCheckCase() self.stbTbNameCheckCase() @@ -1450,7 +1449,7 @@ class TDTestCase: self.spellCheckCase() self.pointTransCheckCase() self.defaultTypeCheckCase() - #self.tbnameTagsColsNameCheckCase() + self.tbnameTagsColsNameCheckCase() # # # MultiThreads # self.stbInsertMultiThreadCheckCase() # self.sStbStbDdataInsertMultiThreadCheckCase()