提交 5290c32f 编写于 作者: wmmhello's avatar wmmhello

fix:json parse error

上级 a0d46fe9
...@@ -181,6 +181,7 @@ typedef struct { ...@@ -181,6 +181,7 @@ typedef struct {
cJSON *root; // for parse json cJSON *root; // for parse json
int8_t offset[OTD_JSON_FIELDS_NUM]; int8_t offset[OTD_JSON_FIELDS_NUM];
SSmlLineInfo *lines; // element is SSmlLineInfo SSmlLineInfo *lines; // element is SSmlLineInfo
bool parseJsonByLib;
// //
SArray *preLineTagKV; SArray *preLineTagKV;
...@@ -209,7 +210,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos); ...@@ -209,7 +210,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos);
void smlDestroyInfo(SSmlHandle *info); void smlDestroyInfo(SSmlHandle *info);
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset); int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset);
int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset); int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset);
SArray *smlJsonParseTags(char *start, char *end); //SArray *smlJsonParseTags(char *start, char *end);
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg); bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg);
void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn); void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn);
int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn); int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn);
......
...@@ -1008,12 +1008,16 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols ...@@ -1008,12 +1008,16 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void smlDestroyTableInfo(SSmlTableInfo *tag) { static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
} }
if(info->parseJsonByLib){
SSmlLineInfo *key = (SSmlLineInfo *)(tag->key);
if(key != NULL) taosMemoryFree(key->tags);
}
taosMemoryFree(tag->key); taosMemoryFree(tag->key);
taosArrayDestroy(tag->cols); taosArrayDestroy(tag->cols);
taosArrayDestroy(tag->tags); taosArrayDestroy(tag->tags);
...@@ -1028,7 +1032,7 @@ void smlDestroyInfo(SSmlHandle *info) { ...@@ -1028,7 +1032,7 @@ void smlDestroyInfo(SSmlHandle *info) {
NodeList *tmp = info->childTables; NodeList *tmp = info->childTables;
while (tmp) { while (tmp) {
if (tmp->data.used) { if (tmp->data.used) {
smlDestroyTableInfo((SSmlTableInfo *)tmp->data.value); smlDestroyTableInfo(info, (SSmlTableInfo *)tmp->data.value);
} }
NodeList *t = tmp->next; NodeList *t = tmp->next;
taosMemoryFree(tmp); taosMemoryFree(tmp);
...@@ -1055,6 +1059,9 @@ void smlDestroyInfo(SSmlHandle *info) { ...@@ -1055,6 +1059,9 @@ void smlDestroyInfo(SSmlHandle *info) {
if (!info->dataFormat) { if (!info->dataFormat) {
for (int i = 0; i < info->lineNum; i++) { for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroy(info->lines[i].colArray); taosArrayDestroy(info->lines[i].colArray);
if(info->parseJsonByLib){
taosMemoryFree(info->lines[i].tags);
}
} }
taosMemoryFree(info->lines); taosMemoryFree(info->lines);
} }
...@@ -1251,7 +1258,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { ...@@ -1251,7 +1258,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
NodeList *pList = info->childTables; NodeList *pList = info->childTables;
while (pList) { while (pList) {
if (pList->data.used) { if (pList->data.used) {
smlDestroyTableInfo((SSmlTableInfo *)pList->data.value); smlDestroyTableInfo(info, (SSmlTableInfo *)pList->data.value);
pList->data.used = false; pList->data.used = false;
} }
pList = pList->next; pList = pList->next;
......
...@@ -29,193 +29,193 @@ while(*(start)){\ ...@@ -29,193 +29,193 @@ while(*(start)){\
(start)++;\ (start)++;\
} }
SArray *smlJsonParseTags(char *start, char *end){ //SArray *smlJsonParseTags(char *start, char *end){
SArray *tags = taosArrayInit(4, sizeof(SSmlKv)); // SArray *tags = taosArrayInit(4, sizeof(SSmlKv));
while(start < end){ // while(start < end){
SSmlKv kv = {0}; // SSmlKv kv = {0};
kv.type = TSDB_DATA_TYPE_NCHAR; // kv.type = TSDB_DATA_TYPE_NCHAR;
bool isInQuote = false; // bool isInQuote = false;
while(start < end){ // while(start < end){
if(unlikely(!isInQuote && *start == '"')){ // if(unlikely(!isInQuote && *start == '"')){
start++; // start++;
kv.key = start; // kv.key = start;
isInQuote = true; // isInQuote = true;
continue; // continue;
} // }
if(unlikely(isInQuote && *start == '"')){ // if(unlikely(isInQuote && *start == '"')){
kv.keyLen = start - kv.key; // kv.keyLen = start - kv.key;
start++; // start++;
break; // break;
} // }
start++; // start++;
} // }
bool hasColon = false; // bool hasColon = false;
while(start < end){ // while(start < end){
if(unlikely(!hasColon && *start == ':')){ // if(unlikely(!hasColon && *start == ':')){
start++; // start++;
hasColon = true; // hasColon = true;
continue; // continue;
} // }
if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){ // if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){
kv.value = start; // kv.value = start;
start++; // start++;
continue; // continue;
} // }
//
if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){ // if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){
kv.length = start - kv.value; // kv.length = start - kv.value;
taosArrayPush(tags, &kv); // taosArrayPush(tags, &kv);
start++; // start++;
break; // break;
} // }
start++; // start++;
} // }
} // }
return tags; // return tags;
} //}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) { //static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS; // int32_t ret = TSDB_CODE_SUCCESS;
//
if(is_same_child_table_telnet(elements, &info->preLine) == 0){ // if(is_same_child_table_telnet(elements, &info->preLine) == 0){
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
//
bool isSameMeasure = IS_SAME_SUPER_TABLE; // bool isSameMeasure = IS_SAME_SUPER_TABLE;
//
int cnt = 0; // int cnt = 0;
SArray *preLineKV = info->preLineTagKV; // SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true; // bool isSuperKVInit = true;
SArray *superKV = NULL; // SArray *superKV = NULL;
if(info->dataFormat){ // if(info->dataFormat){
if(unlikely(!isSameMeasure)){ // if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); // SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
//
if(unlikely(sMeta == NULL)){ // if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat); // sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); // STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta; // sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){ // if(pTableMeta == NULL){
info->dataFormat = false; // info->dataFormat = false;
info->reRun = true; // info->reRun = true;
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL); // nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
} // }
info->currSTableMeta = sMeta->tableMeta; // info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags; // superKV = sMeta->tags;
//
if(unlikely(taosArrayGetSize(superKV) == 0)){ // if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false; // isSuperKVInit = false;
} // }
taosArraySetSize(preLineKV, 0); // taosArraySetSize(preLineKV, 0);
} // }
}else{ // }else{
taosArraySetSize(preLineKV, 0); // taosArraySetSize(preLineKV, 0);
} // }
//
SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen); // SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen);
int32_t tagNum = taosArrayGetSize(tags); // int32_t tagNum = taosArrayGetSize(tags);
if (tagNum == 0) { // if (tagNum == 0) {
uError("SML:tag is empty:%s", elements->tags) // uError("SML:tag is empty:%s", elements->tags)
taosArrayDestroy(tags); // taosArrayDestroy(tags);
return TSDB_CODE_SML_INVALID_DATA; // return TSDB_CODE_SML_INVALID_DATA;
} // }
for (int32_t i = 0; i < tagNum; ++i) { // for (int32_t i = 0; i < tagNum; ++i) {
SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i); // SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i);
//
if(info->dataFormat){ // if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){ // if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->dataFormat = false; // info->dataFormat = false;
info->reRun = true; // info->reRun = true;
taosArrayDestroy(tags); // taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
//
if(isSameMeasure){ // if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) { // if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false; // info->dataFormat = false;
info->reRun = true; // info->reRun = true;
taosArrayDestroy(tags); // taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt); // SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){ // if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length; // preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL); // SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
ASSERT(tableMeta != NULL); // ASSERT(tableMeta != NULL);
//
SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt); // SSmlKv *oldKV = (SSmlKv *)taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length; // oldKV->length = kv.length;
info->needModifySchema = true; // info->needModifySchema = true;
} // }
if(unlikely(!IS_SAME_KEY)){ // if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false; // info->dataFormat = false;
info->reRun = true; // info->reRun = true;
taosArrayDestroy(tags); // taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
}else{ // }else{
if(isSuperKVInit){ // if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) { // if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false; // info->dataFormat = false;
info->reRun = true; // info->reRun = true;
taosArrayDestroy(tags); // taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt); // SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) { // if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length; // preKV->length = kv.length;
}else{ // }else{
kv.length = preKV->length; // kv.length = preKV->length;
} // }
info->needModifySchema = true; // info->needModifySchema = true;
//
if(unlikely(!IS_SAME_KEY)){ // if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false; // info->dataFormat = false;
info->reRun = true; // info->reRun = true;
taosArrayDestroy(tags); // taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
}else{ // }else{
taosArrayPush(superKV, &kv); // taosArrayPush(superKV, &kv);
} // }
taosArrayPush(preLineKV, &kv); // taosArrayPush(preLineKV, &kv);
} // }
}else{ // }else{
taosArrayPush(preLineKV, &kv); // taosArrayPush(preLineKV, &kv);
} // }
cnt++; // cnt++;
} // }
taosArrayDestroy(tags); // taosArrayDestroy(tags);
//
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet); // SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
if (unlikely(tinfo == NULL)) { // if (unlikely(tinfo == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen); // tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) { // if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY; // return TSDB_CODE_OUT_OF_MEMORY;
} // }
tinfo->tags = taosArrayDup(preLineKV, NULL); // tinfo->tags = taosArrayDup(preLineKV, NULL);
//
smlSetCTableName(tinfo); // smlSetCTableName(tinfo);
if (info->dataFormat) { // if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid; // info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); // tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) { // if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL); // smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA; // return TSDB_CODE_SML_INVALID_DATA;
} // }
} // }
//
SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo)); // SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements; // *key = *elements;
tinfo->key = key; // tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet); // nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
} // }
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; // if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
//
return ret; // return ret;
} //}
static char* smlJsonGetObj(char *payload){ static char* smlJsonGetObj(char *payload){
int leftBracketCnt = 0; int leftBracketCnt = 0;
...@@ -416,72 +416,6 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){ ...@@ -416,72 +416,6 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){
return 0; return 0;
} }
static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if(info->offset[0] == 0){
ret = smlJsonParseObjFirst(start, elements, info->offset);
}else{
ret = smlJsonParseObj(start, elements, info->offset);
}
if (ret != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
if(unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS;
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (elements->colsLen == 0 || smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
uError("SML:cols invalidate:%s", elements->cols);
return TSDB_CODE_TSC_INVALID_VALUE;
}
// Parse tags
ret = smlParseTagsFromJSON(info, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
if(elements->colArray == NULL){
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) { static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
elements->measureLen = strlen(metric->valuestring); elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) { if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
...@@ -707,15 +641,9 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) { ...@@ -707,15 +641,9 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseTagsFromJSONExt(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) { static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
elements->tags = cJSON_PrintUnformatted(tags);
elements->tagsLen = strlen(elements->tags);
if(is_same_child_table_telnet(elements, &info->preLine) == 0){
return TSDB_CODE_SUCCESS;
}
bool isSameMeasure = IS_SAME_SUPER_TABLE; bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0; int cnt = 0;
...@@ -853,6 +781,11 @@ static int32_t smlParseTagsFromJSONExt(SSmlHandle *info, cJSON *tags, SSmlLineIn ...@@ -853,6 +781,11 @@ static int32_t smlParseTagsFromJSONExt(SSmlHandle *info, cJSON *tags, SSmlLineIn
SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo)); SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements; *key = *elements;
if(info->parseJsonByLib){
key->tags = taosMemoryMalloc(elements->tagsLen + 1);
memcpy(key->tags, elements->tags, elements->tagsLen);
key->tags[elements->tagsLen] = 0;
}
tinfo->key = key; tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet); nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
} }
...@@ -1018,11 +951,20 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo ...@@ -1018,11 +951,20 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
} }
// Parse tags // Parse tags
ret = smlParseTagsFromJSONExt(info, tagsJson, elements); elements->tags = cJSON_PrintUnformatted(tagsJson);
elements->tagsLen = strlen(elements->tags);
if(is_same_child_table_telnet(elements, &info->preLine) != 0) {
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) { if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
taosMemoryFree(elements->tags);
return ret; return ret;
} }
}
if(info->dataFormat){
taosMemoryFree(elements->tags);
}
if(unlikely(info->reRun)){ if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1129,6 +1071,83 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) { ...@@ -1129,6 +1071,83 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if(info->offset[0] == 0){
ret = smlJsonParseObjFirst(start, elements, info->offset);
}else{
ret = smlJsonParseObj(start, elements, info->offset);
}
if (ret != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
if(unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS;
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (elements->colsLen == 0 || smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
uError("SML:cols invalidate:%s", elements->cols);
return TSDB_CODE_TSC_INVALID_VALUE;
}
// Parse tags
if(is_same_child_table_telnet(elements, &info->preLine) != 0){
char tmp = *(elements->tags + elements->tagsLen);
*(elements->tags + elements->tagsLen) = 0;
cJSON* tagsJson = cJSON_Parse(elements->tags);
*(elements->tags + elements->tagsLen) = tmp;
if (unlikely(tagsJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, elements->tags);
return TSDB_CODE_TSC_INVALID_JSON;
}
ret = smlParseTagsFromJSON(info, tagsJson, elements);
cJSON_free(tagsJson);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
}
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
if(elements->colArray == NULL){
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
int32_t smlParseJSON(SSmlHandle *info, char *payload) { int32_t smlParseJSON(SSmlHandle *info, char *payload) {
int32_t payloadNum = 1 << 15; int32_t payloadNum = 1 << 15;
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
...@@ -1152,6 +1171,7 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) { ...@@ -1152,6 +1171,7 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
} }
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 1:%s", info->id, payload); uError("SML:0x%" PRIx64 " Invalid JSON Payload 1:%s", info->id, payload);
info->parseJsonByLib = true;
return smlParseJSONExt(info, payload); return smlParseJSONExt(info, payload);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册