提交 7d9a9f91 编写于 作者: G Ganlin Zhao

[TD-6443]<feature>: Support OpenTSDB HTTP JSON data import format

上级 b5455d3c
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "tscParseLine.h" #include "tscParseLine.h"
#define MAX_FIELDS_NUM 2 #define MAX_FIELDS_NUM 2
#define JSON_SUB_FIELDS_NUM 2
#define JSON_FIELDS_NUM 4 #define JSON_FIELDS_NUM 4
#define OTS_TIMESTAMP_COLUMN_NAME "ts" #define OTS_TIMESTAMP_COLUMN_NAME "ts"
...@@ -429,8 +430,7 @@ int taos_telnet_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { ...@@ -429,8 +430,7 @@ int taos_telnet_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
/* telnet style API parser */ /* telnet style API parser */
int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) { int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
cJSON *metric = cJSON_GetObjectItem(root, "metric"); cJSON *metric = cJSON_GetObjectItem(root, "metric");
if (metric == NULL || metric->type != cJSON_String) { if (cJSON_IsString(metric)) {
tscError("OTD:0x%"PRIx64" failed to parse metric from JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
...@@ -457,6 +457,55 @@ int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInf ...@@ -457,6 +457,55 @@ int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInf
} }
int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesInfo* info) {
int32_t size = cJSON_GetArraySize(root);
if (size != JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (!cJSON_IsNumber(value)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
*tsVal = value->valueint;
//if timestamp value is 0 use current system time
if (*tsVal == 0) {
*tsVal = taosGetTimestampNs();
return TSDB_CODE_SUCCESS;
}
int32_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && type->valuestring[0] == 's') {
//seconds
*tsVal = *tsVal * 1e9;
} else if (typeLen == 2 && type->valuestring[1] == 's') {
switch (type->valuestring[0]) {
case 'm':
//milliseconds
*tsVal = convertTimePrecision(*tsVal, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
break;
case 'u':
//microseconds
*tsVal = convertTimePrecision(*tsVal, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
break;
case 'n':
//nanoseconds
*tsVal = *tsVal * 1;
break;
default:
return TSDB_CODE_TSC_INVALID_JSON;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSmlLinesInfo* info) { int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSmlLinesInfo* info) {
//Timestamp must be the first KV to parse //Timestamp must be the first KV to parse
assert(*num_kvs == 0); assert(*num_kvs == 0);
...@@ -464,32 +513,29 @@ int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSm ...@@ -464,32 +513,29 @@ int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSm
char key[] = OTS_TIMESTAMP_COLUMN_NAME; char key[] = OTS_TIMESTAMP_COLUMN_NAME;
cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp"); cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
if (timestamp == NULL || timestamp->type != cJSON_Number) { if (cJSON_IsNumber(timestamp)) {
tscError("OTD:0x%"PRIx64" failed to parse timestamp from JSON Payload", info->id); //timestamp value 0 indicates current system time
if (timestamp->valueint == 0) {
tsVal = taosGetTimestampNs();
} else {
tsVal = convertTimePrecision(timestamp->valueint, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
}
} else if (cJSON_IsObject(timestamp)) {
parseTimestampFromJSONObj(root, &tsVal, info);
} else {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
//allocate fields for timestamp and value //allocate fields for timestamp and value
*pTS = tcalloc(MAX_FIELDS_NUM, sizeof(TAOS_SML_KV)); *pTS = tcalloc(MAX_FIELDS_NUM, sizeof(TAOS_SML_KV));
tsVal = convertTimePrecision(timestamp->valueint, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
(*pTS)->key = tcalloc(sizeof(key), 1); (*pTS)->key = tcalloc(sizeof(key), 1);
if ((*pTS)->key == NULL) {
tfree(*pTS);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy((*pTS)->key, key, sizeof(key)); memcpy((*pTS)->key, key, sizeof(key));
(*pTS)->type = TSDB_DATA_TYPE_TIMESTAMP; (*pTS)->type = TSDB_DATA_TYPE_TIMESTAMP;
(*pTS)->length = (int16_t)tDataTypes[(*pTS)->type].bytes; (*pTS)->length = (int16_t)tDataTypes[(*pTS)->type].bytes;
(*pTS)->value = tcalloc((*pTS)->length, 1); (*pTS)->value = tcalloc((*pTS)->length, 1);
if ((*pTS)->value == NULL) {
tfree((*pTS)->key);
tfree(*pTS);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy((*pTS)->value, &tsVal, (*pTS)->length); memcpy((*pTS)->value, &tsVal, (*pTS)->length);
*num_kvs += 1; *num_kvs += 1;
...@@ -541,7 +587,6 @@ int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, ...@@ -541,7 +587,6 @@ int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs,
cJSON *metricVal = cJSON_GetObjectItem(root, "value"); cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) { if (metricVal == NULL) {
tscError("OTD:0x%"PRIx64" failed to parse metric value from JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
...@@ -566,7 +611,6 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char ** ...@@ -566,7 +611,6 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
//only pick up the first ID value as child table name //only pick up the first ID value as child table name
cJSON *id = cJSON_GetObjectItem(tags, "ID"); cJSON *id = cJSON_GetObjectItem(tags, "ID");
if (id != NULL) { if (id != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册