提交 9fe6d234 编写于 作者: wmmhello's avatar wmmhello

feat:add new interface for schemaless

上级 a0cd5786
...@@ -79,7 +79,7 @@ ...@@ -79,7 +79,7 @@
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " #define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
#define MAX_RETRY_TIMES 5 #define MAX_RETRY_TIMES 5
#define LINE_BATCH 2000 #define LINE_BATCH 2
//================================================================================================= //=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
...@@ -2199,7 +2199,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { ...@@ -2199,7 +2199,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
} }
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
ret = smlParseTelnetString(info, (const char *)data, POINTER_SHIFT(data, len), tinfo, cols); ret = smlParseTelnetString(info, (const char *)data, (char*)data + len, tinfo, cols);
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) { } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols); ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
} else { } else {
...@@ -2408,6 +2408,9 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char ...@@ -2408,6 +2408,9 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char
} }
len++; len++;
} }
if(info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#'){ // this line is comment
continue;
}
} }
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
...@@ -2418,7 +2421,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char ...@@ -2418,7 +2421,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char
ASSERT(0); ASSERT(0);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, lines[i]); uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
return code; return code;
} }
} }
...@@ -2692,11 +2695,16 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t * ...@@ -2692,11 +2695,16 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *
} }
int numLines = 0; int numLines = 0;
*totalRows = 0;
char *tmp = lines;
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
if(lines[i] == '\n' || i == len - 1){ if(lines[i] == '\n' || i == len - 1){
numLines++; numLines++;
if(tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL){ //ignore comment
(*totalRows)++;
}
tmp = lines + i + 1;
} }
} }
*totalRows = numLines;
return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision); return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision);
} }
...@@ -44,7 +44,7 @@ TEST(testCase, smlParseInfluxString_Test) { ...@@ -44,7 +44,7 @@ TEST(testCase, smlParseInfluxString_Test) {
char *tmp = "\\,st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3"; char *tmp = "\\,st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3";
char *sql = (char *)taosMemoryCalloc(256, 1); char *sql = (char *)taosMemoryCalloc(256, 1);
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
int ret = smlParseInfluxString(sql, &elements, &msgBuf); int ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measure, sql);
ASSERT_EQ(elements.measureLen, strlen(",st")); ASSERT_EQ(elements.measureLen, strlen(",st"));
...@@ -63,14 +63,14 @@ TEST(testCase, smlParseInfluxString_Test) { ...@@ -63,14 +63,14 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
// case 3 false // case 3 false
tmp = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"; tmp = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 1); ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 1);
ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3")); ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3"));
...@@ -79,7 +79,7 @@ TEST(testCase, smlParseInfluxString_Test) { ...@@ -79,7 +79,7 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000"; tmp = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000";
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql); ASSERT_EQ(elements.measure, sql);
ASSERT_EQ(elements.measureLen, strlen("st")); ASSERT_EQ(elements.measureLen, strlen("st"));
...@@ -98,7 +98,7 @@ TEST(testCase, smlParseInfluxString_Test) { ...@@ -98,7 +98,7 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 "; tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ";
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql + 1); ASSERT_EQ(elements.measure, sql + 1);
ASSERT_EQ(elements.measureLen, strlen("st")); ASSERT_EQ(elements.measureLen, strlen("st"));
...@@ -116,21 +116,21 @@ TEST(testCase, smlParseInfluxString_Test) { ...@@ -116,21 +116,21 @@ TEST(testCase, smlParseInfluxString_Test) {
tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 "; tmp = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 ";
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
// case 7 // case 7
tmp = " st , "; tmp = " st , ";
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
// case 8 false // case 8 false
tmp = ", st , "; tmp = ", st , ";
memcpy(sql, tmp, strlen(tmp) + 1); memcpy(sql, tmp, strlen(tmp) + 1);
memset(&elements, 0, sizeof(SSmlLineInfo)); memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(sql, &elements, &msgBuf); ret = smlParseInfluxString(sql, sql + strlen(sql), &elements, &msgBuf);
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
taosMemoryFree(sql); taosMemoryFree(sql);
} }
...@@ -542,7 +542,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) { ...@@ -542,7 +542,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) {
"sys.procs.running 1479496100 42 host= web01", "sys.procs.running 1479496100 42 host= web01",
}; };
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
int ret = smlParseTelnetLine(info, (void *)sql[i]); int ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
} }
...@@ -561,7 +561,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) { ...@@ -561,7 +561,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) {
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
ret = smlParseTelnetLine(info, (void *)sql[i]); ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
if (ret != TSDB_CODE_SUCCESS) break; if (ret != TSDB_CODE_SUCCESS) break;
} }
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
...@@ -617,7 +617,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) { ...@@ -617,7 +617,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) {
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
ret = smlParseTelnetLine(info, (void *)sql[i]); ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
} }
...@@ -653,7 +653,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) { ...@@ -653,7 +653,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
ret = smlParseTelnetLine(info, (void *)sql[i]); ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
if (ret != TSDB_CODE_SUCCESS) break; if (ret != TSDB_CODE_SUCCESS) break;
} }
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
...@@ -688,7 +688,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) { ...@@ -688,7 +688,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
}; };
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
ret = smlParseTelnetLine(info, (void *)sql[i]); ret = smlParseTelnetLine(info, (void *)sql[i], strlen(sql[i]));
if (ret != TSDB_CODE_SUCCESS) break; if (ret != TSDB_CODE_SUCCESS) break;
} }
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
...@@ -1002,7 +1002,7 @@ TEST(testCase, sml_col_4096_Test) { ...@@ -1002,7 +1002,7 @@ TEST(testCase, sml_col_4096_Test) {
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) { for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
ret = smlParseInfluxLine(info, sql[i]); ret = smlParseInfluxLine(info, sql[i], strlen(sql[i]));
if (ret != TSDB_CODE_SUCCESS) break; if (ret != TSDB_CODE_SUCCESS) break;
} }
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
......
...@@ -1155,37 +1155,64 @@ int smlProcess_18784_Test() { ...@@ -1155,37 +1155,64 @@ int smlProcess_18784_Test() {
return code; return code;
} }
int sml_19221_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
const char *sql[] = {
"qelhxo,id=pnnqhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000\nqelhxo,id=pnnhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000\nqelhxo,id=pnqhsa,t0=t,t1=127i8 c11=L\"ncharColValue\",c0=t,c1=127i8 1626006833639000000",
};
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
void* tmp = taosMemoryCalloc(256, 1);
memcpy(tmp, sql[0], strlen(sql[0]));
*(char*)(tmp+44) = 0;
int32_t totalRows = 0;
pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
return code;
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
int ret = 0; int ret = 0;
ret = smlProcess_influx_Test(); // ret = smlProcess_influx_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = smlProcess_telnet_Test(); // ret = smlProcess_telnet_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = smlProcess_json1_Test(); // ret = smlProcess_json1_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = smlProcess_json2_Test(); // ret = smlProcess_json2_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = smlProcess_json3_Test(); // ret = smlProcess_json3_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = smlProcess_json4_Test(); // ret = smlProcess_json4_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_TD15662_Test(); // ret = sml_TD15662_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_TD15742_Test(); // ret = sml_TD15742_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_16384_Test(); // ret = sml_16384_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_oom_Test(); // ret = sml_oom_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_16368_Test(); // ret = sml_16368_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_dup_time_Test(); // ret = sml_dup_time_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_16960_Test(); // ret = sml_16960_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = sml_add_tag_col_Test(); // ret = sml_add_tag_col_Test();
ASSERT(!ret); // ASSERT(!ret);
ret = smlProcess_18784_Test(); // ret = smlProcess_18784_Test();
// ASSERT(!ret);
ret = sml_19221_Test();
ASSERT(!ret); ASSERT(!ret);
return ret; return ret;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册