提交 ca25392d 编写于 作者: wmmhello's avatar wmmhello

feat:add new interface for schemaless to support '\0' in line value

上级 01763823
......@@ -103,9 +103,9 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point);
int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol,
int taos_insert_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol,
int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
......
......@@ -1966,21 +1966,15 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
return TSDB_CODE_SUCCESS;
}
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **idx, SSmlLinesInfo* info) {
const char *start, *cur;
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **idx, int32_t len, SSmlLinesInfo* info) {
const char *start;
int32_t ret = TSDB_CODE_SUCCESS;
int len = 0;
char key[] = "ts";
char *value = NULL;
start = cur = *idx;
start = *idx;
*pTS = calloc(1, sizeof(TAOS_SML_KV));
while(*cur != '\0') {
cur++;
len++;
}
if (len > 0) {
value = calloc(len + 1, 1);
memcpy(value, start, len);
......@@ -2013,12 +2007,12 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
return false;
}
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *pHash, SSmlLinesInfo* info) {
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen, SHashObj *pHash, SSmlLinesInfo* info) {
const char *cur = *idx;
char key[TSDB_COL_NAME_LEN + 1]; // +1 to avoid key[len] over write
int16_t len = 0;
while (*cur != '\0') {
while (cur - *idx < sqlLen) {
if (len > TSDB_COL_NAME_LEN - 1) {
tscError("SML:0x%"PRIx64" Key field cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
......@@ -2053,7 +2047,7 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *pHash,
}
static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx,
static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen,
bool *is_last_kv, SSmlLinesInfo* info, bool isTag) {
const char *start, *cur;
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -2163,13 +2157,13 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx,
cur++;
break;
} else if (double_quote == true) {
if (*cur != ' ' && *cur != ',' && *cur != '\0') {
if (*cur != ' ' && *cur != ',' && (cur - *idx < sqlLen)) {
tscError("SML:0x%"PRIx64" tag value: state(%d), incorrect character(%c) behind closing \"", info->id, tag_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
if (*cur == ' ' || *cur == '\0') {
if (*cur == ' ' || (cur - *idx == sqlLen)) {
*is_last_kv = true;
}
......@@ -2308,13 +2302,13 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx,
cur++;
break;
} else if (double_quote == true) {
if (*cur != ' ' && *cur != ',' && *cur != '\0') {
if (*cur != ' ' && *cur != ',' && (cur - *idx < sqlLen)) {
tscError("SML:0x%"PRIx64" field value: state(%d), incorrect character(%c) behind closing \"", info->id, val_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
if (*cur == ' ' || *cur == '\0') {
if (*cur == ' ' || (cur - *idx == sqlLen)) {
*is_last_kv = true;
}
......@@ -2384,7 +2378,7 @@ static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **idx,
}
free(value);
*idx = (*cur == '\0') ? cur : cur + 1;
*idx = (cur - *idx == sqlLen) ? cur : cur + 1;
return ret;
error:
......@@ -2395,7 +2389,7 @@ error:
return ret;
}
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **idx,
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **idx, int32_t sqlLen,
uint8_t *has_tags, SSmlLinesInfo* info) {
const char *cur = *idx;
int16_t len = 0;
......@@ -2405,7 +2399,7 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **idx,
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
while (*cur != '\0') {
while (cur - *idx < sqlLen) {
if (len > TSDB_TABLE_NAME_LEN - 1) {
tscError("SML:0x%"PRIx64" Measurement field cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
free(pSml->stableName);
......@@ -2464,7 +2458,7 @@ int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* i
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
const char **idx, bool isField,
const char **idx, int32_t len, bool isField,
TAOS_SML_DATA_POINT* smlData, SHashObj *pHash,
SSmlLinesInfo* info) {
const char *cur = *idx;
......@@ -2495,13 +2489,13 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
addEscapeCharToString(childTableName, (int32_t)(childTableNameLen));
}
while (*cur != '\0') {
ret = parseSmlKey(pkv, &cur, pHash, info);
while (cur - *idx < len) {
ret = parseSmlKey(pkv, &cur, len - (cur - *idx), pHash, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
goto error;
}
ret = parseSmlValue(pkv, &cur, &is_last_kv, info, !isField);
ret = parseSmlValue(pkv, &cur, len - (cur - *idx), &is_last_kv, info, !isField);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
goto error;
......@@ -2574,14 +2568,14 @@ static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *t
free(ts);
}
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
int32_t tscParseLine(const char* sql, int32_t len, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
const char* idx = sql;
int32_t ret = TSDB_CODE_SUCCESS;
uint8_t has_tags = 0;
TAOS_SML_KV *timestamp = NULL;
SHashObj *keyHashTable = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
ret = parseSmlMeasurement(smlData, &idx, &has_tags, info);
ret = parseSmlMeasurement(smlData, &idx, len, &has_tags, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse measurement", info->id);
taosHashCleanup(keyHashTable);
......@@ -2591,7 +2585,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
//Parse Tags
if (has_tags) {
ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &idx, false, smlData, keyHashTable, info);
ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &idx, len - (idx - sql), false, smlData, keyHashTable, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse tag", info->id);
taosHashCleanup(keyHashTable);
......@@ -2601,7 +2595,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum);
//Parse fields
ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &idx, true, smlData, keyHashTable, info);
ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &idx, len - (idx - sql), true, smlData, keyHashTable, info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse field", info->id);
taosHashCleanup(keyHashTable);
......@@ -2617,7 +2611,7 @@ int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInf
taosHashCleanup(keyHashTable);
//Parse timestamp
ret = parseSmlTimeStamp(&timestamp, &idx, info);
ret = parseSmlTimeStamp(&timestamp, &idx, len - (idx - sql), info);
if (ret) {
tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
return ret;
......@@ -2652,24 +2646,55 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
free(point->childTableName);
}
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numLines; ++i) {
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseLine(lines[i], &point, info);
if (code != TSDB_CODE_SUCCESS) {
tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
destroySmlDataPoint(&point);
return code;
} else {
tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i);
}
taosArrayPush(points, &point);
static int32_t tscParseLinesInner(char* line, int32_t len, SArray* points, SSmlLinesInfo* info){
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseLine(line, len, &point, info);
if (code != TSDB_CODE_SUCCESS) {
tscError("SML:0x%"PRIx64" data point line parse failed.", info->id);
destroySmlDataPoint(&point);
return code;
} else {
tscDebug("SML:0x%"PRIx64" data point line parse success.", info->id);
}
taosArrayPush(points, &point);
return TSDB_CODE_SUCCESS;
}
int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) {
int32_t tscParseLines(char* data, int32_t len, char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
if(!data && lines){
for (int32_t i = 0; i < numLines; ++i) {
code = tscParseLinesInner(lines[i], strlen(lines[i]), points, info);
if(code != TSDB_CODE_SUCCESS){
return code;
}
}
}else if(data && !lines){
char* tmp = data;
int32_t lenTmp = 0;
for(int i = 0; i < len; i++){
if(data[i] == '\n'){
if(lenTmp > 0) {
code = tscParseLinesInner(tmp, lenTmp, points, info);
if(code != TSDB_CODE_SUCCESS){
return code;
}
}
if(i < len - 1) {
tmp = data + i + 1;
}
lenTmp = 0;
}else{
lenTmp ++;
}
}
}
return code;
}
int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) {
int32_t code = 0;
SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
......@@ -2677,6 +2702,15 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
info->tsType = tsType;
info->protocol = protocol;
if (data){
numLines = 0;
for(int i = 0; i < len; i++){
if(data[i] == '\n' || i == len - 1){
numLines++;
}
}
}
if (numLines <= 0 || numLines > 65536*32) {
tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536*32. numLines: %d", info->id, numLines);
tfree(info);
......@@ -2684,12 +2718,14 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
return code;
}
for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
tfree(info);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
if(lines){
for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
tfree(info);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
}
}
}
......@@ -2701,7 +2737,7 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType p
}
tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
code = tscParseLines(lines, numLines, lpPoints, NULL, info);
code = tscParseLines(data, len, lines, numLines, lpPoints, NULL, info);
size_t numPoints = taosArrayGetSize(lpPoints);
if (code != 0) {
......@@ -2816,10 +2852,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
switch (protocol) {
case TSDB_SML_LINE_PROTOCOL:
code = taos_insert_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
code = taos_insert_lines(taos, NULL, 0, lines, numLines, protocol, tsType, &affected_rows);
break;
case TSDB_SML_TELNET_PROTOCOL:
code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
code = taos_insert_telnet_lines(taos, NULL, 0, lines, numLines, protocol, tsType, &affected_rows);
break;
case TSDB_SML_JSON_PROTOCOL:
code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows);
......@@ -2830,6 +2866,39 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
}
SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code);
return (TAOS_RES*)pSql;
}
TAOS_RES *taos_schemaless_insert_new(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){
int code = TSDB_CODE_SUCCESS;
int affected_rows = 0;
SMLTimeStampType tsType = SML_TIME_STAMP_NOW;
if (protocol == TSDB_SML_LINE_PROTOCOL) {
code = convertPrecisionType(precision, &tsType);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
}
switch (protocol) {
case TSDB_SML_LINE_PROTOCOL:
code = taos_insert_lines(taos, lines, len, NULL, 0, protocol, tsType, &affected_rows);
break;
case TSDB_SML_TELNET_PROTOCOL:
code = taos_insert_telnet_lines(taos, lines, len, NULL, 0, protocol, tsType, &affected_rows);
break;
case TSDB_SML_JSON_PROTOCOL:
code = taos_insert_json_payload(taos, lines, protocol, tsType, &affected_rows);
break;
default:
code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE;
break;
}
SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code);
return (TAOS_RES*)pSql;
......
......@@ -33,7 +33,7 @@ static uint64_t genUID() {
return id;
}
static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SSmlLinesInfo* info) {
static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, int32_t sqlLen, SSmlLinesInfo* info) {
const char *cur = *idx;
uint16_t len = 0;
......@@ -49,7 +49,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SS
}
*/
while (*cur != '\0') {
while (cur - *idx < sqlLen) {
if (len > TSDB_TABLE_NAME_LEN - 1) {
tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
tfree(pSml->stableName);
......@@ -82,7 +82,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **idx, SS
return TSDB_CODE_SUCCESS;
}
static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **idx, SSmlLinesInfo* info) {
static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **idx, int32_t sqlLen, SSmlLinesInfo* info) {
//Timestamp must be the first KV to parse
assert(*num_kvs == 0);
......@@ -96,7 +96,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
//allocate fields for timestamp and value
*pTS = tcalloc(OTD_MAX_FIELDS_NUM, sizeof(TAOS_SML_KV));
while(*cur != '\0') {
while(cur - *idx < sqlLen) {
if (*cur == ' ') {
if (*(cur + 1) != ' ') {
break;
......@@ -109,7 +109,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
len++;
}
if (len > 0 && *cur != '\0') {
if (len > 0 && cur - *idx < sqlLen) {
value = tcalloc(len + 1, 1);
memcpy(value, start, len);
} else {
......@@ -135,7 +135,7 @@ static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char
return ret;
}
static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **idx, SSmlLinesInfo* info) {
static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **idx, int32_t sqlLen, SSmlLinesInfo* info) {
//skip timestamp
TAOS_SML_KV *pVal = *pKVs + 1;
const char *start, *cur;
......@@ -158,7 +158,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
len += 2;
}
while(*cur != '\0') {
while(cur - *idx < sqlLen) {
if (*cur == ' ') {
if (searchQuote == true) {
if (*(cur - 1) == '"' && len != 1 && len != 2) {
......@@ -181,7 +181,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
len++;
}
if (len > 0 && *cur != '\0') {
if (len > 0 && cur - *idx < sqlLen) {
value = tcalloc(len + 1, 1);
memcpy(value, start, len);
} else {
......@@ -205,7 +205,7 @@ static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const ch
return ret;
}
static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *pHash, SSmlLinesInfo* info) {
static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen, SHashObj *pHash, SSmlLinesInfo* info) {
const char *cur = *idx;
char key[TSDB_COL_NAME_LEN];
uint16_t len = 0;
......@@ -215,7 +215,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p
// tscError("OTD:0x%"PRIx64" Tag key cannot start with digit", info->id);
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
//}
while (*cur != '\0') {
while (cur - *idx < sqlLen) {
if (len > TSDB_COL_NAME_LEN - 1) {
tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
......@@ -231,7 +231,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p
cur++;
len++;
}
if (len == 0 || *cur == '\0') {
if (len == 0 || cur - *idx < sqlLen) {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
key[len] = '\0';
......@@ -249,7 +249,7 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **idx, SHashObj *p
}
static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx,
static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx, int32_t sqlLen,
bool *is_last_kv, SSmlLinesInfo* info) {
const char *start, *cur;
char *value = NULL;
......@@ -258,9 +258,9 @@ static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx,
while (1) {
// whitespace or '\0' identifies a value
if (*cur == ' ' || *cur == '\0') {
if (*cur == ' ' || cur - *idx == sqlLen) {
// '\0' indicates end of value
*is_last_kv = (*cur == '\0') ? true : false;
*is_last_kv = (cur - *idx == sqlLen) ? true : false;
if (*cur == ' ' && *(cur + 1) == ' ') {
cur++;
continue;
......@@ -290,12 +290,12 @@ static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **idx,
}
tfree(value);
*idx = (*cur == '\0') ? cur : cur + 1;
*idx = (cur - *idx == sqlLen) ? cur : cur + 1;
return TSDB_CODE_SUCCESS;
}
static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
const char **idx, char **childTableName,
const char **idx, int32_t sqlLen, char **childTableName,
SHashObj *pHash, SSmlLinesInfo* info) {
const char *cur = *idx;
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -312,13 +312,13 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
memcpy(childTbName, tsSmlChildTableName, childTableNameLen);
addEscapeCharToString(childTbName, (int32_t)(childTableNameLen));
}
while (*cur != '\0') {
ret = parseTelnetTagKey(pkv, &cur, pHash, info);
while (cur - *idx < sqlLen) {
ret = parseTelnetTagKey(pkv, &cur, sqlLen - (cur - *idx), pHash, info);
if (ret) {
tscError("OTD:0x%"PRIx64" Unable to parse key", info->id);
return ret;
}
ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info);
ret = parseTelnetTagValue(pkv, &cur, sqlLen - (cur - *idx), &is_last_kv, info);
if (ret) {
tscError("OTD:0x%"PRIx64" Unable to parse value", info->id);
return ret;
......@@ -356,12 +356,12 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
return ret;
}
static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
static int32_t tscParseTelnetLine(const char* line, int32_t len, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
const char* idx = line;
int32_t ret = TSDB_CODE_SUCCESS;
//Parse metric
ret = parseTelnetMetric(smlData, &idx, info);
ret = parseTelnetMetric(smlData, &idx, len, info);
if (ret) {
tscError("OTD:0x%"PRIx64" Unable to parse metric", info->id);
return ret;
......@@ -369,7 +369,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
tscDebug("OTD:0x%"PRIx64" Parse metric finished", info->id);
//Parse timestamp
ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &idx, info);
ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &idx, len - (idx - line), info);
if (ret) {
tscError("OTD:0x%"PRIx64" Unable to parse timestamp", info->id);
return ret;
......@@ -377,7 +377,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
tscDebug("OTD:0x%"PRIx64" Parse timestamp finished", info->id);
//Parse value
ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &idx, info);
ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &idx, len - (idx - line), info);
if (ret) {
tscError("OTD:0x%"PRIx64" Unable to parse metric value", info->id);
return ret;
......@@ -386,7 +386,7 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
//Parse tagKVs
SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &idx, &smlData->childTableName, keyHashTable, info);
ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &idx, len - (idx - line), &smlData->childTableName, keyHashTable, info);
if (ret) {
tscError("OTD:0x%"PRIx64" Unable to parse tags", info->id);
taosHashCleanup(keyHashTable);
......@@ -399,24 +399,55 @@ static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData
return TSDB_CODE_SUCCESS;
}
static int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numLines; ++i) {
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseTelnetLine(lines[i], &point, info);
if (code != TSDB_CODE_SUCCESS) {
tscError("OTD:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
destroySmlDataPoint(&point);
return code;
} else {
tscDebug("OTD:0x%"PRIx64" data point line parse success. line %d", info->id, i);
}
taosArrayPush(points, &point);
static int32_t tscParseTelnetLinesInner(char* data, int32_t len, SArray* points, SSmlLinesInfo* info) {
TAOS_SML_DATA_POINT point = {0};
int32_t code = tscParseTelnetLine(data, len, &point, info);
if (code != TSDB_CODE_SUCCESS) {
tscError("OTD:0x%"PRIx64" data point line parse failed.", info->id);
destroySmlDataPoint(&point);
return code;
} else {
tscDebug("OTD:0x%"PRIx64" data point line parse success.", info->id);
}
taosArrayPush(points, &point);
return TSDB_CODE_SUCCESS;
}
int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) {
static int32_t tscParseTelnetLines(char* data, int32_t len, char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
if(!data && lines){
for (int32_t i = 0; i < numLines; ++i) {
code = tscParseTelnetLinesInner(lines[i], strlen(lines[i]), points, info);
if(code != TSDB_CODE_SUCCESS){
return code;
}
}
}else if(data && !lines){
char* tmp = data;
int32_t lenTmp = 0;
for(int i = 0; i < len; i++){
if(data[i] == '\n'){
if(lenTmp > 0) {
code = tscParseTelnetLinesInner(tmp, lenTmp, points, info);
if(code != TSDB_CODE_SUCCESS){
return code;
}
}
if(i < len - 1) {
tmp = data + i + 1;
}
lenTmp = 0;
}else{
lenTmp ++;
}
}
}
return code;
}
int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) {
int32_t code = 0;
SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
......@@ -424,6 +455,15 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco
info->tsType = tsType;
info->protocol = protocol;
if (data && !lines){
numLines = 0;
for(int i = 0; i < len; i++){
if(data[i] == '\n' || i == len - 1){
numLines++;
}
}
}
if (numLines <= 0 || numLines > 65536) {
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
tfree(info);
......@@ -431,12 +471,14 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco
return code;
}
for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) {
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i);
tfree(info);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
if(!data && lines){
for (int i = 0; i < numLines; ++i) {
if (lines[i] == NULL) {
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i);
tfree(info);
code = TSDB_CODE_TSC_APP_ERROR;
return code;
}
}
}
......@@ -448,7 +490,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtoco
}
tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info);
code = tscParseTelnetLines(data, len, lines, numLines, lpPoints, NULL, info);
size_t numPoints = taosArrayGetSize(lpPoints);
if (code != 0) {
......
......@@ -215,6 +215,7 @@ DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
DLL_EXPORT TAOS_RES *taos_schemaless_insert_new(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision);
DLL_EXPORT int32_t taos_parse_time(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册