diff --git a/src/modules/http/src/tgHandle.c b/src/modules/http/src/tgHandle.c index 250dd27096a5cd7062e1da47b8c989a7fe1c5152..db64b4601f08f9dcd313cc58db90f52bdc5501ce 100644 --- a/src/modules/http/src/tgHandle.c +++ b/src/modules/http/src/tgHandle.c @@ -19,6 +19,43 @@ #include "tgJson.h" #include "tsdb.h" +/* + * taos.telegraf.cfg formats like + { + "metrics": [ + { + "name" : "system", + "tbname" : "system_uptime", + "fields": [ + "uptime" + ] + }, + { + "name": "system", + "tbname" : "system_uptime_format", + "fields": [ + "uptime_format" + ] + }, + { + "name": "swap", + "tbname" : "swap_in", + "fields": [ + "in" + ] + }, + { + "name": "cpu", + "tbname" : "cpu_time", + "fields": [ + "time_active", + "time_guest" + ] + } + ] + } + */ + #define TG_MAX_SORT_TAG_SIZE 20 static HttpDecodeMethod tgDecodeMethod = {"telegraf", tgProcessRquest}; @@ -26,80 +63,72 @@ static HttpEncodeMethod tgQueryMethod = {tgStartQueryJson, tgStopQueryJs tgBuildSqlAffectRowsJson, tgInitQueryJson, tgCleanQueryJson, tgCheckFinished, tgSetNextCmd}; -typedef struct { - char *tagName; - char *tagAlias; - char *tagType; -} STgTag; +static const char DEFAULT_TELEGRAF_CFG[] = + "{\"metrics\":[" + "{\"name\":\"system\",\"tbname\":\"system_uptime\",\"fields\":[\"uptime\"]}," + "{\"name\":\"system\",\"tbname\":\"system_uptime_format\",\"fields\":[\"uptime_format\"]}," + "{\"name\":\"swap\",\"tbname\":\"swap_in\",\"fields\":[\"in\"]}," + "{\"name\":\"cpu\",\"tbname\":\"cpu_time\",\"fields\":[\"time_active\",\"time_guest\"]}" + "]}"; typedef struct { - char *fieldName; - char *fieldAlias; - char *fieldType; -} STgField; + char *name; + char *tbName; + char **fields; + int fieldNum; +} STgSchema; typedef struct { - char * stName; - char * stAlias; - STgTag * tags; - STgField *fields; - int16_t tagNum; - int16_t fieldNum; - char * createSTableStr; -} STgStable; + STgSchema *schemas; + int size; + int pos; +} STgSchemas; -/* - * hash of STgStable - */ -static void *tgSchemaHash = NULL; +static STgSchemas tgSchemas = {0}; -/* - * formats like - * behind the midline is an alias of field/tag/stable - { - "metrics": [{ - "name": "win_cpu-cpu", - "fields": { - "Percent_DPC_Time": "float", - "Percent_Idle_Time": "float", - "Percent_Interrupt_Time": "float", - "Percent_Privileged_Time": "float", - "Percent_Processor_Time": "float", - "Percent_User_Time": "float" - }, - "tags": { - "host": "binary(32)", - "instance": "binary(32)", - "objectname": "binary(32)" +void tgFreeSchema(STgSchema *schema) { + if (schema->name != NULL) { + free(schema->name); + schema->name = NULL; + } + if (schema->tbName != NULL) { + free(schema->tbName); + schema->tbName = NULL; + } + if (schema->fields != NULL) { + for (int f = 0; f < schema->fieldNum; ++f) { + if (schema->fields[f] != NULL) { + free(schema->fields[f]); + schema->fields[f] = NULL; + } } - }, - { - "fields": { - "Bytes_Received_persec-f1": "float", - "Bytes_Sent_persec-f2": "float", - "Packets_Outbound_Discarded-f3": "float", - "Packets_Outbound_Errors-f4": "float", - "Packets_Received_Discarded-f5": "float", - "Packets_Received_Errors": "float", - "Packets_Received_persec": "float", - "Packets_Sent_persec": "float" - }, - "name": "win_net", - "tags": { - "host": "binary(32)", - "instance": "binary(32)", - "objectname": "binary(32)" - }, - "timestamp": 1536219762000 - }] + free(schema->fields); + schema->fields = NULL; + schema->fieldNum = 0; } - */ -void tgReadSchemaMetric(cJSON *metric) { - STgStable stable = {0}; - int createSTableStrLen = 100; - bool parsedOk = true; +} - // stable name +void tgFreeSchemas() { + if (tgSchemas.schemas != NULL) { + for (int s = 0; s < tgSchemas.size; ++s) { + tgFreeSchema(&tgSchemas.schemas[s]); + } + free(tgSchemas.schemas); + tgSchemas.size = 0; + } +} + +void tgInitSchemas(int size) { + tgFreeSchemas(); + tgSchemas.schemas = calloc(sizeof(STgSchema), size); + tgSchemas.size = 0; +} + +void tgParseSchemaMetric(cJSON *metric) { + STgSchema schema = {0}; + bool parsedOk = true; + + // name cJSON *name = cJSON_GetObjectItem(metric, "name"); if (name == NULL) { parsedOk = false; @@ -118,331 +147,143 @@ void tgReadSchemaMetric(cJSON *metric) { parsedOk = false; goto ParseEnd; } - int aliasPos = -1; - for (int i = 0; i < nameLen - 1; ++i) { - if (name->valuestring[i] == '-') { - aliasPos = i; - break; - } + + schema.name = calloc(nameLen + 1, 1); + strcpy(schema.name, name->valuestring); + + // tbname + cJSON *tbname = cJSON_GetObjectItem(metric, "tbname"); + if (tbname == NULL) { + parsedOk = false; + goto ParseEnd; } - if (aliasPos == -1) { - stable.stName = stable.stAlias = calloc((size_t)nameLen + 1, 1); - strcpy(stable.stName, name->valuestring); - createSTableStrLen += nameLen; - } else { - stable.stName = calloc((size_t)aliasPos + 1, 1); - stable.stAlias = calloc((size_t)(nameLen - aliasPos), 1); - strncpy(stable.stName, name->valuestring, (size_t)aliasPos); - strncpy(stable.stAlias, name->valuestring + aliasPos + 1, (size_t)(nameLen - aliasPos - 1)); - createSTableStrLen += (nameLen - aliasPos); + if (tbname->type != cJSON_String) { + parsedOk = false; + goto ParseEnd; } - - // tags - cJSON *tags = cJSON_GetObjectItem(metric, "tags"); - if (tags == NULL) { + if (tbname->valuestring == NULL) { parsedOk = false; goto ParseEnd; } - int tagsSize = cJSON_GetArraySize(tags); - if (tagsSize <= 0 || tagsSize > TSDB_MAX_TAGS) { + int tbnameLen = (int)strlen(tbname->valuestring); + if (tbnameLen == 0) { parsedOk = false; goto ParseEnd; } - stable.tags = calloc(sizeof(STgTag), (size_t)tagsSize); - stable.tagNum = (int16_t)tagsSize; - for (int i = 0; i < tagsSize; i++) { - STgTag *tagSchema = &stable.tags[i]; - cJSON * tag = cJSON_GetArrayItem(tags, i); - if (tag == NULL) { - parsedOk = false; - goto ParseEnd; - } - if (tag->string == NULL) { - parsedOk = false; - goto ParseEnd; - } - int nameLen = (int)strlen(tag->string); - if (nameLen == 0 || nameLen > TSDB_METER_NAME_LEN) { - parsedOk = false; - goto ParseEnd; - } - int aliasPos = -1; - for (int i = 0; i < nameLen - 1; ++i) { - if (tag->string[i] == '-') { - aliasPos = i; - break; - } - } - if (aliasPos == -1) { - tagSchema->tagName = calloc((size_t)nameLen + 1, 1); - strcpy(tagSchema->tagName, tag->string); - tagSchema->tagAlias = calloc((size_t)nameLen + 3, 1); - strcpy(tagSchema->tagAlias, "t_"); - strcpy(tagSchema->tagAlias + 2, tag->string); - createSTableStrLen += (nameLen + 4); - } else { - tagSchema->tagName = calloc((size_t)aliasPos + 1, 1); - tagSchema->tagAlias = calloc((size_t)(nameLen - aliasPos), 1); - strncpy(tagSchema->tagName, tag->string, (size_t)aliasPos); - strncpy(tagSchema->tagAlias, tag->string + aliasPos + 1, (size_t)(nameLen - aliasPos - 1)); - createSTableStrLen += (nameLen - aliasPos + 2); - } - if (tag->type == cJSON_String) { - if (tag->valuestring == NULL) { - parsedOk = false; - goto ParseEnd; - } - int valueLen = (int)strlen(tag->valuestring); - if (valueLen == 0) { - parsedOk = false; - goto ParseEnd; - } - if (strcasecmp(tag->valuestring, "timestamp") == 0 || strcasecmp(tag->valuestring, "bool") == 0 || - strcasecmp(tag->valuestring, "tinyint") == 0 || strcasecmp(tag->valuestring, "smallint") == 0 || - strcasecmp(tag->valuestring, "int") == 0 || strcasecmp(tag->valuestring, "bigint") == 0 || - strcasecmp(tag->valuestring, "float") == 0 || strcasecmp(tag->valuestring, "double") == 0 || - strncasecmp(tag->valuestring, "binary", 6) == 0 || strncasecmp(tag->valuestring, "nchar", 5) == 0) { - tagSchema->tagType = calloc((size_t)valueLen + 1, 1); - strcpy(tagSchema->tagType, tag->valuestring); - createSTableStrLen += valueLen; - } else { - tagSchema->tagType = calloc(11, 1); - strcpy(tagSchema->tagType, "binary(32)"); - createSTableStrLen += 12; - } - } else if (tag->type == cJSON_False || tag->type == cJSON_True) { - tagSchema->tagType = calloc(8, 1); - strcpy(tagSchema->tagType, "tinyint"); - createSTableStrLen += 10; - } else { - tagSchema->tagType = calloc(7, 1); - strcpy(tagSchema->tagType, "bigint"); - createSTableStrLen += 9; - } - } + schema.tbName = calloc(tbnameLen + 1, 1); + strcpy(schema.tbName, tbname->valuestring); - // fields + // fields cJSON *fields = cJSON_GetObjectItem(metric, "fields"); if (fields == NULL) { - parsedOk = false; goto ParseEnd; } int fieldSize = cJSON_GetArraySize(fields); if (fieldSize <= 0 || fieldSize > TSDB_MAX_COLUMNS) { - parsedOk = false; goto ParseEnd; } - stable.fields = calloc(sizeof(STgField), (size_t)fieldSize); - stable.fieldNum = (int16_t)fieldSize; - for (int i = 0; i < fieldSize; i++) { - STgField *fieldSchema = &stable.fields[i]; - cJSON * field = cJSON_GetArrayItem(fields, i); - if (field == NULL) { - parsedOk = false; - goto ParseEnd; - } - if (field->string == NULL) { - parsedOk = false; - goto ParseEnd; - } - int nameLen = (int)strlen(field->string); - if (nameLen == 0 || nameLen > TSDB_METER_NAME_LEN) { - parsedOk = false; - goto ParseEnd; - } - int aliasPos = -1; - for (int i = 0; i < nameLen - 1; ++i) { - if (field->string[i] == '-') { - aliasPos = i; - break; - } - } - if (aliasPos == -1) { - fieldSchema->fieldName = calloc((size_t)nameLen + 1, 1); - strcpy(fieldSchema->fieldName, field->string); - fieldSchema->fieldAlias = calloc((size_t)nameLen + 3, 1); - strcpy(fieldSchema->fieldAlias, "f_"); - strcpy(fieldSchema->fieldAlias + 2, field->string); - createSTableStrLen += (nameLen + 4); - } else { - fieldSchema->fieldName = calloc((size_t)aliasPos + 1, 1); - fieldSchema->fieldAlias = calloc((size_t)(nameLen - aliasPos), 1); - strncpy(fieldSchema->fieldName, field->string, (size_t)aliasPos); - strncpy(fieldSchema->fieldAlias, field->string + aliasPos + 1, (size_t)(nameLen - aliasPos - 1)); - createSTableStrLen += (nameLen - aliasPos + 2); - } - if (field->type == cJSON_String) { - if (field->valuestring == NULL) { + if (fieldSize > 0) { + schema.fields = calloc(sizeof(STgSchema), (size_t)fieldSize); + schema.fieldNum = fieldSize; + for (int i = 0; i < fieldSize; i++) { + cJSON *field = cJSON_GetArrayItem(fields, i); + if (field == NULL) { parsedOk = false; goto ParseEnd; } - int valueLen = (int)strlen(field->valuestring); - if (valueLen == 0) { + if (field->valuestring == NULL) { parsedOk = false; goto ParseEnd; } - if (strcasecmp(field->valuestring, "timestamp") == 0 || strcasecmp(field->valuestring, "bool") == 0 || - strcasecmp(field->valuestring, "tinyint") == 0 || strcasecmp(field->valuestring, "smallint") == 0 || - strcasecmp(field->valuestring, "int") == 0 || strcasecmp(field->valuestring, "bigint") == 0 || - strcasecmp(field->valuestring, "float") == 0 || strcasecmp(field->valuestring, "double") == 0 || - strncasecmp(field->valuestring, "binary", 6) == 0 || strncasecmp(field->valuestring, "nchar", 5) == 0) { - fieldSchema->fieldType = calloc((size_t)valueLen + 1, 1); - strcpy(fieldSchema->fieldType, field->valuestring); - createSTableStrLen += valueLen; - } else { - fieldSchema->fieldType = calloc(11, 1); - strcpy(fieldSchema->fieldType, "binary(32)"); - createSTableStrLen += 12; + int nameLen = (int)strlen(field->valuestring); + if (nameLen == 0 || nameLen > TSDB_METER_NAME_LEN) { + parsedOk = false; + goto ParseEnd; } - } else if (field->type == cJSON_False || field->type == cJSON_True) { - fieldSchema->fieldType = calloc(8, 1); - strcpy(fieldSchema->fieldType, "tinyint"); - createSTableStrLen += 10; - } else { - fieldSchema->fieldType = calloc(7, 1); - strcpy(fieldSchema->fieldType, "double"); - createSTableStrLen += 9; + schema.fields[i] = calloc(nameLen + 1, 1); + strcpy(schema.fields[i], field->valuestring); } } - // assembling create stable sql - stable.createSTableStr = calloc((size_t)createSTableStrLen, 1); - strcpy(stable.createSTableStr, "create table if not exists %s.%s(ts timestamp"); - int len = (int)strlen(stable.createSTableStr); - for (int i = 0; i < stable.fieldNum; ++i) { - STgField *field = &stable.fields[i]; - len += sprintf(stable.createSTableStr + len, ",%s %s", field->fieldAlias, field->fieldType); - } - len += sprintf(stable.createSTableStr + len, ") tags("); - for (int i = 0; i < stable.tagNum; ++i) { - STgTag *tag = &stable.tags[i]; - if (i == 0) { - len += sprintf(stable.createSTableStr + len, "%s %s", tag->tagAlias, tag->tagType); - } else { - len += sprintf(stable.createSTableStr + len, ",%s %s", tag->tagAlias, tag->tagType); - } - } - sprintf(stable.createSTableStr + len, ")"); - ParseEnd: - if (parsedOk) { - taosAddStrHash(tgSchemaHash, stable.stName, (char *)(&stable)); + tgSchemas.schemas[tgSchemas.size++] = schema; } else { - if (stable.stName != NULL) { - free(stable.stName); - } - if (stable.stAlias != NULL) { - free(stable.stName); - } - for (int i = 0; i < stable.tagNum; ++i) { - if (stable.tags[i].tagName != NULL) { - free(stable.tags[i].tagName); - } - if (stable.tags[i].tagAlias != NULL) { - free(stable.tags[i].tagAlias); - } - if (stable.tags[i].tagType != NULL) { - free(stable.tags[i].tagType); - } - } - if (stable.tags != NULL) { - free(stable.tags); - } - for (int i = 0; i < stable.fieldNum; ++i) { - if (stable.fields[i].fieldName != NULL) { - free(stable.fields[i].fieldName); - } - if (stable.fields[i].fieldAlias != NULL) { - free(stable.fields[i].fieldAlias); - } - if (stable.fields[i].fieldType != NULL) { - free(stable.fields[i].fieldType); - } - } - if (stable.fields != NULL) { - free(stable.fields); - } - if (stable.createSTableStr != NULL) { - free(stable.createSTableStr); - } + tgFreeSchema(&schema); } } -int tgReadSchema(const char *fileName) { - FILE *fp = fopen(fileName, "r"); - if (fp == NULL) { - //httpTrace("failed to open telegraf schema config file:%s, use default schema", fileName); - return -1; - } - httpPrint("open telegraf schema config file:%s successfully", fileName); - - fseek(fp, 0, SEEK_END); - size_t contentSize = (size_t)ftell(fp); - rewind(fp); - char * content = (char *)calloc(contentSize * sizeof(char) + 1, 1); - size_t result = fread(content, 1, contentSize, fp); - if (result != contentSize) { - httpError("failed to read telegraf schema config file:%s, use default schema", fileName); - return -1; - } - +int tgParseSchema(const char *content, char*fileName) { cJSON *root = cJSON_Parse(content); if (root == NULL) { - httpError("failed to parse telegraf schema config file:%s, invalid json format", fileName); + httpError("failed to parse telegraf schema file:%s, invalid json format, content:%s", fileName, content); return -1; } - + int size = 0; cJSON *metrics = cJSON_GetObjectItem(root, "metrics"); if (metrics != NULL) { - int size = cJSON_GetArraySize(metrics); + size = cJSON_GetArraySize(metrics); if (size <= 0) { - httpError("failed to parse telegraf schema config file:%s, metrics size is 0", fileName); + httpError("failed to parse telegraf schema file:%s, metrics size is 0", fileName); cJSON_Delete(root); return -1; } + tgInitSchemas(size); for (int i = 0; i < size; i++) { cJSON *metric = cJSON_GetArrayItem(metrics, i); if (metric != NULL) { - tgReadSchemaMetric(metric); + tgParseSchemaMetric(metric); } } } else { - tgReadSchemaMetric(root); + size = 1; + tgInitSchemas(size); + tgParseSchemaMetric(root); } cJSON_Delete(root); + return size; +} + +int tgReadSchema(const char *fileName) { + FILE *fp = fopen(fileName, "r"); + if (fp == NULL) { + return -1; + } + + httpPrint("open telegraf schema file:%s success", fileName); + fseek(fp, 0, SEEK_END); + size_t contentSize = (size_t)ftell(fp); + rewind(fp); + char *content = (char *)calloc(contentSize * sizeof(char) + 1, 1); + size_t result = fread(content, 1, contentSize, fp); + if (result != contentSize) { + httpError("failed to read telegraf schema file:%s", fileName); + return -1; + } + + int schemaNum = tgParseSchema(content, fileName); + free(content); fclose(fp); + httpPrint("parse telegraf schema file:%s, schema size:%d", fileName, schemaNum); - httpPrint("parse telegraf schema config file:%s successfully, stable schema size:%d", fileName); - return 0; + return schemaNum; } -/* - * in case of file not exist - * we use default schema: - * such as: - * diskio - * mem - * processes - * procstat - * system - * disk - * swap - * kernel - */ void tgInitHandle(HttpServer *pServer) { - tgSchemaHash = taosInitStrHash(100, sizeof(STgStable), taosHashStringStep1); char fileName[256] = {0}; sprintf(fileName, "%s/taos.telegraf.cfg", configDir); - if (tgReadSchema(fileName) == -1) { - taosCleanUpStrHash(tgSchemaHash); - tgSchemaHash = NULL; + if (tgReadSchema(fileName) <= 0) { + tgFreeSchemas(); + if (tgParseSchema(DEFAULT_TELEGRAF_CFG, "default") <= 0) { + tgFreeSchemas(); + } } + httpAddMethod(pServer, &tgDecodeMethod); } @@ -481,23 +322,57 @@ char *tgGetDbFromUrl(HttpContext *pContext) { return pParser->path[TG_DB_URL_POS].pos; } +char *tgGetStableName(char *stname, cJSON *fields, int fieldsSize) { + for (int s = 0; s < tgSchemas.size; ++s) { + STgSchema *schema = &tgSchemas.schemas[s]; + if (strcasecmp(schema->name, stname) != 0) { + continue; + } + + bool schemaMatched = true; + for (int f = 0; f < schema->fieldNum; ++f) { + char *fieldName = schema->fields[f]; + bool fieldMatched = false; + + for (int i = 0; i < fieldsSize; i++) { + cJSON *field = cJSON_GetArrayItem(fields, i); + if (strcasecmp(field->string, fieldName) == 0) { + fieldMatched = true; + break; + } + } + + if (!fieldMatched) { + schemaMatched = false; + break; + } + } + + if (schemaMatched) { + return schema->tbName; + } + } + + return stname; +} + /* * parse single metric { - "fields": { - "field_1": 30, - "field_2": 4, - "field_N": 59, - "n_images": 660 - }, - "name": "docker", - "tags": { - "host": "raynor" - }, - "timestamp": 1458229140 + "fields": { + "field_1": 30, + "field_2": 4, + "field_N": 59, + "n_images": 660 + }, + "name": "docker", + "tags": { + "host": "raynor" + }, + "timestamp": 1458229140 } */ -bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric, char *db) { +bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { // metric name cJSON *name = cJSON_GetObjectItem(metric, "name"); if (name == NULL) { @@ -698,7 +573,7 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric, table_cmd->timestamp = stable_cmd->timestamp = httpAddToSqlCmdBuffer(pContext, "%ld", timestamp->valueint); // stable name - char *stname = name->valuestring; + char *stname = tgGetStableName(name->valuestring, fields, fieldsSize); table_cmd->metric = stable_cmd->metric = httpAddToSqlCmdBuffer(pContext, "%s", stname); if (tsTelegrafUseFieldNum == 0) { table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname); @@ -851,267 +726,6 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric, return true; } -bool tgProcessSingleMetricUseConfigSchema(HttpContext *pContext, cJSON *metric, char *db) { - // metric name - cJSON *name = cJSON_GetObjectItem(metric, "name"); - if (name == NULL) { - httpSendErrorResp(pContext, HTTP_TG_METRIC_NULL); - return false; - } - if (name->type != cJSON_String) { - httpSendErrorResp(pContext, HTTP_TG_METRIC_TYPE); - return false; - } - if (name->valuestring == NULL) { - httpSendErrorResp(pContext, HTTP_TG_METRIC_NAME_NULL); - return false; - } - int nameLen = (int)strlen(name->valuestring); - if (nameLen == 0) { - httpSendErrorResp(pContext, HTTP_TG_METRIC_NAME_NULL); - return false; - } - STgStable *stable = (STgStable *)taosGetStrHashData(tgSchemaHash, name->valuestring); - if (stable == NULL) { - httpSendErrorResp(pContext, HTTP_TG_STABLE_NOT_EXIST); - return false; - } - - // timestamp - cJSON *timestamp = cJSON_GetObjectItem(metric, "timestamp"); - if (timestamp == NULL) { - httpSendErrorResp(pContext, HTTP_TG_TIMESTAMP_NULL); - return false; - } - if (timestamp->type != cJSON_Number) { - httpSendErrorResp(pContext, HTTP_TG_TIMESTAMP_TYPE); - return false; - } - if (timestamp->valueint <= 0) { - httpSendErrorResp(pContext, HTTP_TG_TIMESTAMP_VAL_NULL); - return false; - } - - // tags - cJSON *tags = cJSON_GetObjectItem(metric, "tags"); - if (tags == NULL) { - httpSendErrorResp(pContext, HTTP_TG_TAGS_NULL); - return false; - } - int tagsSize = cJSON_GetArraySize(tags); - if (tagsSize <= 0) { - httpSendErrorResp(pContext, HTTP_TG_TAGS_SIZE_0); - return false; - } - for (int i = 0; i < tagsSize; i++) { - cJSON *tag = cJSON_GetArrayItem(tags, i); - if (tag == NULL) { - httpSendErrorResp(pContext, HTTP_TG_TAG_NULL); - return false; - } - if (tag->string == NULL || strlen(tag->string) == 0) { - httpSendErrorResp(pContext, HTTP_TG_TAG_NAME_NULL); - return false; - } - if (tag->type != cJSON_Number && tag->type != cJSON_String) { - httpSendErrorResp(pContext, HTTP_TG_TAG_VALUE_TYPE); - return false; - } - if (tag->type == cJSON_String) { - if (tag->valuestring == NULL || strlen(tag->valuestring) == 0) { - httpSendErrorResp(pContext, HTTP_TG_TAG_VALUE_NULL); - return false; - } - } - } - - // fields - cJSON *fields = cJSON_GetObjectItem(metric, "fields"); - if (fields == NULL) { - httpSendErrorResp(pContext, HTTP_TG_FIELDS_NULL); - return false; - } - int fieldsSize = cJSON_GetArraySize(fields); - if (fieldsSize <= 0) { - httpSendErrorResp(pContext, HTTP_TG_FIELDS_SIZE_0); - return false; - } - for (int i = 0; i < fieldsSize; i++) { - cJSON *field = cJSON_GetArrayItem(fields, i); - if (field == NULL) { - httpSendErrorResp(pContext, HTTP_TG_FIELD_NULL); - return false; - } - if (field->string == NULL || strlen(field->string) == 0) { - httpSendErrorResp(pContext, HTTP_TG_FIELD_NAME_NULL); - return false; - } - if (field->type != cJSON_Number && field->type != cJSON_String) { - httpSendErrorResp(pContext, HTTP_TG_FIELD_VALUE_TYPE); - return false; - } - if (field->type == cJSON_String) { - if (field->valuestring == NULL || strlen(field->valuestring) == 0) { - httpSendErrorResp(pContext, HTTP_TG_FIELD_VALUE_NULL); - return false; - } - } - } - - // assembling cmds - HttpSqlCmd *stable_cmd = httpNewSqlCmd(pContext); - if (stable_cmd == NULL) { - httpSendErrorResp(pContext, HTTP_NO_ENOUGH_MEMORY); - return false; - } - stable_cmd->cmdType = HTTP_CMD_TYPE_CREATE_STBALE; - stable_cmd->cmdReturnType = HTTP_CMD_RETURN_TYPE_NO_RETURN; - - HttpSqlCmd *table_cmd = httpNewSqlCmd(pContext); - if (table_cmd == NULL) { - httpSendErrorResp(pContext, HTTP_NO_ENOUGH_MEMORY); - return false; - } - table_cmd->cmdType = HTTP_CMD_TYPE_INSERT; - table_cmd->tagNum = stable_cmd->tagNum = (int8_t)stable->tagNum; - table_cmd->timestamp = stable_cmd->timestamp = httpAddToSqlCmdBuffer(pContext, "%ld", timestamp->valueint); - - // stable name - char *stname = stable->stAlias; - - table_cmd->metric = stable_cmd->metric = httpAddToSqlCmdBuffer(pContext, "%s", stname); - table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname); - table_cmd->stable = stable_cmd->stable = - httpShrinkTableName(pContext, table_cmd->stable, httpGetCmdsString(pContext, table_cmd->stable)); - - // stable tag for detail - for (int ts = 0; ts < stable->tagNum; ++ts) { - STgTag *tagSchema = &stable->tags[ts]; - bool tagParsed = false; - for (int tt = 0; tt < tagsSize; ++tt) { - cJSON *tag = cJSON_GetArrayItem(tags, tt); - if (strcasecmp(tag->string, tagSchema->tagName) != 0) { - continue; - } - - stable_cmd->tagNames[ts] = table_cmd->tagNames[ts] = httpAddToSqlCmdBuffer(pContext, tagSchema->tagAlias); - - if (tag->type == cJSON_String) { - stable_cmd->tagValues[ts] = table_cmd->tagValues[ts] = - httpAddToSqlCmdBuffer(pContext, "'%s'", tag->valuestring); - tagParsed = true; - } else if (tag->type == cJSON_Number) { - stable_cmd->tagValues[ts] = table_cmd->tagValues[ts] = httpAddToSqlCmdBuffer(pContext, "%ld", tag->valueint); - tagParsed = true; - } else if (tag->type == cJSON_True) { - stable_cmd->tagValues[ts] = table_cmd->tagValues[ts] = httpAddToSqlCmdBuffer(pContext, "1"); - tagParsed = true; - } else if (tag->type == cJSON_False) { - stable_cmd->tagValues[ts] = table_cmd->tagValues[ts] = httpAddToSqlCmdBuffer(pContext, "0"); - tagParsed = true; - } else { - } - - break; - } - - if (!tagParsed) { - stable_cmd->tagValues[ts] = table_cmd->tagValues[ts] = httpAddToSqlCmdBuffer(pContext, "NULL"); - } - } - - // table name - table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s", stname); - for (int ts = 0; ts < stable->tagNum; ++ts) { - STgTag *tagSchema = &stable->tags[ts]; - bool tagParsed = false; - for (int tt = 0; tt < tagsSize; ++tt) { - cJSON *tag = cJSON_GetArrayItem(tags, tt); - if (strcasecmp(tag->string, tagSchema->tagName) != 0) { - continue; - } - - if (tag->type == cJSON_String) { - httpAddToSqlCmdBufferNoTerminal(pContext, "_%s", tag->valuestring); - tagParsed = true; - } else if (tag->type == cJSON_Number) { - httpAddToSqlCmdBufferNoTerminal(pContext, "_%ld", tag->valueint); - tagParsed = true; - } else if (tag->type == cJSON_True) { - httpAddToSqlCmdBufferNoTerminal(pContext, "_1"); - tagParsed = true; - } else if (tag->type == cJSON_False) { - httpAddToSqlCmdBufferNoTerminal(pContext, "_0"); - tagParsed = true; - } else { - } - - break; - } - - if (!tagParsed) { - stable_cmd->tagValues[ts] = table_cmd->tagValues[ts] = httpAddToSqlCmdBufferNoTerminal(pContext, "_n"); - } - } - httpAddToSqlCmdBuffer(pContext, ""); - table_cmd->table = stable_cmd->table = - httpShrinkTableName(pContext, table_cmd->table, httpGetCmdsString(pContext, table_cmd->table)); - - // assembling create stable sql - stable_cmd->sql = - httpAddToSqlCmdBuffer(pContext, stable->createSTableStr, db, httpGetCmdsString(pContext, table_cmd->stable)); - - // assembling insert sql - table_cmd->sql = httpAddToSqlCmdBufferNoTerminal(pContext, "import into %s.%s using %s.%s tags(", db, - httpGetCmdsString(pContext, table_cmd->table), db, - httpGetCmdsString(pContext, table_cmd->stable)); - for (int ts = 0; ts < stable->tagNum; ++ts) { - if (ts != 0) { - httpAddToSqlCmdBufferNoTerminal(pContext, ",%s", httpGetCmdsString(pContext, stable_cmd->tagValues[ts])); - } else { - httpAddToSqlCmdBufferNoTerminal(pContext, "%s", httpGetCmdsString(pContext, stable_cmd->tagValues[ts])); - } - } - - httpAddToSqlCmdBufferNoTerminal(pContext, ") values(%ld", timestamp->valueint); - - // stable tag for detail - for (int fs = 0; fs < stable->fieldNum; ++fs) { - STgField *fieldSchema = &stable->fields[fs]; - bool fieldParsed = false; - for (int ff = 0; ff < fieldsSize; ++ff) { - cJSON *field = cJSON_GetArrayItem(fields, ff); - if (strcasecmp(field->string, fieldSchema->fieldName) != 0) { - continue; - } - - if (field->type == cJSON_String) { - httpAddToSqlCmdBufferNoTerminal(pContext, ",\"%s\"", field->valuestring); - fieldParsed = true; - } else if (field->type == cJSON_Number) { - httpAddToSqlCmdBufferNoTerminal(pContext, ",%lf", field->valuedouble); - fieldParsed = true; - } else if (field->type == cJSON_True) { - httpAddToSqlCmdBufferNoTerminal(pContext, ",1"); - fieldParsed = true; - } else if (field->type == cJSON_False) { - httpAddToSqlCmdBufferNoTerminal(pContext, ",0"); - fieldParsed = true; - } else { - } - - break; - } - - if (!fieldParsed) { - httpAddToSqlCmdBufferNoTerminal(pContext, ",NULL"); - } - } - httpAddToSqlCmdBuffer(pContext, ")"); - - return true; -} - /** * request from telegraf 1.7.0 * single request: @@ -1213,16 +827,9 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { for (int i = 0; i < size; i++) { cJSON *metric = cJSON_GetArrayItem(metrics, i); if (metric != NULL) { - if (tgSchemaHash != NULL) { - if (!tgProcessSingleMetricUseConfigSchema(pContext, metric, db)) { - cJSON_Delete(root); - return false; - } - } else { - if (!tgProcessSingleMetricUseDefaultSchema(pContext, metric, db)) { - cJSON_Delete(root); - return false; - } + if (!tgProcessSingleMetric(pContext, metric, db)) { + cJSON_Delete(root); + return false; } } } @@ -1245,16 +852,9 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { cmd->cmdReturnType = HTTP_CMD_RETURN_TYPE_NO_RETURN; cmd->sql = httpAddToSqlCmdBuffer(pContext, "create database if not exists %s", db); - if (tgSchemaHash != NULL) { - if (!tgProcessSingleMetricUseConfigSchema(pContext, root, db)) { - cJSON_Delete(root); - return false; - } - } else { - if (!tgProcessSingleMetricUseDefaultSchema(pContext, root, db)) { - cJSON_Delete(root); - return false; - } + if (!tgProcessSingleMetric(pContext, root, db)) { + cJSON_Delete(root); + return false; } } @@ -1268,9 +868,6 @@ bool tgProcessQueryRequest(HttpContext *pContext, char *db) { } bool tgProcessRquest(struct HttpContext *pContext) { - tgGetUserFromUrl(pContext); - tgGetPassFromUrl(pContext); - if (strlen(pContext->user) == 0 || strlen(pContext->pass) == 0) { httpSendErrorResp(pContext, HTTP_PARSE_USR_ERROR); return false;