From 4ae20f6abcb1194ef8db8b6e8e5b8445311562f8 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 30 Aug 2021 16:08:08 +0800 Subject: [PATCH] [TD-6442]: Support OpenTSDB telnet style data import format --- src/client/src/tscParseOpenTSDB.c | 69 ++++++++++++++++++------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/src/client/src/tscParseOpenTSDB.c b/src/client/src/tscParseOpenTSDB.c index 6bdc6ea7f3..80d3d28589 100644 --- a/src/client/src/tscParseOpenTSDB.c +++ b/src/client/src/tscParseOpenTSDB.c @@ -17,7 +17,7 @@ /* telnet style API parser */ static uint64_t HandleId = 0; -uint64_t genUID() { +static uint64_t genUID() { uint64_t id; do { @@ -36,14 +36,14 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, return TSDB_CODE_TSC_OUT_OF_MEMORY; } if (isdigit(*cur)) { - tscError("SML:0x%"PRIx64" Metric cannnot start with digit", info->id); + tscError("OTD:0x%"PRIx64" Metric cannnot start with digit", info->id); tfree(pSml->stableName); return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } while (*cur != '\0') { if (len > TSDB_TABLE_NAME_LEN) { - tscError("SML:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id); + tscError("OTD:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id); tfree(pSml->stableName); return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } @@ -56,9 +56,14 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, cur++; len++; } + if (len == 0) { + tfree(pSml->stableName); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + pSml->stableName[len] = '\0'; *index = cur + 1; - tscDebug("SML:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len); + tscDebug("OTD:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len); return TSDB_CODE_SUCCESS; } @@ -138,7 +143,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch } if (!convertSmlValueType(pVal, value, len, info)) { - tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type", + tscError("OTD:0x%"PRIx64" Failed to convert metric value string(%s) to any type", info->id, value); tfree(value); tfree(*pKVs); @@ -161,12 +166,12 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj //key field cannot start with digit if (isdigit(*cur)) { - tscError("SML:0x%"PRIx64" Tag key cannnot start with digit", info->id); + tscError("OTD:0x%"PRIx64" Tag key cannnot start with digit", info->id); return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } while (*cur != '\0') { if (len > TSDB_COL_NAME_LEN) { - tscError("SML:0x%"PRIx64" Key field cannot exceeds 65 characters", info->id); + tscError("OTD:0x%"PRIx64" Tag key cannot exceeds 65 characters", info->id); return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } if (*cur == '=') { @@ -177,6 +182,9 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj cur++; len++; } + if (len == 0) { + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } key[len] = '\0'; if (checkDuplicateKey(key, pHash, info)) { @@ -185,13 +193,13 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj pKV->key = tcalloc(len + 1, 1); memcpy(pKV->key, key, len + 1); - //tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len); + //tscDebug("OTD:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len); *index = cur + 1; return TSDB_CODE_SUCCESS; } -static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index, +static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index, bool *is_last_kv, SSmlLinesInfo* info) { const char *start, *cur; char *value = NULL; @@ -209,11 +217,16 @@ static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index, len++; } + if (len == 0) { + tfree(pKV->key); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + value = tcalloc(len + 1, 1); memcpy(value, start, len); value[len] = '\0'; if (!convertSmlValueType(pKV, value, len, info)) { - tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type", + tscError("OTD:0x%"PRIx64" Failed to convert sml value string(%s) to any type", info->id, value); //free previous alocated key field tfree(pKV->key); @@ -241,12 +254,12 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs, while (*cur != '\0') { ret = parseTelnetTagKey(pkv, &cur, pHash, info); if (ret) { - tscError("SML:0x%"PRIx64" Unable to parse key", info->id); + tscError("OTD:0x%"PRIx64" Unable to parse key", info->id); return ret; } ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info); if (ret) { - tscError("SML:0x%"PRIx64" Unable to parse value", info->id); + tscError("OTD:0x%"PRIx64" Unable to parse value", info->id); return ret; } if ((strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) { @@ -292,36 +305,36 @@ int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlL //Parse metric ret = parseTelnetMetric(smlData, &index, info); if (ret) { - tscError("SML:0x%"PRIx64" Unable to parse metric", info->id); + tscError("OTD:0x%"PRIx64" Unable to parse metric", info->id); return ret; } - tscDebug("SML:0x%"PRIx64" Parse metric finished", info->id); + tscDebug("OTD:0x%"PRIx64" Parse metric finished", info->id); //Parse timestamp ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &index, info); if (ret) { - tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id); + tscError("OTD:0x%"PRIx64" Unable to parse timestamp", info->id); return ret; } - tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id); + tscDebug("OTD:0x%"PRIx64" Parse timestamp finished", info->id); //Parse value ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &index, info); if (ret) { - tscError("SML:0x%"PRIx64" Unable to parse metric value", info->id); + tscError("OTD:0x%"PRIx64" Unable to parse metric value", info->id); return ret; } - tscDebug("SML:0x%"PRIx64" Parse value finished", info->id); + tscDebug("OTD:0x%"PRIx64" Parse metric value finished", info->id); //Parse tagKVs SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &index, &smlData->childTableName, keyHashTable, info); if (ret) { - tscError("SML:0x%"PRIx64" Unable to parse tags", info->id); + tscError("OTD:0x%"PRIx64" Unable to parse tags", info->id); taosHashCleanup(keyHashTable); return ret; } - tscDebug("SML:0x%"PRIx64" Parse tags finished", info->id); + tscDebug("OTD:0x%"PRIx64" Parse tags finished", info->id); taosHashCleanup(keyHashTable); @@ -333,11 +346,11 @@ int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* TAOS_SML_DATA_POINT point = {0}; int32_t code = tscParseTelnetLine(lines[i], &point, info); if (code != TSDB_CODE_SUCCESS) { - tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]); + tscError("OTD:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]); destroySmlDataPoint(&point); return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } else { - tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i); + tscDebug("OTD:0x%"PRIx64" data point line parse success. line %d", info->id, i); } taosArrayPush(points, &point); @@ -352,14 +365,14 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) { info->id = genUID(); if (numLines <= 0 || numLines > 65536) { - tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); + tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); code = TSDB_CODE_TSC_APP_ERROR; return code; } for (int i = 0; i < numLines; ++i) { if (lines[i] == NULL) { - tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i); + tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i); tfree(info); code = TSDB_CODE_TSC_APP_ERROR; return code; @@ -368,12 +381,12 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) { SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT)); if (lpPoints == NULL) { - tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); + tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines failed to allocate memory", info->id); tfree(info); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]); + tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]); code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info); size_t numPoints = taosArrayGetSize(lpPoints); @@ -384,11 +397,11 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) { TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints); code = tscSmlInsert(taos, points, (int)numPoints, info); if (code != 0) { - tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code))); + tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines error: %s", info->id, tstrerror((code))); } cleanup: - tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code); + tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines finish inserting %d lines. code: %d", info->id, numLines, code); points = TARRAY_GET_START(lpPoints); numPoints = taosArrayGetSize(lpPoints); for (int i=0; i