From ea7ca9b2419646b25c4cb0613885f7b7f41bf0de Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 30 Aug 2021 16:08:08 +0800 Subject: [PATCH] [TD-6442]: Support OpenTSDB telnet style data import format --- src/client/inc/tscParseLine.h | 5 ++ src/client/src/tscParseOpenTSDB.c | 84 +++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/src/client/inc/tscParseLine.h b/src/client/inc/tscParseLine.h index d33a475511..28e5362932 100644 --- a/src/client/inc/tscParseLine.h +++ b/src/client/inc/tscParseLine.h @@ -52,14 +52,19 @@ typedef struct { SHashObj* smlDataToSchema; } SSmlLinesInfo; +int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info); int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint); bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info); int32_t isValidChildTableName(const char *pTbName, int16_t len); + bool convertSmlValueType(TAOS_SML_KV *pVal, char *value, uint16_t len, SSmlLinesInfo* info); + int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value, uint16_t len, SSmlLinesInfo* info); +void destroySmlDataPoint(TAOS_SML_DATA_POINT* point); + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscParseOpenTSDB.c b/src/client/src/tscParseOpenTSDB.c index dbff93fa96..c4e175a0bc 100644 --- a/src/client/src/tscParseOpenTSDB.c +++ b/src/client/src/tscParseOpenTSDB.c @@ -13,6 +13,18 @@ #include "tscParseLine.h" //========================================================================= // telnet style API parser +static uint64_t HandleId = 0; + +uint64_t genUID() { + uint64_t id; + + do { + id = atomic_add_fetch_64(&HandleId, 1); + } while (id == 0); + + return id; +} + static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) { const char *cur = *index; uint16_t len = 0; @@ -317,3 +329,75 @@ int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlL return TSDB_CODE_SUCCESS; } +int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) { + for (int32_t i = 0; i < numLines; ++i) { + TAOS_SML_DATA_POINT point = {0}; + int32_t code = tscParseTelnetLine(lines[i], &point, info); + if (code != TSDB_CODE_SUCCESS) { + tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]); + destroySmlDataPoint(&point); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } else { + tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i); + } + + taosArrayPush(points, &point); + } + return 0; +} + +int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) { + int32_t code = 0; + + SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo)); + info->id = genUID(); + + if (numLines <= 0 || numLines > 65536) { + tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); + code = TSDB_CODE_TSC_APP_ERROR; + return code; + } + + for (int i = 0; i < numLines; ++i) { + if (lines[i] == NULL) { + tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i); + free(info); + code = TSDB_CODE_TSC_APP_ERROR; + return code; + } + } + + SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT)); + if (lpPoints == NULL) { + tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); + free(info); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]); + code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info); + size_t numPoints = taosArrayGetSize(lpPoints); + + if (code != 0) { + goto cleanup; + } + + TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints); + code = tscSmlInsert(taos, points, (int)numPoints, info); + if (code != 0) { + tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code))); + } + +cleanup: + tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code); + points = TARRAY_GET_START(lpPoints); + numPoints = taosArrayGetSize(lpPoints); + for (int i=0; i