From 368d50f29ed84c8a2380b002d5ef6acd5cd99835 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 30 Aug 2021 16:08:08 +0800 Subject: [PATCH] [TD-6442]: Support OpenTSDB telnet style data import format --- src/client/src/tscParseLineProtocol.c | 140 ++++++++++++++++++++++++-- 1 file changed, 134 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index d2b0bb847c..41e36b072a 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -2341,10 +2341,138 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch return ret; } +static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) { + const char *cur = *index; + char key[TSDB_COL_NAME_LEN + 1]; // +1 to avoid key[len] over write + uint16_t len = 0; + + //key field cannot start with digit + if (isdigit(*cur)) { + tscError("SML:0x%"PRIx64" Tag key cannnot start with digit", info->id); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + while (*cur != '\0') { + if (len > TSDB_COL_NAME_LEN) { + tscError("SML:0x%"PRIx64" Key field cannot exceeds 65 characters", info->id); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + if (*cur == '=') { + break; + } + + key[len] = *cur; + cur++; + len++; + } + key[len] = '\0'; + + if (checkDuplicateKey(key, pHash, info)) { + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + + pKV->key = calloc(len + 1, 1); + memcpy(pKV->key, key, len + 1); + //tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len); + *index = cur + 1; + return TSDB_CODE_SUCCESS; +} + + +static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index, + bool *is_last_kv, SSmlLinesInfo* info) { + const char *start, *cur; + char *value = NULL; + uint16_t len = 0; + start = cur = *index; + + while (1) { + // ',' or '\0' identifies a value + if (*cur == ',' || *cur == '\0') { + // '\0' indicates end of value + *is_last_kv = (*cur == '\0') ? true : false; + break; + } + cur++; + len++; + } + + value = calloc(len + 1, 1); + memcpy(value, start, len); + value[len] = '\0'; + if (!convertSmlValueType(pKV, value, len, info)) { + tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type", + info->id, value); + //free previous alocated key field + free(pKV->key); + pKV->key = NULL; + free(value); + return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; + } + free(value); + + return TSDB_CODE_SUCCESS; +} + static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs, - const char **index, TAOS_SML_DATA_POINT* smlData, + const char **index, char* childTableName, SHashObj *pHash, SSmlLinesInfo* info) { - return 0; + const char *cur = *index; + int32_t ret = TSDB_CODE_SUCCESS; + TAOS_SML_KV *pkv; + bool is_last_kv = false; + + int32_t capacity = 4; + *pKVs = calloc(capacity, sizeof(TAOS_SML_KV)); + pkv = *pKVs; + + while (*cur != '\0') { + ret = parseTelnetTagKey(pkv, &cur, pHash, info); + if (ret) { + tscError("SML:0x%"PRIx64" Unable to parse key", info->id); + return ret; + } + ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info); + if (ret) { + tscError("SML:0x%"PRIx64" Unable to parse value", info->id); + return ret; + } + if ((strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) { + ret = isValidChildTableName(pkv->value, pkv->length); + if (ret) { + return ret; + } + childTableName = malloc(pkv->length + 1); + memcpy(childTableName, pkv->value, pkv->length); + childTableName[pkv->length] = '\0'; + free(pkv->key); + free(pkv->value); + } else { + *num_kvs += 1; + } + + if (is_last_kv) { + break; + } + + //reallocate addtional memory for more kvs + TAOS_SML_KV *more_kvs = NULL; + + if ((*num_kvs + 1) > capacity) { + capacity *= 3; capacity /= 2; + more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV)); + } else { + more_kvs = *pKVs; + } + + if (!more_kvs) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + *pKVs = more_kvs; + //move pKV points to next TAOS_SML_KV block + pkv = *pKVs + *num_kvs; + } + + return ret; } int32_t tscParseTelnetAPI(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) { @@ -2373,17 +2501,17 @@ int32_t tscParseTelnetAPI(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLin tscError("SML:0x%"PRIx64" Unable to parse metric value", info->id); return ret; } - tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum); + tscDebug("SML:0x%"PRIx64" Parse value finished", info->id); //Parse tagKVs SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - ret = parseTelnetTagKvs(&smlData->fields, &smlData->fieldNum, &index, smlData, keyHashTable, info); + ret = parseTelnetTagKvs(&smlData->fields, &smlData->fieldNum, &index, smlData->childTableName, keyHashTable, info); if (ret) { - tscError("SML:0x%"PRIx64" Unable to parse field", info->id); + tscError("SML:0x%"PRIx64" Unable to parse tags", info->id); taosHashCleanup(keyHashTable); return ret; } - tscDebug("SML:0x%"PRIx64" Parse fields finished, num of fields:%d", info->id, smlData->fieldNum); + tscDebug("SML:0x%"PRIx64" Parse tags finished", info->id); taosHashCleanup(keyHashTable); -- GitLab