提交 fbc3ca60 编写于 作者: H Haojun Liao

Merge branch '3.0' into feature/3.0_liaohj

......@@ -22,7 +22,7 @@
static int running = 1;
static void msg_process(TAOS_RES* msg) {
char buf[1024];
memset(buf, 0, 1024);
/*memset(buf, 0, 1024);*/
printf("topic: %s\n", tmq_get_topic_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
while (1) {
......
......@@ -48,6 +48,7 @@ enum {
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_REPROCESS,
STREAM_INVALID,
} EStreamType;
......
......@@ -193,7 +193,6 @@ typedef struct SScanPhysiNode {
} SScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode;
typedef SScanPhysiNode SStreamScanPhysiNode;
typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan;
......@@ -217,6 +216,7 @@ typedef struct STableScanPhysiNode {
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;
typedef STableScanPhysiNode SStreamScanPhysiNode;
typedef struct SProjectPhysiNode {
SPhysiNode node;
......
......@@ -77,7 +77,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* colsSchema, SArray* cols, bool format,
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format,
STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
......
......@@ -303,6 +303,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
break;
}
}
str[len] = 0;
return len;
}
......@@ -567,7 +568,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
// todo directly call fp
}
taos_query_l(taos, sql, (int32_t) strlen(sql));
taos_query_l(taos, sql, (int32_t)strlen(sql));
}
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
......
此差异已折叠。
......@@ -33,7 +33,7 @@ int main(int argc, char **argv) {
return RUN_ALL_TESTS();
}
TEST(testCase, smlParseString_Test) {
TEST(testCase, smlParseInfluxString_Test) {
char msg[256] = {0};
SSmlMsgBuf msgBuf;
msgBuf.buf = msg;
......@@ -42,7 +42,7 @@ TEST(testCase, smlParseString_Test) {
// case 1
char *sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ,32,c=3";
int ret = smlParseString(sql, &elements, &msgBuf);
int ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql);
ASSERT_EQ(elements.measureLen, strlen("st"));
......@@ -60,13 +60,13 @@ TEST(testCase, smlParseString_Test) {
// case 2 false
sql = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf);
ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_NE(ret, 0);
// case 3 false
sql = "st, t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf);
ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 2);
ASSERT_EQ(elements.colsLen, strlen("t1=3,t2=4,t3=t3"));
......@@ -74,7 +74,7 @@ TEST(testCase, smlParseString_Test) {
// case 4 tag is null
sql = "st, c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000";
memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf);
ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql);
ASSERT_EQ(elements.measureLen, strlen("st"));
......@@ -92,7 +92,7 @@ TEST(testCase, smlParseString_Test) {
// case 5 tag is null
sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 1626006833639000000 ";
memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf);
ret = smlParseInfluxString(sql, &elements, &msgBuf);
sql++;
ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.measure, sql);
......@@ -111,13 +111,13 @@ TEST(testCase, smlParseString_Test) {
// case 6
sql = " st c1=3i64,c3=\"passit hello,c1=2\",c2=false,c4=4f64 ";
memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf);
ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_EQ(ret, 0);
// case 7
sql = " st , ";
memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf);
ret = smlParseInfluxString(sql, &elements, &msgBuf);
sql++;
ASSERT_EQ(ret, 0);
ASSERT_EQ(elements.cols, sql + elements.measureTagsLen + 3);
......@@ -126,7 +126,7 @@ TEST(testCase, smlParseString_Test) {
// case 8 false
sql = ", st , ";
memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseString(sql, &elements, &msgBuf);
ret = smlParseInfluxString(sql, &elements, &msgBuf);
ASSERT_NE(ret, 0);
}
......@@ -140,15 +140,13 @@ TEST(testCase, smlParseCols_Error_Test) {
"c=f64", // double
"c=8f64f",
"c=8ef64",
"c=1.7976931348623158e+390f64",
"c=f32", // float
"c=8f32f",
"c=8wef32",
"c=-3.402823466e+39f32",
"c=", // float
"c=", // double
"c=8f",
"c=8we",
"c=3.402823466e+39",
"c=i8", // tiny int
"c=-8i8f",
"c=8wei8",
......@@ -218,7 +216,7 @@ TEST(testCase, smlParseCols_tag_Test) {
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
const char *data =
"cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
"cbin=\"passit helloc=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
int32_t len = strlen(data);
int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
......@@ -230,7 +228,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
ASSERT_EQ(kv->valueLen, 18);
ASSERT_EQ(kv->valueLen, 17);
ASSERT_EQ(strncasecmp(kv->value, "\"passit", 7), 0);
taosMemoryFree(kv);
......@@ -280,7 +278,7 @@ TEST(testCase, smlParseCols_Test) {
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
int32_t len = strlen(data);
int32_t ret = smlParseCols(data, len, cols, false, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
......@@ -321,17 +319,17 @@ TEST(testCase, smlParseCols_Test) {
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_EQ(kv->length, 8);
//ASSERT_EQ(kv->d, 4.31);
printf("4.31 = kv->f:%f\n", kv->d);
printf("4.31 = kv->d:%f\n", kv->d);
taosMemoryFree(kv);
// float
kv = (SSmlKv *)taosArrayGetP(cols, 4);
ASSERT_EQ(strncasecmp(kv->key, "cf32_", 5), 0);
ASSERT_EQ(strncasecmp(kv->key, "cf64_", 5), 0);
ASSERT_EQ(kv->keyLen, 5);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_FLOAT);
ASSERT_EQ(kv->length, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_EQ(kv->length, 8);
//ASSERT_EQ(kv->f, 8.32);
printf("8.32 = kv->f:%f\n", kv->f);
printf("8.32 = kv->d:%f\n", kv->d);
taosMemoryFree(kv);
// float
......@@ -467,7 +465,7 @@ TEST(testCase, smlParseCols_Test) {
taosHashCleanup(dumplicateKey);
}
TEST(testCase, smlParseLine_Test) {
TEST(testCase, smlProcess_influx_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
......@@ -483,7 +481,7 @@ TEST(testCase, smlParseLine_Test) {
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql[9] = {
const char *sql[11] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000",
......@@ -492,14 +490,24 @@ TEST(testCase, smlParseLine_Test) {
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000"
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000",
"stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000",
"stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000"
};
smlInsertLines(info, (char**)sql, 9);
// for (int i = 0; i < 3; i++) {
// smlParseLine(info, sql[i]);
// }
smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 11);
int rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
}
// different types
TEST(testCase, smlParseLine_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
......@@ -520,24 +528,247 @@ TEST(testCase, smlParseLine_error_Test) {
"measure,t1=3 c1=8",
"measure,t2=3 c1=8u8"
};
int ret = smlInsertLines(info, (char **)sql, 2);
int ret = smlProcess(info, (char **)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
}
// TEST(testCase, smlParseTS_Test) {
// char msg[256] = {0};
// SSmlMsgBuf msgBuf;
// msgBuf.buf = msg;
// msgBuf.len = 256;
// SSmlLineInfo elements = {0};
//
// SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, dataFormat);
// if(!info){
// return (TAOS_RES*)request;
// }
// ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
// if(ret != TSDB_CODE_SUCCESS){
// uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
// return ret;
TEST(testCase, smlGetTimestampLen_Test) {
uint8_t len = smlGetTimestampLen(0);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(390);
ASSERT_EQ(len, 3);
len = smlGetTimestampLen(-1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(-10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(-390);
ASSERT_EQ(len, 3);
}
TEST(testCase, smlProcess_telnet_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql[4] = {
"sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0",
"sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 ",
"sys.if.bytes.out 1479496102 1.3E3 network=tcp",
"sys.procs.running 1479496100 42 host=web01"
};
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 2);
int rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 1);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961");
ASSERT_NE(res, nullptr);
fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 2);
rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2);
for (int i = 0; i < rowNum; ++i) {
TAOS_ROW rows = taos_fetch_row(res);
}
}
TEST(testCase, smlProcess_json_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, nullptr);
const char *sql = "[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 9,\n"
" \"tags\": {\n"
" \"host\": \"web02\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" }\n"
"]";
int ret = smlProcess(info, (char**)(&sql), -1);
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 2);
// int rowNum = taos_affected_rows(res);
// ASSERT_EQ(rowNum, 1);
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res);
// }
// }
sql = "{\n"
" \"metric\": \"meter_current\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846400,\n"
" \"type\" : \"s\"\n"
" },\n"
" \"value\": {\n"
" \"value\" : 10.3,\n"
" \"type\" : \"i64\"\n"
" },\n"
" \"tags\": {\n"
" \"groupid\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"bigint\"\n"
" },\n"
" \"location\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
ret = smlProcess(info, (char**)(&sql), -1);
ASSERT_EQ(ret, 0);
sql = "{\n"
" \"metric\": \"meter_current\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846400,\n"
" \"type\" : \"s\"\n"
" },\n"
" \"value\": {\n"
" \"value\" : 10.3,\n"
" \"type\" : \"i64\"\n"
" },\n"
" \"tags\": {\n"
" \"t1\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"bigint\"\n"
" },\n"
" \"t2\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"int\"\n"
" },\n"
" \"t3\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"i16\"\n"
" },\n"
" \"t4\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"i8\"\n"
" },\n"
" \"t5\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"f32\"\n"
" },\n"
" \"t6\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"double\"\n"
" },\n"
" \"t7\": { \n"
" \"value\" : \"8323\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"t8\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"t9\": { \n"
" \"value\" : true,\n"
" \"type\" : \"bool\"\n"
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
ret = smlProcess(info, (char**)(&sql), -1);
ASSERT_EQ(ret, 0);
sql = "{\n"
" \"metric\": \"meter_current\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846400000,\n"
" \"type\" : \"ms\"\n"
" },\n"
" \"value\": \"ni\",\n"
" \"tags\": {\n"
" \"t1\": { \n"
" \"value\" : 20,\n"
" \"type\" : \"i64\"\n"
" },\n"
" \"t2\": { \n"
" \"value\" : 25,\n"
" \"type\" : \"i32\"\n"
" },\n"
" \"t3\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"smallint\"\n"
" },\n"
" \"t4\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"tinyint\"\n"
" },\n"
" \"t5\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"float\"\n"
" },\n"
" \"t6\": { \n"
" \"value\" : 0.2,\n"
" \"type\" : \"f64\"\n"
" },\n"
" \"t7\": \"nsj\",\n"
" \"t8\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"t9\": false,\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
ret = smlProcess(info, (char**)(&sql), -1);
ASSERT_EQ(ret, 0);
}
......@@ -1447,6 +1447,10 @@ void blockDebugShowData(const SArray* dataBlocks) {
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (pColInfoData->hasNull) {
printf(" %15s |", "NULL");
continue;
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
......@@ -1464,6 +1468,9 @@ void blockDebugShowData(const SArray* dataBlocks) {
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15lu |", *(uint64_t*)var);
break;
case TSDB_DATA_TYPE_DOUBLE:
printf(" %15f |", *(double*)var);
break;
}
}
printf("\n");
......
......@@ -457,9 +457,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
if (pHeadWithCkSum->head.msgType != TDMT_VND_SUBMIT) {
walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum);
ASSERT(walSkipFetchBody(pExec->pWalReader, pHeadWithCkSum) == 0);
} else {
walFetchBody(pExec->pWalReader, &pHeadWithCkSum);
ASSERT(walFetchBody(pExec->pWalReader, &pHeadWithCkSum) == 0);
}
SWalReadHead* pHead = &pHeadWithCkSum->head;
......@@ -950,6 +950,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
};
pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
......@@ -371,9 +371,18 @@ typedef struct STagScanInfo {
STableGroupInfo *pTableGroups;
} STagScanInfo;
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER,
} EStreamScanMode;
typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info
......@@ -383,8 +392,12 @@ typedef struct SStreamBlockScanInfo {
SArray* pColMatchInfo; //
SNode* pCondition;
SArray* tsArray;
SUpdateInfo* pUpdateInfo;
SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader;
EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......@@ -690,8 +703,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock,
SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo,
SNode* pConditions, SOperatorInfo* pOperatorDumy, SInterval* pInterval);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
......@@ -745,6 +759,15 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
uint64_t groupId, int32_t numOfOutput);
#ifdef __cplusplus
}
......
......@@ -344,6 +344,28 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
return pResultRow;
}
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
uint64_t groupId, int32_t numOfOutput) {
SAggSupporter* pSup = &pInfo->aggSup;
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
continue;
}
pResInfo->initialized = false;
if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
}
}
}
/**
* the struct of key in hash table
* +----------+---------------+
......@@ -4765,18 +4787,48 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
int32_t numOfCols = 0;
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
} else {
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
}
if (pDataReader == NULL && terrno != 0) {
qDebug("pDataReader is NULL");
// return NULL;
} else {
qDebug("pDataReader is not NULL");
}
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SSDataBlock* pResBlockDumy = createResDataBlock(pDescNode);
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
SInterval interval = extractIntervalInfo(pTableScanNode);
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(
pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList,
pResBlockDumy, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
// int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
// queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfCols = 0;
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions, pOperatorDumy, &interval);
taosArrayDestroy(tableIdList);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
......
......@@ -568,7 +568,40 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
if (pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
pInfo->interval.precision, NULL);
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindow = win;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
pTableScanInfo->scanTimes = 0;
return true;
} else {
return false;
}
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pOperatorDumy);
if (pResult == NULL) {
if (prepareDataScan(pInfo)) {
// scan next window data
pResult = doTableScan(pInfo->pOperatorDumy);
}
}
return pResult;
}
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) {
SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
......@@ -576,13 +609,19 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
taosArrayPush(pInfo->tsArray, ts + i);
}
}
if (taosArrayGetSize(pInfo->tsArray) > 0) {
int32_t size = taosArrayGetSize(pInfo->tsArray);
if (size > 0 && invertible) {
// TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
return NULL;
SSDataBlock* p = createOneDataBlock(pInfo->pRes, false);
taosArraySet(p->pDataBlock, 0, pInfo->tsArray);
p->info.rows = size;
p->info.type = STREAM_REPROCESS;
taosArrayClear(pInfo->tsArray);
return p;
}
return NULL;
}
......@@ -609,14 +648,23 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++;
return taosArrayGetP(pInfo->pBlockLists, current);
} else {
if (total > 0) {
ASSERT(total == 2);
SSDataBlock* pRes = taosArrayGetP(pInfo->pBlockLists, 0);
SSDataBlock* pUpRes = taosArrayGetP(pInfo->pBlockLists, 1);
blockDataDestroy(pUpRes);
taosArrayClear(pInfo->pBlockLists);
return pRes;
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
return pInfo->pRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
blockDataCleanup(pInfo->pRes);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo);
if (pSDB == NULL) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
return pSDB;
}
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
......@@ -682,12 +730,18 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else {
SSDataBlock* upRes = getUpdateDataBlock(pInfo);
} else if (pInfo->interval.interval > 0) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
if (upRes) {
taosArrayPush(pInfo->pBlockLists, &(pInfo->pRes));
taosArrayPush(pInfo->pBlockLists, &upRes);
return upRes;
pInfo->pUpdateRes = upRes;
if (upRes->info.type = STREAM_REPROCESS) {
pInfo->updateResIndex = 0;
prepareDataScan(pInfo);
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type = STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return upRes;
}
}
}
......@@ -695,8 +749,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition) {
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader,
SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList,
SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy,
SInterval* pInterval) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -736,7 +792,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
}
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInitP(pInterval, 10000); // TODO(liuyao) get watermark from physical plan
if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
......@@ -746,6 +802,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = *pInterval;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
......@@ -82,7 +82,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
}
// get the correct time window according to the handled timestamp
static STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win) {
STimeWindow w = {0};
......@@ -186,7 +186,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep;
}
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int32_t midPos = -1;
int32_t numOfRows;
......@@ -249,7 +249,7 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
return midPos;
}
static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
......@@ -988,6 +988,20 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
}
}
static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput, SSDataBlock* pBlock) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], &pInfo->interval,
pInfo->interval.precision, NULL);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pInfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
......@@ -1028,6 +1042,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
}
if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(pInfo, pOperator->numOfExprs, pBlock);
continue;
}
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
}
......
......@@ -19,17 +19,18 @@ int32_t udf2_destroy() {
int32_t udf2_start(SUdfInterBuf *buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(int64_t);
buf->bufLen = sizeof(double);
buf->numOfResult = 0;
return 0;
}
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf;
double sumSquares = *(double*)interBuf->buf;
int8_t numOutput = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (col->colMeta.type != TSDB_DATA_TYPE_INT) {
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT ||
col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
......@@ -39,17 +40,29 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
if (udfColDataIsNull(col, j)) {
continue;
}
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += num * num;
switch (col->colMeta.type) {
case TSDB_DATA_TYPE_INT: {
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += num * num;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* cell = udfColDataGetData(col, j);
double num = *(double*)cell;
sumSquares += num * num;
break;
}
default:
break;
}
numOutput = 1;
}
}
if (numOutput == 1) {
*(int64_t*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(int64_t);
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
}
newInterBuf->numOfResult = numOutput;
return 0;
......@@ -60,7 +73,7 @@ int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
resultData->numOfResult = 0;
return 0;
}
int64_t sumSquares = *(int64_t*)(buf->buf);
double sumSquares = *(double*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares);
resultData->bufLen = sizeof(double);
resultData->numOfResult = 1;
......
......@@ -1142,9 +1142,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
return code;
}
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiScanNodeToJson(pObj, pJson); }
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiTableScanNodeToJson(pObj, pJson); }
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiScanNode(pJson, pObj); }
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiTableScanNode(pJson, pObj); }
static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet";
static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite";
......
......@@ -1679,8 +1679,8 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
return TSDB_CODE_SUCCESS;
}
int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* colsSchema, SArray* cols, bool format,
STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen) {
int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols, bool format,
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
......@@ -1722,8 +1722,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols
initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
int32_t rowNum = format ? taosArrayGetSize(colsFormat) : taosArrayGetSize(cols);
if (rowNum <= 0) {
int32_t rowNum = taosArrayGetSize(cols);
if(rowNum <= 0) {
return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
}
ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
......@@ -1734,13 +1734,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols
for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
void* rowData = NULL;
void *rowData = taosArrayGetP(cols, r);
size_t rowDataSize = 0;
if (format) {
rowData = taosArrayGetP(colsFormat, r);
if(format){
rowDataSize = taosArrayGetSize(rowData);
} else {
rowData = taosArrayGetP(cols, r);
}
// 1. set the parsed value from sql string
......
......@@ -460,9 +460,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
pTableScan->scanRange = pScanLogicNode->scanRange;
pTableScan->ratio = pScanLogicNode->ratio;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
}
if (pCxt->pExecNodeList) {
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
}
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
......@@ -505,13 +509,12 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan =
(SStreamScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision,
(SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
int32_t res = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
if (res == TSDB_CODE_SUCCESS) {
ENodeType type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
setNodeType(*pPhyNode, type);
}
return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
return res;
}
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
......@@ -786,7 +789,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(
SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode(
pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
/*blockDebugShowData(pRes);*/
blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
......
......@@ -54,7 +54,7 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
......@@ -156,7 +156,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
int32_t code;
int64_t code;
// TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) {
......@@ -214,23 +214,24 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
return -1;
}
*ppHead = ptr;
pReadHead = &((*ppHead)->head);
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) {
ASSERT(0);
return -1;
}
if (pReadHead->version != ver) {
wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
ver);
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(*ppHead) != 0) {
wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver);
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
......@@ -257,7 +258,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
int64_t code;
// TODO: check wal life
if (pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
......
......@@ -19,8 +19,8 @@
#include "tref.h"
#include "walInt.h"
static int walSeekWritePos(SWal* pWal, int64_t ver) {
int code = 0;
static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
int64_t code = 0;
TdFilePtr pIdxTFile = pWal->pWriteIdxTFile;
TdFilePtr pLogTFile = pWal->pWriteLogTFile;
......@@ -45,7 +45,7 @@ static int walSeekWritePos(SWal* pWal, int64_t ver) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return code;
return 0;
}
int walSetWrite(SWal* pWal) {
......@@ -124,7 +124,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
}
int walSeekWriteVer(SWal* pWal, int64_t ver) {
int code;
int64_t code;
if (ver == pWal->vers.lastVer) {
return 0;
}
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c startUdfd -v 1
system sh/cfg.sh -n dnode1 -c udf -v 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
......
import taos
import sys
import inspect
import traceback
from util.log import *
from util.sql import *
from util.cases import *
PRIVILEGES_ALL = "ALL"
PRIVILEGES_READ = "READ"
PRIVILEGES_WRITE = "WRITE"
class TDconnect:
def __init__(self,
host = None,
port = None,
user = None,
password = None,
database = None,
config = None,
) -> None:
self._conn = None
self._host = host
self._user = user
self._password = password
self._database = database
self._port = port
self._config = config
def __enter__(self):
self._conn = taos.connect(
host =self._host,
port =self._port,
user =self._user,
password=self._password,
database=self._database,
config =self._config
)
self.cursor = self._conn.cursor()
return self
def error(self, sql):
expectErrNotOccured = True
try:
self.cursor.execute(sql)
except BaseException:
expectErrNotOccured = False
if expectErrNotOccured:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured" )
else:
self.queryRows = 0
self.queryCols = 0
self.queryResult = None
tdLog.info(f"sql:{sql}, expect error occured")
def query(self, sql, row_tag=None):
# sourcery skip: raise-from-previous-error, raise-specific-error
self.sql = sql
try:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.notice(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, {repr(e)}")
traceback.print_exc()
raise Exception(repr(e))
if row_tag:
return self.queryResult
return self.queryRows
def __exit__(self, types, values, trace):
if self._conn:
self.cursor.close()
self._conn.close()
def taos_connect(
host = "127.0.0.1",
port = 6030,
user = "root",
passwd = "taosdata",
database= None,
config = None
):
return TDconnect(
host = host,
port=port,
user=user,
password=passwd,
database=database,
config=config
)
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
@property
def __user_list(self):
return [f"user_test{i}" for i in range(self.users_count) ]
@property
def __passwd_list(self):
return [f"taosdata{i}" for i in range(self.users_count) ]
@property
def __privilege(self):
return [ PRIVILEGES_ALL, PRIVILEGES_READ, PRIVILEGES_WRITE ]
def __priv_level(self, dbname=None):
return f"{dbname}.*" if dbname else "*.*"
def create_user_current(self):
users = self.__user_list
passwds = self.__passwd_list
for i in range(self.users_count):
tdSql.execute(f"create user {users[i]} pass '{passwds[i]}' ")
tdSql.query("show users")
tdSql.checkRows(self.users_count + 1)
def create_user_err(self):
sqls = [
"create users u1 pass 'u1passwd' ",
"create user '' pass 'u1passwd' ",
"create user pass 'u1passwd' ",
"create user u1 pass u1passwd ",
"create user u1 password 'u1passwd' ",
"create user u1 pass u1passwd ",
"create user u1 pass '' ",
"create user u1 pass ' ' ",
"create user u1 pass ",
"create user u1 u2 pass 'u1passwd' 'u2passwd' ",
"create user u1 u2 pass 'u1passwd', 'u2passwd' ",
"create user u1, u2 pass 'u1passwd', 'u2passwd' ",
"create user u1, u2 pass 'u1passwd' 'u2passwd' ",
# length of user_name must <= 23
"create user u12345678901234567890123 pass 'u1passwd' " ,
# length of passwd must <= 128
"create user u1 pass 'u12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678' " ,
# password must have not " ' ~ ` \
"create user u1 pass 'u1passwd\\' " ,
"create user u1 pass 'u1passwd~' " ,
"create user u1 pass 'u1passwd\"' " ,
"create user u1 pass 'u1passwd\'' " ,
"create user u1 pass 'u1passwd`' " ,
# must after create a user named u1
"create user u1 pass 'u1passwd' " ,
]
tdSql.execute("create user u1 pass 'u1passwd' ")
for sql in sqls:
tdSql.error(sql)
def __alter_pass_sql(self, user, passwd):
return f'''ALTER USER {user} PASS '{passwd}' '''
def alter_pass_current(self):
self.__init_pass = True
for count, i in enumerate(range(self.users_count)):
if self.__init_pass:
tdSql.query(self.__alter_pass_sql(self.__user_list[i], f"new{self.__passwd_list[i]}"))
self.__init_pass = count != self.users_count - 1
else:
tdSql.query(self.__alter_pass_sql(self.__user_list[i], self.__passwd_list[i] ) )
self.__init_pass = count == self.users_count - 1
def alter_pass_err(self): # sourcery skip: remove-redundant-fstring
sqls = [
f"alter users {self.__user_list[0]} pass 'newpass' " ,
f"alter user {self.__user_list[0]} pass '' " ,
f"alter user {self.__user_list[0]} pass ' ' " ,
f"alter user anyuser pass 'newpass' " ,
f"alter user {self.__user_list[0]} pass " ,
f"alter user {self.__user_list[0]} password 'newpass' " ,
]
for sql in sqls:
tdSql.error(sql)
def grant_user_privileges(self, privilege, dbname=None, user_name="root"):
return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} "
def test_user_create(self):
self.create_user_current()
self.create_user_err()
def test_alter_pass(self):
self.alter_pass_current()
self.alter_pass_err()
def user_login(self, user, passwd):
login_except = False
try:
with taos_connect(user=user, passwd=passwd) as conn:
cursor = conn.cursor
except BaseException:
login_except = True
cursor = None
return login_except, cursor
def login_currrent(self, user, passwd):
login_except, _ = self.user_login(user, passwd)
if login_except:
tdLog.exit(f"connect failed, user: {user} and pass: {passwd} do not match!")
else:
tdLog.info("connect successfully, user and pass matched!")
def login_err(self, user, passwd):
login_except, _ = self.user_login(user, passwd)
if login_except:
tdLog.info("connect failed, except error occured!")
else:
tdLog.exit("connect successfully, except error not occrued!")
def __drop_user(self, user):
return f"DROP USER {user}"
def drop_user_current(self):
for user in self.__user_list:
tdSql.query(self.__drop_user(user))
def drop_user_error(self):
sqls = [
f"DROP {self.__user_list[0]}",
f"DROP user {self.__user_list[0]} {self.__user_list[1]}",
f"DROP user {self.__user_list[0]} , {self.__user_list[1]}",
f"DROP users {self.__user_list[0]} {self.__user_list[1]}",
f"DROP users {self.__user_list[0]} , {self.__user_list[1]}",
"DROP user root",
"DROP user abcde",
"DROP user ALL",
]
for sql in sqls:
tdSql.error(sql)
def test_drop_user(self):
# must drop err first
self.drop_user_error()
self.drop_user_current()
def run(self):
# 默认只有 root 用户
tdLog.printNoPrefix("==========step0: init, user list only has root account")
tdSql.query("show users")
tdSql.checkData(0, 0, "root")
tdSql.checkData(0, 1, "super")
# root用户权限
# 创建用户测试
tdLog.printNoPrefix("==========step1: create user test")
self.users_count = 5
self.test_user_create()
# 查看用户
tdLog.printNoPrefix("==========step2: show user test")
tdSql.query("show users")
tdSql.checkRows(self.users_count + 2)
# 密码登录认证
self.login_currrent(self.__user_list[0], self.__passwd_list[0])
self.login_err(self.__user_list[0], f"new{self.__passwd_list[0]}")
# 修改密码
tdLog.printNoPrefix("==========step3: alter user pass test")
self.test_alter_pass()
# 密码修改后的登录认证
tdLog.printNoPrefix("==========step4: check login test")
self.login_err(self.__user_list[0], self.__passwd_list[0])
self.login_currrent(self.__user_list[0], f"new{self.__passwd_list[0]}")
# 普通用户权限
# 密码登录
_, user = self.user_login(self.__user_list[0], f"new{self.__passwd_list[0]}")
with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as conn:
user = conn
# 不能创建用户
tdLog.printNoPrefix("==========step5: normal user can not create user")
user.error("create use utest1 pass 'utest1pass'")
# 可以查看用户
tdLog.printNoPrefix("==========step6: normal user can show user")
user.query("show users")
assert user.queryRows == self.users_count + 2
# 不可以修改其他用户的密码
tdLog.printNoPrefix("==========step7: normal user can not alter other user pass")
user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] ))
user.error("root", "taosdata_root")
# 可以修改自己的密码
tdLog.printNoPrefix("==========step8: normal user can alter owner pass")
user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0]))
# 不可以删除用户,包括自己
tdLog.printNoPrefix("==========step9: normal user can not drop any user ")
user.error(f"drop user {self.__user_list[0]}")
user.error(f"drop user {self.__user_list[1]}")
user.error("drop user root")
# root删除用户测试
tdLog.printNoPrefix("==========step10: super user drop normal user")
self.test_drop_user()
tdSql.query("show users")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "root")
tdSql.checkData(0, 1, "super")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -45,16 +45,16 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:query timestamp type")
# tdSql.query("select * from t1 where ts between now()-1m and now()+10m")
# tdSql.checkRows(10)
# tdSql.query("select * from t1 where ts between '2021-01-01 00:00:00.000' and '2121-01-01 00:00:00.000'")
tdSql.query("select * from t1 where ts between now()-1m and now()+10m")
tdSql.checkRows(10)
tdSql.query("select * from t1 where ts between '2021-01-01 00:00:00.000' and '2121-01-01 00:00:00.000'")
# tdSql.checkRows(11)
# tdSql.query("select * from t1 where ts between '1969-01-01 00:00:00.000' and '1969-12-31 23:59:59.999'")
# tdSql.checkRows(0)
# tdSql.query("select * from t1 where ts between -2793600 and 31507199")
tdSql.query("select * from t1 where ts between '1969-01-01 00:00:00.000' and '1969-12-31 23:59:59.999'")
# tdSql.checkRows(0)
# tdSql.query("select * from t1 where ts between 1609430400000 and 4765104000000")
# tdSql.checkRows(11)
tdSql.query("select * from t1 where ts between -2793600 and 31507199")
tdSql.checkRows(0)
tdSql.query("select * from t1 where ts between 1609430400000 and 4765104000000")
tdSql.checkRows(11)
tdLog.printNoPrefix("==========step4:query int type")
......@@ -68,11 +68,11 @@ class TDTestCase:
tdSql.checkRows(0)
# tdSql.query("select * from t1 where c1 between 0x64 and 0x69")
# tdSql.checkRows(6)
# tdSql.query("select * from t1 where c1 not between 100 and 106")
# tdSql.checkRows(11)
tdSql.query("select * from t1 where c1 not between 100 and 106")
tdSql.checkRows(11)
tdSql.query(f"select * from t1 where c1 between {2**31-2} and {2**31+1}")
tdSql.checkRows(1)
tdSql.error(f"select * from t2 where c1 between null and {1-2**31}")
tdSql.query(f"select * from t2 where c1 between null and {1-2**31}")
# tdSql.checkRows(3)
tdSql.query(f"select * from t2 where c1 between {-2**31} and {1-2**31}")
tdSql.checkRows(1)
......@@ -88,12 +88,12 @@ class TDTestCase:
tdSql.query("select * from t1 where c2 between 'DC3' and 'SYN'")
tdSql.checkRows(0)
tdSql.query("select * from t1 where c2 not between 0.1 and 0.2")
# tdSql.checkRows(11)
tdSql.checkRows(11)
tdSql.query(f"select * from t1 where c2 between {pow(10,38)*3.4} and {pow(10,38)*3.4+1}")
# tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c2 between {-3.4*10**38-1} and {-3.4*10**38}")
# tdSql.checkRows(2)
tdSql.error(f"select * from t2 where c2 between null and {-3.4*10**38}")
tdSql.query(f"select * from t2 where c2 between null and {-3.4*10**38}")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step6:query bigint type")
......@@ -101,7 +101,7 @@ class TDTestCase:
tdSql.query(f"select * from t1 where c3 between {2**31} and {2**31+10}")
tdSql.checkRows(10)
tdSql.query(f"select * from t1 where c3 between {-2**63} and {2**63}")
# tdSql.checkRows(11)
tdSql.checkRows(11)
tdSql.query(f"select * from t1 where c3 between {2**31+10} and {2**31}")
tdSql.checkRows(0)
tdSql.query("select * from t1 where c3 between 'a' and 'z'")
......@@ -112,7 +112,7 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c3 between {-2**63} and {1-2**63}")
# tdSql.checkRows(3)
tdSql.error(f"select * from t2 where c3 between null and {1-2**63}")
tdSql.query(f"select * from t2 where c3 between null and {1-2**63}")
# tdSql.checkRows(2)
tdLog.printNoPrefix("==========step7:query double type")
......@@ -129,10 +129,10 @@ class TDTestCase:
tdSql.query("select * from t1 where c4 not between 1 and 2")
# tdSql.checkRows(0)
tdSql.query(f"select * from t1 where c4 between {1.7*10**308} and {1.7*10**308+1}")
# tdSql.checkRows(1)
tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c4 between {-1.7*10**308-1} and {-1.7*10**308}")
# tdSql.checkRows(3)
tdSql.error(f"select * from t2 where c4 between null and {-1.7*10**308}")
tdSql.query(f"select * from t2 where c4 between null and {-1.7*10**308}")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step8:query smallint type")
......@@ -151,7 +151,7 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.query("select * from t2 where c5 between -32768 and -32767")
tdSql.checkRows(1)
tdSql.error("select * from t2 where c5 between null and -32767")
tdSql.query("select * from t2 where c5 between null and -32767")
# tdSql.checkRows(1)
tdLog.printNoPrefix("==========step9:query tinyint type")
......@@ -170,21 +170,21 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.query("select * from t2 where c6 between -128 and -127")
tdSql.checkRows(1)
tdSql.error("select * from t2 where c6 between null and -127")
tdSql.query("select * from t2 where c6 between null and -127")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step10:invalid query type")
# tdSql.query("select * from supt where location between 'beijing' and 'shanghai'")
# tdSql.checkRows(23)
# # 非0值均解析为1,因此"between 负值 and o"解析为"between 1 and 0"
# tdSql.query("select * from supt where isused between 0 and 1")
# tdSql.checkRows(23)
# tdSql.query("select * from supt where isused between -1 and 0")
# tdSql.checkRows(0)
# tdSql.error("select * from supt where isused between false and true")
# tdSql.query("select * from supt where family between '拖拉机' and '自行车'")
# tdSql.checkRows(23)
tdSql.query("select * from supt where location between 'beijing' and 'shanghai'")
tdSql.checkRows(23)
# 非0值均解析为1,因此"between 负值 and o"解析为"between 1 and 0"
tdSql.query("select * from supt where isused between 0 and 1")
tdSql.checkRows(23)
tdSql.query("select * from supt where isused between -1 and 0")
tdSql.checkRows(0)
tdSql.error("select * from supt where isused between false and true")
tdSql.query("select * from supt where family between '拖拉机' and '自行车'")
tdSql.checkRows(23)
tdLog.printNoPrefix("==========step11:query HEX/OCT/BIN type")
......
......@@ -114,7 +114,7 @@ class TDTestCase:
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.printNoPrefix("======== test case 1: Produce while consume")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
......@@ -122,8 +122,8 @@ class TDTestCase:
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 100, \
'batchNum': 10, \
'rowsPerTbl': 1000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
......@@ -163,8 +163,7 @@ class TDTestCase:
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
......@@ -172,7 +171,7 @@ class TDTestCase:
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
......@@ -209,18 +208,19 @@ class TDTestCase:
else:
time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
# mulit rows and mulit tables in one sql, this num of msg is not sure
#tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ")
tdLog.printNoPrefix("======== test case 2: add child table with consuming ")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
......@@ -275,9 +275,9 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
rowsOfNewCtb = 1000
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
......@@ -285,7 +285,7 @@ class TDTestCase:
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
......@@ -312,7 +312,6 @@ class TDTestCase:
# create new child table and insert data
newCtbName = 'newctb'
rowsOfNewCtb = 1000
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
......@@ -332,14 +331,135 @@ class TDTestCase:
else:
time.sleep(5)
expectmsgcnt += rowsOfNewCtb
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 3: tow topics, each contains a stable, \
but at the beginning, no ctables in the stable of one topic,\
after starting consumer, create ctables ")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
# wait db ready
while 1:
tdSql.query("show databases")
if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break
else:
time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready
while 1:
tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1:
break
else:
time.sleep(1)
tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column2'
topicFromCtb = 'topic_ctb_column2'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
tdLog.info("show topics: %s, %s"%(topic1, topic2))
if topic1 != topicFromStb and topic1 != topicFromCtb:
tdLog.exit("topic error1")
if topic2 != topicFromStb and topic2 != topicFromCtb:
tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
rowsOfNewCtb = 1000
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# create new child table and insert data
newCtbName = 'newctb'
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
tdSql.execute(sql)
tdLog.debug("insert data into new child table ............ [OK]")
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test scenario 2 end ...... ")
tdLog.printNoPrefix("======== test case 3 end ...... ")
def run(self):
tdSql.prepare()
......@@ -353,7 +473,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
#self.tmqCase2(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath)
#self.tmqCase3(cfgPath, buildPath)
def stop(self):
......
......@@ -98,12 +98,24 @@ static void printHelp() {
}
void initLogFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
char file[256];
sprintf(file, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
time_t now;
struct tm curTime;
char filename[256];
now = taosTime(NULL);
taosLocalTime(&now, &curTime);
sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
configDir,
curTime.tm_year+1900,
curTime.tm_mon+1,
curTime.tm_mday,
curTime.tm_hour,
curTime.tm_min,
curTime.tm_sec);
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
fprintf(stderr, "Failed to open %s for save result\n", filename);
exit(-1);
}
g_fp = pFile;
......@@ -333,8 +345,8 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n");
if (totalRows >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
break;
}
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册