提交 4ae20f6a 编写于 作者: G Ganlin Zhao

[TD-6442]<feature>: Support OpenTSDB telnet style data import format

上级 81a8e5c7
......@@ -17,7 +17,7 @@
/* telnet style API parser */
static uint64_t HandleId = 0;
uint64_t genUID() {
static uint64_t genUID() {
uint64_t id;
do {
......@@ -36,14 +36,14 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index,
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (isdigit(*cur)) {
tscError("SML:0x%"PRIx64" Metric cannnot start with digit", info->id);
tscError("OTD:0x%"PRIx64" Metric cannnot start with digit", info->id);
tfree(pSml->stableName);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
while (*cur != '\0') {
if (len > TSDB_TABLE_NAME_LEN) {
tscError("SML:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id);
tscError("OTD:0x%"PRIx64" Metric cannot exceeds 193 characters", info->id);
tfree(pSml->stableName);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
......@@ -56,9 +56,14 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index,
cur++;
len++;
}
if (len == 0) {
tfree(pSml->stableName);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
pSml->stableName[len] = '\0';
*index = cur + 1;
tscDebug("SML:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len);
tscDebug("OTD:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len);
return TSDB_CODE_SUCCESS;
}
......@@ -138,7 +143,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
}
if (!convertSmlValueType(pVal, value, len, info)) {
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
tscError("OTD:0x%"PRIx64" Failed to convert metric value string(%s) to any type",
info->id, value);
tfree(value);
tfree(*pKVs);
......@@ -161,12 +166,12 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj
//key field cannot start with digit
if (isdigit(*cur)) {
tscError("SML:0x%"PRIx64" Tag key cannnot start with digit", info->id);
tscError("OTD:0x%"PRIx64" Tag key cannnot start with digit", info->id);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
while (*cur != '\0') {
if (len > TSDB_COL_NAME_LEN) {
tscError("SML:0x%"PRIx64" Key field cannot exceeds 65 characters", info->id);
tscError("OTD:0x%"PRIx64" Tag key cannot exceeds 65 characters", info->id);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
if (*cur == '=') {
......@@ -177,6 +182,9 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj
cur++;
len++;
}
if (len == 0) {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
key[len] = '\0';
if (checkDuplicateKey(key, pHash, info)) {
......@@ -185,13 +193,13 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj
pKV->key = tcalloc(len + 1, 1);
memcpy(pKV->key, key, len + 1);
//tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
//tscDebug("OTD:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
*index = cur + 1;
return TSDB_CODE_SUCCESS;
}
static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
bool *is_last_kv, SSmlLinesInfo* info) {
const char *start, *cur;
char *value = NULL;
......@@ -209,11 +217,16 @@ static bool parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
len++;
}
if (len == 0) {
tfree(pKV->key);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
value = tcalloc(len + 1, 1);
memcpy(value, start, len);
value[len] = '\0';
if (!convertSmlValueType(pKV, value, len, info)) {
tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
tscError("OTD:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
info->id, value);
//free previous alocated key field
tfree(pKV->key);
......@@ -241,12 +254,12 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
while (*cur != '\0') {
ret = parseTelnetTagKey(pkv, &cur, pHash, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
tscError("OTD:0x%"PRIx64" Unable to parse key", info->id);
return ret;
}
ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
tscError("OTD:0x%"PRIx64" Unable to parse value", info->id);
return ret;
}
if ((strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
......@@ -292,36 +305,36 @@ int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlL
//Parse metric
ret = parseTelnetMetric(smlData, &index, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse metric", info->id);
tscError("OTD:0x%"PRIx64" Unable to parse metric", info->id);
return ret;
}
tscDebug("SML:0x%"PRIx64" Parse metric finished", info->id);
tscDebug("OTD:0x%"PRIx64" Parse metric finished", info->id);
//Parse timestamp
ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &index, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
tscError("OTD:0x%"PRIx64" Unable to parse timestamp", info->id);
return ret;
}
tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id);
tscDebug("OTD:0x%"PRIx64" Parse timestamp finished", info->id);
//Parse value
ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &index, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse metric value", info->id);
tscError("OTD:0x%"PRIx64" Unable to parse metric value", info->id);
return ret;
}
tscDebug("SML:0x%"PRIx64" Parse value finished", info->id);
tscDebug("OTD:0x%"PRIx64" Parse metric value finished", info->id);
//Parse tagKVs
SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &index, &smlData->childTableName, keyHashTable, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse tags", info->id);
tscError("OTD:0x%"PRIx64" Unable to parse tags", info->id);
taosHashCleanup(keyHashTable);
return ret;
}
tscDebug("SML:0x%"PRIx64" Parse tags finished", info->id);
tscDebug("OTD:0x%"PRIx64" Parse tags finished", info->id);
taosHashCleanup(keyHashTable);
......@@ -333,11 +346,11 @@ int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray*
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseTelnetLine(lines[i], &point, info);
if (code != TSDB_CODE_SUCCESS) {
tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
tscError("OTD:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
destroySmlDataPoint(&point);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} else {
tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i);
tscDebug("OTD:0x%"PRIx64" data point line parse success. line %d", info->id, i);
}
taosArrayPush(points, &point);
......@@ -352,14 +365,14 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
info->id = genUID();
if (numLines <= 0 || numLines > 65536) {
tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
}
for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i);
tfree(info);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
......@@ -368,12 +381,12 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
if (lpPoints == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines failed to allocate memory", info->id);
tfree(info);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info);
size_t numPoints = taosArrayGetSize(lpPoints);
......@@ -384,11 +397,11 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = tscSmlInsert(taos, points, (int)numPoints, info);
if (code != 0) {
tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines error: %s", info->id, tstrerror((code)));
}
cleanup:
tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines finish inserting %d lines. code: %d", info->id, numLines, code);
points = TARRAY_GET_START(lpPoints);
numPoints = taosArrayGetSize(lpPoints);
for (int i=0; i<numPoints; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册