未验证 提交 28b0cf94 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #19295 from taosdata/refact/submit_req_marks

opti:json parse logic for schemaless
...@@ -69,6 +69,7 @@ extern "C" { ...@@ -69,6 +69,7 @@ extern "C" {
#define VALUE "_value" #define VALUE "_value"
#define VALUE_LEN 6 #define VALUE_LEN 6
#define OTD_JSON_FIELDS_NUM 4
#define MAX_RETRY_TIMES 5 #define MAX_RETRY_TIMES 5
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
...@@ -177,9 +178,10 @@ typedef struct { ...@@ -177,9 +178,10 @@ typedef struct {
int32_t lineNum; int32_t lineNum;
SSmlMsgBuf msgBuf; SSmlMsgBuf msgBuf;
// cJSON *root; // for parse json cJSON *root; // for parse json
int8_t offset[4]; int8_t offset[OTD_JSON_FIELDS_NUM];
SSmlLineInfo *lines; // element is SSmlLineInfo SSmlLineInfo *lines; // element is SSmlLineInfo
bool parseJsonByLib;
// //
SArray *preLineTagKV; SArray *preLineTagKV;
...@@ -206,9 +208,9 @@ typedef int32_t (*_equal_fn_sml)(const void *, const void *); ...@@ -206,9 +208,9 @@ typedef int32_t (*_equal_fn_sml)(const void *, const void *);
SSmlHandle *smlBuildSmlInfo(TAOS *taos); SSmlHandle *smlBuildSmlInfo(TAOS *taos);
void smlDestroyInfo(SSmlHandle *info); void smlDestroyInfo(SSmlHandle *info);
void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset); int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset);
void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset); int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset);
SArray *smlJsonParseTags(char *start, char *end); //SArray *smlJsonParseTags(char *start, char *end);
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg); bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg);
void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn); void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn);
int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn); int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn);
...@@ -226,6 +228,7 @@ int32_t is_same_child_table_telnet(const void *a, const void *b); ...@@ -226,6 +228,7 @@ int32_t is_same_child_table_telnet(const void *a, const void *b);
int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len); int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len);
int32_t smlClearForRerun(SSmlHandle *info); int32_t smlClearForRerun(SSmlHandle *info);
int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg); int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg);
uint8_t smlGetTimestampLen(int64_t num);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements); int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements); int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
......
...@@ -1008,12 +1008,16 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols ...@@ -1008,12 +1008,16 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void smlDestroyTableInfo(SSmlTableInfo *tag) { static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
} }
if(info->parseJsonByLib){
SSmlLineInfo *key = (SSmlLineInfo *)(tag->key);
if(key != NULL) taosMemoryFree(key->tags);
}
taosMemoryFree(tag->key); taosMemoryFree(tag->key);
taosArrayDestroy(tag->cols); taosArrayDestroy(tag->cols);
taosArrayDestroy(tag->tags); taosArrayDestroy(tag->tags);
...@@ -1028,7 +1032,7 @@ void smlDestroyInfo(SSmlHandle *info) { ...@@ -1028,7 +1032,7 @@ void smlDestroyInfo(SSmlHandle *info) {
NodeList *tmp = info->childTables; NodeList *tmp = info->childTables;
while (tmp) { while (tmp) {
if (tmp->data.used) { if (tmp->data.used) {
smlDestroyTableInfo((SSmlTableInfo *)tmp->data.value); smlDestroyTableInfo(info, (SSmlTableInfo *)tmp->data.value);
} }
NodeList *t = tmp->next; NodeList *t = tmp->next;
taosMemoryFree(tmp); taosMemoryFree(tmp);
...@@ -1055,6 +1059,9 @@ void smlDestroyInfo(SSmlHandle *info) { ...@@ -1055,6 +1059,9 @@ void smlDestroyInfo(SSmlHandle *info) {
if (!info->dataFormat) { if (!info->dataFormat) {
for (int i = 0; i < info->lineNum; i++) { for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroy(info->lines[i].colArray); taosArrayDestroy(info->lines[i].colArray);
if(info->parseJsonByLib){
taosMemoryFree(info->lines[i].tags);
}
} }
taosMemoryFree(info->lines); taosMemoryFree(info->lines);
} }
...@@ -1251,7 +1258,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { ...@@ -1251,7 +1258,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
NodeList *pList = info->childTables; NodeList *pList = info->childTables;
while (pList) { while (pList) {
if (pList->data.used) { if (pList->data.used) {
smlDestroyTableInfo((SSmlTableInfo *)pList->data.value); smlDestroyTableInfo(info, (SSmlTableInfo *)pList->data.value);
pList->data.used = false; pList->data.used = false;
} }
pList = pList->next; pList = pList->next;
...@@ -1267,11 +1274,13 @@ int32_t smlClearForRerun(SSmlHandle *info) { ...@@ -1267,11 +1274,13 @@ int32_t smlClearForRerun(SSmlHandle *info) {
pList = pList->next; pList = pList->next;
} }
if (unlikely(info->lines != NULL)) { if (!info->dataFormat){
uError("SML:0x%" PRIx64 " info->lines != NULL", info->id); if (unlikely(info->lines != NULL)) {
return TSDB_CODE_SML_INVALID_DATA; uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
} }
info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
memset(&info->preLine, 0, sizeof(SSmlLineInfo)); memset(&info->preLine, 0, sizeof(SSmlLineInfo));
info->currSTableMeta = NULL; info->currSTableMeta = NULL;
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
#include <string.h> #include <string.h>
#include "clientSml.h" #include "clientSml.h"
#define OTD_JSON_SUB_FIELDS_NUM 2
#define JUMP_JSON_SPACE(start) \ #define JUMP_JSON_SPACE(start) \
while(*(start)){\ while(*(start)){\
if(unlikely(*(start) > 32))\ if(unlikely(*(start) > 32))\
...@@ -27,188 +29,193 @@ while(*(start)){\ ...@@ -27,188 +29,193 @@ while(*(start)){\
(start)++;\ (start)++;\
} }
SArray *smlJsonParseTags(char *start, char *end){ //SArray *smlJsonParseTags(char *start, char *end){
SArray *tags = taosArrayInit(4, sizeof(SSmlKv)); // SArray *tags = taosArrayInit(4, sizeof(SSmlKv));
while(start < end){ // while(start < end){
SSmlKv kv = {0}; // SSmlKv kv = {0};
kv.type = TSDB_DATA_TYPE_NCHAR; // kv.type = TSDB_DATA_TYPE_NCHAR;
bool isInQuote = false; // bool isInQuote = false;
while(start < end){ // while(start < end){
if(unlikely(!isInQuote && *start == '"')){ // if(unlikely(!isInQuote && *start == '"')){
start++; // start++;
kv.key = start; // kv.key = start;
isInQuote = true; // isInQuote = true;
continue; // continue;
} // }
if(unlikely(isInQuote && *start == '"')){ // if(unlikely(isInQuote && *start == '"')){
kv.keyLen = start - kv.key; // kv.keyLen = start - kv.key;
start++; // start++;
break; // break;
} // }
start++; // start++;
} // }
bool hasColon = false; // bool hasColon = false;
while(start < end){ // while(start < end){
if(unlikely(!hasColon && *start == ':')){ // if(unlikely(!hasColon && *start == ':')){
start++; // start++;
hasColon = true; // hasColon = true;
continue; // continue;
} // }
if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){ // if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){
kv.value = start; // kv.value = start;
start++; // start++;
continue; // continue;
} // }
//
if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){ // if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){
kv.length = start - kv.value; // kv.length = start - kv.value;
taosArrayPush(tags, &kv); // taosArrayPush(tags, &kv);
start++; // start++;
break; // break;
} // }
start++; // start++;
} // }
} // }
return tags; // return tags;
} //}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if(is_same_child_table_telnet(elements, &info->preLine) == 0){
return TSDB_CODE_SUCCESS;
}
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
}else{
taosArraySetSize(preLineKV, 0);
}
SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen);
int32_t tagNum = taosArrayGetSize(tags);
for (int32_t i = 0; i < tagNum; ++i) {
SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i);
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArrayPush(preLineKV, &kv);
}
cnt++;
}
taosArrayDestroy(tags);
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
if (unlikely(tinfo == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tinfo->tags = taosArrayDup(preLineKV, NULL);
smlSetCTableName(tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements;
tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
}
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
return ret; //static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) {
} // int32_t ret = TSDB_CODE_SUCCESS;
//
// if(is_same_child_table_telnet(elements, &info->preLine) == 0){
// return TSDB_CODE_SUCCESS;
// }
//
// bool isSameMeasure = IS_SAME_SUPER_TABLE;
//
// int cnt = 0;
// SArray *preLineKV = info->preLineTagKV;
// bool isSuperKVInit = true;
// SArray *superKV = NULL;
// if(info->dataFormat){
// if(unlikely(!isSameMeasure)){
// SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
//
// if(unlikely(sMeta == NULL)){
// sMeta = smlBuildSTableMeta(info->dataFormat);
// STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
// sMeta->tableMeta = pTableMeta;
// if(pTableMeta == NULL){
// info->dataFormat = false;
// info->reRun = true;
// return TSDB_CODE_SUCCESS;
// }
// nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
// }
// info->currSTableMeta = sMeta->tableMeta;
// superKV = sMeta->tags;
//
// if(unlikely(taosArrayGetSize(superKV) == 0)){
// isSuperKVInit = false;
// }
// taosArraySetSize(preLineKV, 0);
// }
// }else{
// taosArraySetSize(preLineKV, 0);
// }
//
// SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen);
// int32_t tagNum = taosArrayGetSize(tags);
// if (tagNum == 0) {
// uError("SML:tag is empty:%s", elements->tags)
// taosArrayDestroy(tags);
// return TSDB_CODE_SML_INVALID_DATA;
// }
// for (int32_t i = 0; i < tagNum; ++i) {
// SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i);
//
// if(info->dataFormat){
// if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
//
// if(isSameMeasure){
// if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt);
// if(unlikely(kv.length > preKV->length)){
// preKV->length = kv.length;
// SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
// ASSERT(tableMeta != NULL);
//
// SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt);
// oldKV->length = kv.length;
// info->needModifySchema = true;
// }
// if(unlikely(!IS_SAME_KEY)){
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// }else{
// if(isSuperKVInit){
// if(unlikely(cnt >= taosArrayGetSize(superKV))) {
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt);
// if(unlikely(kv.length > preKV->length)) {
// preKV->length = kv.length;
// }else{
// kv.length = preKV->length;
// }
// info->needModifySchema = true;
//
// if(unlikely(!IS_SAME_KEY)){
// info->dataFormat = false;
// info->reRun = true;
// taosArrayDestroy(tags);
// return TSDB_CODE_SUCCESS;
// }
// }else{
// taosArrayPush(superKV, &kv);
// }
// taosArrayPush(preLineKV, &kv);
// }
// }else{
// taosArrayPush(preLineKV, &kv);
// }
// cnt++;
// }
// taosArrayDestroy(tags);
//
// SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
// if (unlikely(tinfo == NULL)) {
// tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
// if (unlikely(!tinfo)) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// tinfo->tags = taosArrayDup(preLineKV, NULL);
//
// smlSetCTableName(tinfo);
// if (info->dataFormat) {
// info->currSTableMeta->uid = tinfo->uid;
// tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
// if (tinfo->tableDataCtx == NULL) {
// smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
// return TSDB_CODE_SML_INVALID_DATA;
// }
// }
//
// SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
// *key = *elements;
// tinfo->key = key;
// nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
// }
// if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
//
// return ret;
//}
static char* smlJsonGetObj(char *payload){ static char* smlJsonGetObj(char *payload){
int leftBracketCnt = 0; int leftBracketCnt = 0;
...@@ -233,7 +240,7 @@ static char* smlJsonGetObj(char *payload){ ...@@ -233,7 +240,7 @@ static char* smlJsonGetObj(char *payload){
return NULL; return NULL;
} }
void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){
int index = 0; int index = 0;
while(*(*start)){ while(*(*start)){
if((*start)[0] != '"'){ if((*start)[0] != '"'){
...@@ -241,10 +248,11 @@ void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){ ...@@ -241,10 +248,11 @@ void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){
continue; continue;
} }
if(unlikely(index >= 4)) { if(unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= 4, %s", *start) uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start)
break; break;
} }
char *sTmp = *start; char *sTmp = *start;
if((*start)[1] == 'm' && (*start)[2] == 'e' && (*start)[3] == 't' if((*start)[1] == 'm' && (*start)[2] == 'e' && (*start)[3] == 't'
&& (*start)[4] == 'r' && (*start)[5] == 'i' && (*start)[6] == 'c' && (*start)[7] == '"'){ && (*start)[4] == 'r' && (*start)[5] == 'i' && (*start)[6] == 'c' && (*start)[7] == '"'){
...@@ -333,9 +341,15 @@ void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){ ...@@ -333,9 +341,15 @@ void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){
} }
(*start)++; (*start)++;
} }
if(unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL || element->measure == NULL || element->timestamp == NULL) {
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM)
return -1;
}
return 0;
} }
void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){
int index = 0; int index = 0;
while(*(*start)){ while(*(*start)){
if((*start)[0] != '"'){ if((*start)[0] != '"'){
...@@ -343,10 +357,11 @@ void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ ...@@ -343,10 +357,11 @@ void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){
continue; continue;
} }
if(unlikely(index >= 4)) { if(unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= 4, %s", *start) uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start)
break; break;
} }
if((*start)[1] == 'm'){ if((*start)[1] == 'm'){
(*start) += offset[index++]; (*start) += offset[index++];
element->measure = *start; element->measure = *start;
...@@ -393,30 +408,711 @@ void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ ...@@ -393,30 +408,711 @@ void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){
} }
(*start)++; (*start)++;
} }
if(unlikely(index != OTD_JSON_FIELDS_NUM)) {
uError("elements != %d", OTD_JSON_FIELDS_NUM)
return -1;
}
return 0;
}
static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
elements->measure = metric->valuestring;
return TSDB_CODE_SUCCESS;
}
const char *jsonName[OTD_JSON_FIELDS_NUM] = {"metric", "timestamp", "value", "tags"};
static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks){
cJSON *child = root->child;
for (int i = 0; i < OTD_JSON_FIELDS_NUM; ++i) {
while(child != NULL)
{
if(strcasecmp(child->string, jsonName[i]) == 0){
*marks[i] = child;
break;
}
child = child->next;
}
if(*marks[i] == NULL){
uError("smlGetJsonElements error, not find mark:%s", jsonName[i]);
return -1;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
if (strcasecmp(typeStr, "bool") != 0) {
uError("OTD:invalid type(%s) for JSON Bool", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valueint;
return TSDB_CODE_SUCCESS;
}
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// tinyint
if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
if (!IS_VALID_TINYINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_TINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// smallint
if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
if (!IS_VALID_SMALLINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// int
if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
if (!IS_VALID_INT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// bigint
if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
if (value->valuedouble >= (double)INT64_MAX) {
pVal->i = INT64_MAX;
} else if (value->valuedouble <= (double)INT64_MIN) {
pVal->i = INT64_MIN;
} else {
pVal->i = value->valuedouble;
}
return TSDB_CODE_SUCCESS;
}
// float
if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
if (!IS_VALID_FLOAT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->f = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// double
if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->d = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// if reach here means type is unsupported
uError("OTD:invalid type(%s) for JSON Number", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
if (strcasecmp(typeStr, "binary") == 0) {
pVal->type = TSDB_DATA_TYPE_BINARY;
} else if (strcasecmp(typeStr, "nchar") == 0) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
} else {
uError("OTD:invalid type(%s) for JSON String", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->length = (int16_t)strlen(value->valuestring);
if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
pVal->value = value->valuestring;
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t size = cJSON_GetArraySize(root);
if (size != OTD_JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (value == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
switch (value->type) {
case cJSON_True:
case cJSON_False: {
ret = smlConvertJSONBool(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
case cJSON_Number: {
ret = smlConvertJSONNumber(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
case cJSON_String: {
ret = smlConvertJSONString(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
switch (root->type) {
case cJSON_True:
case cJSON_False: {
kv->type = TSDB_DATA_TYPE_BOOL;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
kv->i = root->valueint;
break;
}
case cJSON_Number: {
kv->type = TSDB_DATA_TYPE_DOUBLE;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
kv->d = root->valuedouble;
break;
}
case cJSON_String: {
/* set default JSON type to binary/nchar according to
* user configured parameter tsDefaultJSONStrType
*/
char *tsDefaultJSONStrType = "nchar"; // todo
smlConvertJSONString(kv, tsDefaultJSONStrType, root);
break;
}
case cJSON_Object: {
int32_t ret = smlParseValueFromJSONObj(root, kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:Failed to parse value from JSON Obj");
return ret;
}
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
}else{
taosArraySetSize(preLineKV, 0);
}
int32_t tagNum = cJSON_GetArraySize(tags);
if(unlikely(tagNum == 0)){
uError("SML:Tag should not be empty");
return TSDB_CODE_TSC_INVALID_JSON;
}
for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i);
if (unlikely(tag == NULL)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
// if(unlikely(tag == cMeasure)) continue;
size_t keyLen = strlen(tag->string);
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
uError("OTD:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
// add kv to SSmlKv
SSmlKv kv ={.key = tag->string, .keyLen = keyLen};
// value
ret = smlParseValueFromJSON(tag, &kv);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArrayPush(preLineKV, &kv);
}
cnt++;
}
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
if (unlikely(tinfo == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tinfo->tags = taosArrayDup(preLineKV, NULL);
smlSetCTableName(tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements;
if(info->parseJsonByLib){
key->tags = taosMemoryMalloc(elements->tagsLen + 1);
memcpy(key->tags, elements->tags, elements->tagsLen);
key->tags[elements->tagsLen] = 0;
}
tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
}
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
return ret;
}
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
int32_t size = cJSON_GetArraySize(root);
if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (unlikely(!cJSON_IsNumber(value))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (unlikely(!cJSON_IsString(type))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
double timeDouble = value->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
}
if (timeDouble == 0) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
if (timeDouble < 0) {
return timeDouble;
}
int64_t tsInt64 = timeDouble;
size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
// seconds
int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
} else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
switch (type->valuestring[0]) {
case 'm':
case 'M':
// milliseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
break;
case 'u':
case 'U':
// microseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
break;
case 'n':
case 'N':
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
break;
default:
return -1;
}
} else {
return -1;
}
}
uint8_t smlGetTimestampLen(int64_t num) {
uint8_t len = 0;
while ((num /= 10) != 0) {
len++;
}
len++;
return len;
}
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
// Timestamp must be the first KV to parse
int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
if (cJSON_IsNumber(timestamp)) {
// timestamp value 0 indicates current system time
double timeDouble = timestamp->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
}
if (unlikely(timeDouble < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp is negative", NULL);
return timeDouble;
}else if (unlikely(timeDouble == 0)) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
int8_t fromPrecision = smlGetTsTypeByLen(tsLen);
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
return -1;
}
int64_t tsInt64 = timeDouble;
if(fromPrecision == TSDB_TIME_PRECISION_SECONDS){
if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
}else{
return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
}
} else if (cJSON_IsObject(timestamp)) {
return smlParseTSFromJSONObj(info, timestamp, toPrecision);
} else {
smlBuildInvalidDataMsg(&info->msgBuf,
"invalidate json", NULL);
return -1;
}
}
static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
cJSON *metricJson = NULL;
cJSON *tsJson = NULL;
cJSON *valueJson = NULL;
cJSON *tagsJson = NULL;
int32_t size = cJSON_GetArraySize(root);
// outmost json fields has to be exactly 4
if (size != OTD_JSON_FIELDS_NUM) {
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON **marks[OTD_JSON_FIELDS_NUM] = {&metricJson, &tsJson, &valueJson, &tagsJson};
ret = smlGetJsonElements(root, marks);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
// Parse metric
ret = smlParseMetricFromJSON(info, metricJson, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret;
}
// Parse metric value
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseValueFromJSON(valueJson, &kv);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret;
}
// Parse tags
elements->tags = cJSON_PrintUnformatted(tagsJson);
elements->tagsLen = strlen(elements->tags);
if(is_same_child_table_telnet(elements, &info->preLine) != 0) {
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
taosMemoryFree(elements->tags);
elements->tags = NULL;
return ret;
}
}
if(info->dataFormat){
taosMemoryFree(elements->tags);
elements->tags = NULL;
}
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseTSFromJSON(info, tsJson);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
if(elements->colArray == NULL){
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t ret = TSDB_CODE_SUCCESS;
if (unlikely(payload == NULL)) {
uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->root = cJSON_Parse(payload);
if (unlikely(info->root == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
// multiple data points must be sent in JSON array
if (cJSON_IsArray(info->root)) {
payloadNum = cJSON_GetArraySize(info->root);
} else if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 3:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->lineNum = payloadNum;
info->dataFormat = true;
if (unlikely(info->lines != NULL)) {
taosMemoryFree(info->lines);
info->lines = NULL;
}
ret = smlClearForRerun(info);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
info->parseJsonByLib = true;
cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child;
int cnt = 0;
cJSON *dataPoint = head;
while (dataPoint) {
if(info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONStringExt(info, dataPoint, &element);
}else{
ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 2:%s", info->id, payload);
return ret;
}
if(unlikely(info->reRun)){
cnt = 0;
dataPoint = head;
info->lineNum = payloadNum;
ret = smlClearForRerun(info);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
continue;
}
cnt++;
dataPoint = dataPoint->next;
}
return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) { static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if(info->offset[0] == 0){ if(info->offset[0] == 0){
smlJsonParseObjFirst(start, elements, info->offset); ret = smlJsonParseObjFirst(start, elements, info->offset);
}else{ }else{
smlJsonParseObj(start, elements, info->offset); ret = smlJsonParseObj(start, elements, info->offset);
} }
if(**start == '\0') return TSDB_CODE_SUCCESS;
if (ret != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
if(unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS;
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen}; SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (smlParseNumber(&kv, &info->msgBuf)) { if (elements->colsLen == 0 || smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
kv.length = (int16_t)tDataTypes[kv.type].bytes; uError("SML:cols invalidate:%s", elements->cols);
}else{
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
// Parse tags // Parse tags
ret = smlParseTagsFromJSON(info, elements); if(is_same_child_table_telnet(elements, &info->preLine) != 0){
if (unlikely(ret)) { char tmp = *(elements->tags + elements->tagsLen);
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); *(elements->tags + elements->tagsLen) = 0;
return ret; cJSON* tagsJson = cJSON_Parse(elements->tags);
*(elements->tags + elements->tagsLen) = tmp;
if (unlikely(tagsJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, elements->tags);
return TSDB_CODE_TSC_INVALID_JSON;
}
ret = smlParseTagsFromJSON(info, tagsJson, elements);
cJSON_free(tagsJson);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
} }
if(unlikely(info->reRun)){ if(unlikely(info->reRun)){
...@@ -472,17 +1168,16 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) { ...@@ -472,17 +1168,16 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
void* tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo)); void* tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
if(tmp != NULL){ if(tmp != NULL){
info->lines = (SSmlLineInfo*)tmp; info->lines = (SSmlLineInfo*)tmp;
memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
} }
} }
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt); ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
} }
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id); uError("SML:0x%" PRIx64 " Invalid JSON Payload 1:%s", info->id, payload);
return ret; return smlParseJSONExt(info, payload);
} }
if(*dataPointStart == '\0') break;
if(unlikely(info->reRun)){ if(unlikely(info->reRun)){
cnt = 0; cnt = 0;
dataPointStart = payload; dataPointStart = payload;
...@@ -493,6 +1188,8 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) { ...@@ -493,6 +1188,8 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
} }
continue; continue;
} }
if(*dataPointStart == '\0') break;
cnt++; cnt++;
} }
info->lineNum = cnt; info->lineNum = cnt;
......
...@@ -411,28 +411,28 @@ TEST(testCase, smlParseCols_Test) { ...@@ -411,28 +411,28 @@ TEST(testCase, smlParseCols_Test) {
smlDestroyInfo(info); smlDestroyInfo(info);
} }
//TEST(testCase, smlGetTimestampLen_Test) { TEST(testCase, smlGetTimestampLen_Test) {
// uint8_t len = smlGetTimestampLen(0); uint8_t len = smlGetTimestampLen(0);
// ASSERT_EQ(len, 1); ASSERT_EQ(len, 1);
//
// len = smlGetTimestampLen(1); len = smlGetTimestampLen(1);
// ASSERT_EQ(len, 1); ASSERT_EQ(len, 1);
//
// len = smlGetTimestampLen(10); len = smlGetTimestampLen(10);
// ASSERT_EQ(len, 2); ASSERT_EQ(len, 2);
//
// len = smlGetTimestampLen(390); len = smlGetTimestampLen(390);
// ASSERT_EQ(len, 3); ASSERT_EQ(len, 3);
//
// len = smlGetTimestampLen(-1); len = smlGetTimestampLen(-1);
// ASSERT_EQ(len, 1); ASSERT_EQ(len, 1);
//
// len = smlGetTimestampLen(-10); len = smlGetTimestampLen(-10);
// ASSERT_EQ(len, 2); ASSERT_EQ(len, 2);
//
// len = smlGetTimestampLen(-390); len = smlGetTimestampLen(-390);
// ASSERT_EQ(len, 3); ASSERT_EQ(len, 3);
//} }
TEST(testCase, smlParseNumber_Test) { TEST(testCase, smlParseNumber_Test) {
SSmlKv kv = {0}; SSmlKv kv = {0};
......
...@@ -1719,7 +1719,6 @@ class TDTestCase: ...@@ -1719,7 +1719,6 @@ class TDTestCase:
print(err.errno) print(err.errno)
def runAll(self): def runAll(self):
"""
for value_type in ["obj", "default"]: for value_type in ["obj", "default"]:
self.initCheckCase(value_type) self.initCheckCase(value_type)
self.symbolsCheckCase(value_type) self.symbolsCheckCase(value_type)
...@@ -1759,7 +1758,7 @@ class TDTestCase: ...@@ -1759,7 +1758,7 @@ class TDTestCase:
self.batchErrorInsertCheckCase() self.batchErrorInsertCheckCase()
self.chineseCheckCase() self.chineseCheckCase()
# self.spellCheckCase() # self.spellCheckCase()
self.tbnameTagsColsNameCheckCase() # self.tbnameTagsColsNameCheckCase()
# # MultiThreads # # MultiThreads
# self.sStbStbDdataInsertMultiThreadCheckCase() # self.sStbStbDdataInsertMultiThreadCheckCase()
# self.sStbStbDdataAtInsertMultiThreadCheckCase() # self.sStbStbDdataAtInsertMultiThreadCheckCase()
...@@ -1772,7 +1771,7 @@ class TDTestCase: ...@@ -1772,7 +1771,7 @@ class TDTestCase:
# self.sStbStbDdataDtsMtInsertMultiThreadCheckCase() # self.sStbStbDdataDtsMtInsertMultiThreadCheckCase()
# self.sStbDtbDdataDtsMtInsertMultiThreadCheckCase() # self.sStbDtbDdataDtsMtInsertMultiThreadCheckCase()
# self.lengthIcreaseCrashCheckCase() # self.lengthIcreaseCrashCheckCase()
"""
def run(self): def run(self):
print("running {}".format(__file__)) print("running {}".format(__file__))
self.createDb() self.createDb()
......
...@@ -243,7 +243,7 @@ class TDTestCase: ...@@ -243,7 +243,7 @@ class TDTestCase:
if t_add_tag is not None: if t_add_tag is not None:
sql_seq = f'{stb_name} {ts} {value} t0={t0} t1={t1} t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8} t9={t8}' sql_seq = f'{stb_name} {ts} {value} t0={t0} t1={t1} t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8} t9={t8}'
if id_change_tag is not None: if id_change_tag is not None:
sql_seq = f'{stb_name} {ts} {value} t0={t0} {id}={tb_name} t1={t1} t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8}' sql_seq = f'{stb_name} {ts} {value} {id}={tb_name} t0={t0} t1={t1} t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8}'
if id_double_tag is not None: if id_double_tag is not None:
sql_seq = f'{stb_name} {ts} {value} {id}=\"{tb_name}_1\" t0={t0} t1={t1} {id}=\"{tb_name}_2\" t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8}' sql_seq = f'{stb_name} {ts} {value} {id}=\"{tb_name}_1\" t0={t0} t1={t1} {id}=\"{tb_name}_2\" t2={t2} t3={t3} t4={t4} t5={t5} t6={t6} t7={t7} t8={t8}'
if t_add_tag is not None: if t_add_tag is not None:
...@@ -1126,9 +1126,9 @@ class TDTestCase: ...@@ -1126,9 +1126,9 @@ class TDTestCase:
self._conn.schemaless_insert([input_sql], TDSmlProtocolType.TELNET.value, None) self._conn.schemaless_insert([input_sql], TDSmlProtocolType.TELNET.value, None)
query_sql = 'select * from `rFa$sta`' query_sql = 'select * from `rFa$sta`'
query_res = tdSql.query(query_sql, True) query_res = tdSql.query(query_sql, True)
tdSql.checkEqual(query_res, [(datetime.datetime(2021, 7, 11, 20, 33, 54), 9.223372036854776e+18, '2147483647i32', 'L"ncharTagValue"', '32767i16', '9223372036854775807i64', '22.123456789f64', '"ddzhiksj"', '11.12345f32', 'true', '127Ii8')]) tdSql.checkEqual(query_res, [(datetime.datetime(2021, 7, 11, 20, 33, 54), 9.223372036854776e+18, 'true', '127Ii8', '32767i16', '2147483647i32', '9223372036854775807i64', '11.12345f32', '22.123456789f64', '"ddzhiksj"', 'L"ncharTagValue"')])
col_tag_res = tdSql.getColNameList(query_sql) col_tag_res = tdSql.getColNameList(query_sql)
tdSql.checkEqual(col_tag_res, ['_ts', '_value', '"t$3"', 't!@#$%^&*()_+[];:<>?,9', 't#2', 't%4', 't&6', 't*7', 't^5', 'Tt!0', 'tT@1']) tdSql.checkEqual(col_tag_res, ['_ts', '_value', 'Tt!0', 'tT@1', 't#2', '"t$3"', 't%4', 't^5', 't&6', 't*7', 't!@#$%^&*()_+[];:<>?,9'])
tdSql.execute('drop table `rFa$sta`') tdSql.execute('drop table `rFa$sta`')
def tcpKeywordsCheckCase(self, protocol="telnet-tcp"): def tcpKeywordsCheckCase(self, protocol="telnet-tcp"):
...@@ -1207,7 +1207,6 @@ class TDTestCase: ...@@ -1207,7 +1207,6 @@ class TDTestCase:
tdLog.info(f'{sys._getframe().f_code.co_name}() function is running') tdLog.info(f'{sys._getframe().f_code.co_name}() function is running')
tdCom.cleanTb(dbname="test") tdCom.cleanTb(dbname="test")
input_sql = self.genSqlList()[0] input_sql = self.genSqlList()[0]
print(input_sql)
self.multiThreadRun(self.genMultiThreadSeq(input_sql)) self.multiThreadRun(self.genMultiThreadSeq(input_sql))
tdSql.query(f"show tables;") tdSql.query(f"show tables;")
tdSql.checkRows(5) tdSql.checkRows(5)
...@@ -1416,8 +1415,8 @@ class TDTestCase: ...@@ -1416,8 +1415,8 @@ class TDTestCase:
self.symbolsCheckCase() self.symbolsCheckCase()
self.tsCheckCase() self.tsCheckCase()
self.openTstbTelnetTsCheckCase() self.openTstbTelnetTsCheckCase()
#self.idSeqCheckCase() # self.idSeqCheckCase()
#self.idLetterCheckCase() self.idLetterCheckCase()
self.noIdCheckCase() self.noIdCheckCase()
self.maxColTagCheckCase() self.maxColTagCheckCase()
self.stbTbNameCheckCase() self.stbTbNameCheckCase()
...@@ -1450,7 +1449,7 @@ class TDTestCase: ...@@ -1450,7 +1449,7 @@ class TDTestCase:
self.spellCheckCase() self.spellCheckCase()
self.pointTransCheckCase() self.pointTransCheckCase()
self.defaultTypeCheckCase() self.defaultTypeCheckCase()
#self.tbnameTagsColsNameCheckCase() self.tbnameTagsColsNameCheckCase()
# # # MultiThreads # # # MultiThreads
# self.stbInsertMultiThreadCheckCase() # self.stbInsertMultiThreadCheckCase()
# self.sStbStbDdataInsertMultiThreadCheckCase() # self.sStbStbDdataInsertMultiThreadCheckCase()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册