diff --git a/src/client/inc/tscParseLine.h b/src/client/inc/tscParseLine.h index 30a316210c3d58539d3f6d0a5fb55fdd24108fad..802e88eef9389c509cf3f509657f22fa23187513 100644 --- a/src/client/inc/tscParseLine.h +++ b/src/client/inc/tscParseLine.h @@ -103,9 +103,9 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value, void destroySmlDataPoint(TAOS_SML_DATA_POINT* point); -int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, +int taos_insert_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows); -int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, +int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows); int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index c1bdbce91dacd413da79d2ac4ac4af12def2ab06..3abf56502faa7823842b1060df1f05d7fd94582a 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -577,7 +577,9 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab // Check if the table name available or not if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; - sprintf(pSql->cmd.payload, "table name is invalid"); + if(pSql->cmd.payload){ + sprintf(pSql->cmd.payload, "table name is invalid"); + } taosReleaseRef(tscObjRef, pSql->self); return code; } @@ -1966,21 +1968,15 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value, return TSDB_CODE_SUCCESS; } -static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **idx, SSmlLinesInfo* info) { - const char *start, *cur; +static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **idx, int32_t len, SSmlLinesInfo* info) { + const char *start; int32_t ret = TSDB_CODE_SUCCESS; - int len = 0; char key[] = "ts"; char *value = NULL; - start = cur = *idx; + start = *idx; *pTS = calloc(1, sizeof(TAOS_SML_KV)); - while(*cur != '\0') { - cur++; - len++; - } - if (len > 0) { value = calloc(len + 1, 1); memcpy(value, start, len); @@ -2013,12 +2009,12 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) { return false; } -static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *pHash, SSmlLinesInfo* info) { +static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen, SHashObj *pHash, SSmlLinesInfo* info) { const char *cur = *idx; char key[TSDB_COL_NAME_LEN + 1]; // +1 to avoid key[len] over write int16_t len = 0; - while (*cur != '\0') { + while (cur - *idx < sqlLen) { if (len > TSDB_COL_NAME_LEN - 1) { tscError("SML:0x%"PRIx64" Key field cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1); return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; @@ -2053,7 +2049,7 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *pHash, } -static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx, +static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen, bool *is_last_kv, SSmlLinesInfo* info, bool isTag) { const char *start, *cur; int32_t ret = TSDB_CODE_SUCCESS; @@ -2163,13 +2159,13 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx, cur++; break; } else if (double_quote == true) { - if (*cur != ' ' && *cur != ',' && *cur != '\0') { + if (*cur != ' ' && *cur != ',' && (cur - *idx < sqlLen)) { tscError("SML:0x%"PRIx64" tag value: state(%d), incorrect character(%c) behind closing \"", info->id, tag_state, *cur); ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR; goto error; } - if (*cur == ' ' || *cur == '\0') { + if (*cur == ' ' || (cur - *idx == sqlLen)) { *is_last_kv = true; } @@ -2308,13 +2304,13 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx, cur++; break; } else if (double_quote == true) { - if (*cur != ' ' && *cur != ',' && *cur != '\0') { + if (*cur != ' ' && *cur != ',' && (cur - *idx < sqlLen)) { tscError("SML:0x%"PRIx64" field value: state(%d), incorrect character(%c) behind closing \"", info->id, val_state, *cur); ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR; goto error; } - if (*cur == ' ' || *cur == '\0') { + if (*cur == ' ' || (cur - *idx == sqlLen)) { *is_last_kv = true; } @@ -2384,7 +2380,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx, } free(value); - *idx = (*cur == '\0') ? cur : cur + 1; + *idx = (cur - *idx == sqlLen) ? cur : cur + 1; return ret; error: @@ -2395,7 +2391,7 @@ error: return ret; } -static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **idx, +static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **idx, int32_t sqlLen, uint8_t *has_tags, SSmlLinesInfo* info) { const char *cur = *idx; int16_t len = 0; @@ -2405,7 +2401,7 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **idx, return TSDB_CODE_TSC_OUT_OF_MEMORY; } - while (*cur != '\0') { + while (cur - *idx < sqlLen) { if (len > TSDB_TABLE_NAME_LEN - 1) { tscError("SML:0x%"PRIx64" Measurement field cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1); free(pSml->stableName); @@ -2464,7 +2460,7 @@ int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* i static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, - const char **idx, bool isField, + const char **idx, int32_t len, bool isField, TAOS_SML_DATA_POINT* smlData, SHashObj *pHash, SSmlLinesInfo* info) { const char *cur = *idx; @@ -2495,13 +2491,13 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs, addEscapeCharToString(childTableName, (int32_t)(childTableNameLen)); } - while (*cur != '\0') { - ret = parseSmlKey(pkv, &cur, pHash, info); + while (cur - *idx < len) { + ret = parseSmlKey(pkv, &cur, len - (cur - *idx), pHash, info); if (ret) { tscError("SML:0x%"PRIx64" Unable to parse key", info->id); goto error; } - ret = parseSmlValue(pkv, &cur, &is_last_kv, info, !isField); + ret = parseSmlValue(pkv, &cur, len - (cur - *idx), &is_last_kv, info, !isField); if (ret) { tscError("SML:0x%"PRIx64" Unable to parse value", info->id); goto error; @@ -2574,14 +2570,14 @@ static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *t free(ts); } -int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) { +int32_t tscParseLine(const char* sql, int32_t len, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) { const char* idx = sql; int32_t ret = TSDB_CODE_SUCCESS; uint8_t has_tags = 0; TAOS_SML_KV *timestamp = NULL; SHashObj *keyHashTable = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - ret = parseSmlMeasurement(smlData, &idx, &has_tags, info); + ret = parseSmlMeasurement(smlData, &idx, len, &has_tags, info); if (ret) { tscError("SML:0x%"PRIx64" Unable to parse measurement", info->id); taosHashCleanup(keyHashTable); @@ -2591,7 +2587,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf //Parse Tags if (has_tags) { - ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &idx, false, smlData, keyHashTable, info); + ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &idx, len - (idx - sql), false, smlData, keyHashTable, info); if (ret) { tscError("SML:0x%"PRIx64" Unable to parse tag", info->id); taosHashCleanup(keyHashTable); @@ -2601,7 +2597,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum); //Parse fields - ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &idx, true, smlData, keyHashTable, info); + ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &idx, len - (idx - sql), true, smlData, keyHashTable, info); if (ret) { tscError("SML:0x%"PRIx64" Unable to parse field", info->id); taosHashCleanup(keyHashTable); @@ -2617,7 +2613,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf taosHashCleanup(keyHashTable); //Parse timestamp - ret = parseSmlTimeStamp(×tamp, &idx, info); + ret = parseSmlTimeStamp(×tamp, &idx, len - (idx - sql), info); if (ret) { tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id); return ret; @@ -2652,24 +2648,58 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) { free(point->childTableName); } -int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) { - for (int32_t i = 0; i < numLines; ++i) { - TAOS_SML_DATA_POINT point = {0}; - int32_t code = tscParseLine(lines[i], &point, info); - if (code != TSDB_CODE_SUCCESS) { - tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]); - destroySmlDataPoint(&point); - return code; - } else { - tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i); - } - - taosArrayPush(points, &point); +static int32_t tscParseLinesInner(char* line, int32_t len, SArray* points, SSmlLinesInfo* info){ + TAOS_SML_DATA_POINT point = {0}; + int32_t code = tscParseLine(line, len, &point, info); + if (code != TSDB_CODE_SUCCESS) { + tscError("SML:0x%"PRIx64" data point line parse failed.", info->id); + destroySmlDataPoint(&point); + return code; + } else { + tscDebug("SML:0x%"PRIx64" data point line parse success.", info->id); } + + taosArrayPush(points, &point); return TSDB_CODE_SUCCESS; } -int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) { +int32_t tscParseLines(char* data, int32_t len, char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + if(!data && lines){ + for (int32_t i = 0; i < numLines; ++i) { + code = tscParseLinesInner(lines[i], strlen(lines[i]), points, info); + if(code != TSDB_CODE_SUCCESS){ + return code; + } + } + }else if(data && !lines){ + char* tmp = data; + int32_t lenTmp = 0; + for(int i = 0; i < len; i++){ + if(data[i] == '\n' || i == len - 1){ + if(data[i] != '\n' || i == len - 1){ + lenTmp ++; + } + if(lenTmp > 0) { + code = tscParseLinesInner(tmp, lenTmp, points, info); + if(code != TSDB_CODE_SUCCESS){ + return code; + } + } + if(i < len - 1) { + tmp = data + i + 1; + } + lenTmp = 0; + }else{ + lenTmp ++; + } + } + } + + return code; +} + +int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) { int32_t code = 0; SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo)); @@ -2677,6 +2707,18 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p info->tsType = tsType; info->protocol = protocol; + if (data){ + numLines = 0; + for(int i = 0; i < len; i++){ + if(data[i] == '\0'){ + data[i] = '0'; + } + if(data[i] == '\n' || i == len - 1){ + numLines++; + } + } + } + if (numLines <= 0 || numLines > 65536*32) { tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536*32. numLines: %d", info->id, numLines); tfree(info); @@ -2684,12 +2726,14 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p return code; } - for (int i = 0; i < numLines; ++i) { - if (lines[i] == NULL) { - tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i); - tfree(info); - code = TSDB_CODE_TSC_APP_ERROR; - return code; + if(lines){ + for (int i = 0; i < numLines; ++i) { + if (lines[i] == NULL) { + tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i); + tfree(info); + code = TSDB_CODE_TSC_APP_ERROR; + return code; + } } } @@ -2700,8 +2744,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p return TSDB_CODE_TSC_OUT_OF_MEMORY; } - tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]); - code = tscParseLines(lines, numLines, lpPoints, NULL, info); + tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines", info->id, numLines); + code = tscParseLines(data, len, lines, numLines, lpPoints, NULL, info); size_t numPoints = taosArrayGetSize(lpPoints); if (code != 0) { @@ -2816,10 +2860,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr switch (protocol) { case TSDB_SML_LINE_PROTOCOL: - code = taos_insert_lines(taos, lines, numLines, protocol, tsType, &affected_rows); + code = taos_insert_lines(taos, NULL, 0, lines, numLines, protocol, tsType, &affected_rows); break; case TSDB_SML_TELNET_PROTOCOL: - code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows); + code = taos_insert_telnet_lines(taos, NULL, 0, lines, numLines, protocol, tsType, &affected_rows); break; case TSDB_SML_JSON_PROTOCOL: code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows); @@ -2830,6 +2874,39 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr } + SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code); + + return (TAOS_RES*)pSql; +} + +TAOS_RES *taos_schemaless_insert_new(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){ + int code = TSDB_CODE_SUCCESS; + int affected_rows = 0; + SMLTimeStampType tsType = SML_TIME_STAMP_NOW; + + if (protocol == TSDB_SML_LINE_PROTOCOL) { + code = convertPrecisionType(precision, &tsType); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + } + + switch (protocol) { + case TSDB_SML_LINE_PROTOCOL: + code = taos_insert_lines(taos, lines, len, NULL, 0, protocol, tsType, &affected_rows); + break; + case TSDB_SML_TELNET_PROTOCOL: + code = taos_insert_telnet_lines(taos, lines, len, NULL, 0, protocol, tsType, &affected_rows); + break; + case TSDB_SML_JSON_PROTOCOL: + code = taos_insert_json_payload(taos, lines, protocol, tsType, &affected_rows); + break; + default: + code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE; + break; + } + + SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code); return (TAOS_RES*)pSql; diff --git a/src/client/src/tscParseOpenTSDB.c b/src/client/src/tscParseOpenTSDB.c index 525bfa4bd3ac1cdbb43d68ef4fa3697bd9b20ac3..a1c011db2af8d5bb2d1c2cce8f928c369e35bc60 100644 --- a/src/client/src/tscParseOpenTSDB.c +++ b/src/client/src/tscParseOpenTSDB.c @@ -33,7 +33,7 @@ static uint64_t genUID() { return id; } -static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SSmlLinesInfo* info) { +static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, int32_t sqlLen, SSmlLinesInfo* info) { const char *cur = *idx; uint16_t len = 0; @@ -49,7 +49,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SS } */ - while (*cur != '\0') { + while (cur - *idx < sqlLen) { if (len > TSDB_TABLE_NAME_LEN - 1) { tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1); tfree(pSml->stableName); @@ -82,7 +82,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SS return TSDB_CODE_SUCCESS; } -static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **idx, SSmlLinesInfo* info) { +static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **idx, int32_t sqlLen, SSmlLinesInfo* info) { //Timestamp must be the first KV to parse assert(*num_kvs == 0); @@ -96,7 +96,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char //allocate fields for timestamp and value *pTS = tcalloc(OTD_MAX_FIELDS_NUM, sizeof(TAOS_SML_KV)); - while(*cur != '\0') { + while(cur - *idx < sqlLen) { if (*cur == ' ') { if (*(cur + 1) != ' ') { break; @@ -109,7 +109,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char len++; } - if (len > 0 && *cur != '\0') { + if (len > 0 && cur - *idx < sqlLen) { value = tcalloc(len + 1, 1); memcpy(value, start, len); } else { @@ -135,7 +135,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char return ret; } -static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **idx, SSmlLinesInfo* info) { +static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **idx, int32_t sqlLen, SSmlLinesInfo* info) { //skip timestamp TAOS_SML_KV *pVal = *pKVs + 1; const char *start, *cur; @@ -158,7 +158,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch len += 2; } - while(*cur != '\0') { + while(cur - *idx < sqlLen) { if (*cur == ' ') { if (searchQuote == true) { if (*(cur - 1) == '"' && len != 1 && len != 2) { @@ -181,7 +181,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch len++; } - if (len > 0 && *cur != '\0') { + if (len > 0 && cur - *idx < sqlLen) { value = tcalloc(len + 1, 1); memcpy(value, start, len); } else { @@ -205,7 +205,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch return ret; } -static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *pHash, SSmlLinesInfo* info) { +static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen, SHashObj *pHash, SSmlLinesInfo* info) { const char *cur = *idx; char key[TSDB_COL_NAME_LEN]; uint16_t len = 0; @@ -215,7 +215,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p // tscError("OTD:0x%"PRIx64" Tag key cannot start with digit", info->id); // return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; //} - while (*cur != '\0') { + while (cur - *idx < sqlLen) { if (len > TSDB_COL_NAME_LEN - 1) { tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1); return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; @@ -231,7 +231,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p cur++; len++; } - if (len == 0 || *cur == '\0') { + if (len == 0 || cur - *idx == sqlLen) { return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; } key[len] = '\0'; @@ -249,7 +249,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p } -static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx, +static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen, bool *is_last_kv, SSmlLinesInfo* info) { const char *start, *cur; char *value = NULL; @@ -258,9 +258,9 @@ static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx, while (1) { // whitespace or '\0' identifies a value - if (*cur == ' ' || *cur == '\0') { + if (*cur == ' ' || cur - *idx == sqlLen) { // '\0' indicates end of value - *is_last_kv = (*cur == '\0') ? true : false; + *is_last_kv = (cur - *idx == sqlLen) ? true : false; if (*cur == ' ' && *(cur + 1) == ' ') { cur++; continue; @@ -290,12 +290,12 @@ static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx, } tfree(value); - *idx = (*cur == '\0') ? cur : cur + 1; + *idx = (cur - *idx == sqlLen) ? cur : cur + 1; return TSDB_CODE_SUCCESS; } static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs, - const char **idx, char **childTableName, + const char **idx, int32_t sqlLen, char **childTableName, SHashObj *pHash, SSmlLinesInfo* info) { const char *cur = *idx; int32_t ret = TSDB_CODE_SUCCESS; @@ -312,13 +312,13 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs, memcpy(childTbName, tsSmlChildTableName, childTableNameLen); addEscapeCharToString(childTbName, (int32_t)(childTableNameLen)); } - while (*cur != '\0') { - ret = parseTelnetTagKey(pkv, &cur, pHash, info); + while (cur - *idx < sqlLen) { + ret = parseTelnetTagKey(pkv, &cur, sqlLen - (cur - *idx), pHash, info); if (ret) { tscError("OTD:0x%"PRIx64" Unable to parse key", info->id); return ret; } - ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info); + ret = parseTelnetTagValue(pkv, &cur, sqlLen - (cur - *idx), &is_last_kv, info); if (ret) { tscError("OTD:0x%"PRIx64" Unable to parse value", info->id); return ret; @@ -356,12 +356,12 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs, return ret; } -static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) { +static int32_t tscParseTelnetLine(const char* line, int32_t len, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) { const char* idx = line; int32_t ret = TSDB_CODE_SUCCESS; //Parse metric - ret = parseTelnetMetric(smlData, &idx, info); + ret = parseTelnetMetric(smlData, &idx, len, info); if (ret) { tscError("OTD:0x%"PRIx64" Unable to parse metric", info->id); return ret; @@ -369,7 +369,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData tscDebug("OTD:0x%"PRIx64" Parse metric finished", info->id); //Parse timestamp - ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &idx, info); + ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &idx, len - (idx - line), info); if (ret) { tscError("OTD:0x%"PRIx64" Unable to parse timestamp", info->id); return ret; @@ -377,7 +377,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData tscDebug("OTD:0x%"PRIx64" Parse timestamp finished", info->id); //Parse value - ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &idx, info); + ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &idx, len - (idx - line), info); if (ret) { tscError("OTD:0x%"PRIx64" Unable to parse metric value", info->id); return ret; @@ -386,7 +386,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData //Parse tagKVs SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &idx, &smlData->childTableName, keyHashTable, info); + ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &idx, len - (idx - line), &smlData->childTableName, keyHashTable, info); if (ret) { tscError("OTD:0x%"PRIx64" Unable to parse tags", info->id); taosHashCleanup(keyHashTable); @@ -399,24 +399,58 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData return TSDB_CODE_SUCCESS; } -static int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) { - for (int32_t i = 0; i < numLines; ++i) { - TAOS_SML_DATA_POINT point = {0}; - int32_t code = tscParseTelnetLine(lines[i], &point, info); - if (code != TSDB_CODE_SUCCESS) { - tscError("OTD:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]); - destroySmlDataPoint(&point); - return code; - } else { - tscDebug("OTD:0x%"PRIx64" data point line parse success. line %d", info->id, i); - } - - taosArrayPush(points, &point); +static int32_t tscParseTelnetLinesInner(char* data, int32_t len, SArray* points, SSmlLinesInfo* info) { + TAOS_SML_DATA_POINT point = {0}; + int32_t code = tscParseTelnetLine(data, len, &point, info); + if (code != TSDB_CODE_SUCCESS) { + tscError("OTD:0x%"PRIx64" data point line parse failed.", info->id); + destroySmlDataPoint(&point); + return code; + } else { + tscDebug("OTD:0x%"PRIx64" data point line parse success.", info->id); } + + taosArrayPush(points, &point); return TSDB_CODE_SUCCESS; } -int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) { +static int32_t tscParseTelnetLines(char* data, int32_t len, char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) { + int32_t code = TSDB_CODE_SUCCESS; + if(!data && lines){ + for (int32_t i = 0; i < numLines; ++i) { + code = tscParseTelnetLinesInner(lines[i], strlen(lines[i]), points, info); + if(code != TSDB_CODE_SUCCESS){ + return code; + } + } + }else if(data && !lines){ + char* tmp = data; + int32_t lenTmp = 0; + for(int i = 0; i < len; i++){ + if(data[i] == '\n' || i == len - 1){ + if(data[i] != '\n' || i == len - 1){ + lenTmp++; + } + if(lenTmp > 0) { + code = tscParseTelnetLinesInner(tmp, lenTmp, points, info); + if(code != TSDB_CODE_SUCCESS){ + return code; + } + } + if(i < len - 1) { + tmp = data + i + 1; + } + lenTmp = 0; + }else{ + lenTmp ++; + } + } + } + + return code; +} + +int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) { int32_t code = 0; SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo)); @@ -424,6 +458,18 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco info->tsType = tsType; info->protocol = protocol; + if (data && !lines){ + numLines = 0; + for(int i = 0; i < len; i++){ + if(data[i] == '\0'){ + data[i] = '0'; + } + if(data[i] == '\n' || i == len - 1){ + numLines++; + } + } + } + if (numLines <= 0 || numLines > 65536) { tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); tfree(info); @@ -431,12 +477,14 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco return code; } - for (int i = 0; i < numLines; ++i) { - if (lines[i] == NULL) { - tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i); - tfree(info); - code = TSDB_CODE_TSC_APP_ERROR; - return code; + if(!data && lines){ + for (int i = 0; i < numLines; ++i) { + if (lines[i] == NULL) { + tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i); + tfree(info); + code = TSDB_CODE_TSC_APP_ERROR; + return code; + } } } @@ -448,7 +496,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco } tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]); - code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info); + code = tscParseTelnetLines(data, len, lines, numLines, lpPoints, NULL, info); size_t numPoints = taosArrayGetSize(lpPoints); if (code != 0) { diff --git a/src/inc/taos.h b/src/inc/taos.h index 44d83969a8090fc1e98fcec08065dd1834cf3f75..03993c2830e29b27ef998f07e8ae3b614b37991e 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -215,6 +215,7 @@ DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); +DLL_EXPORT TAOS_RES *taos_schemaless_insert_new(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision); DLL_EXPORT int32_t taos_parse_time(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);