From 79e1a7bcf8dcee26cd4d00e29807b0127d106d7e Mon Sep 17 00:00:00 2001 From: robot Date: Tue, 12 Nov 2019 10:40:44 +0800 Subject: [PATCH] Add support for continuous query in Lua connector. Malloc memory to store Lua state and callback function from Lua. Connector can support multiple connection by this way. Memory will be freed when connection isn't established successfully or connection is closed by lua call. Stream content in Lua is not the same one with stream content in connector. A loop in test code do the job for continuous query. record is inserted between query actions. --- tests/examples/lua/build.sh | 0 tests/examples/lua/lua_connector.c | 131 ++++++++++++++++++++++++++++- tests/examples/lua/test.lua | 69 ++++++++++++++- 3 files changed, 195 insertions(+), 5 deletions(-) mode change 100644 => 100755 tests/examples/lua/build.sh diff --git a/tests/examples/lua/build.sh b/tests/examples/lua/build.sh old mode 100644 new mode 100755 diff --git a/tests/examples/lua/lua_connector.c b/tests/examples/lua/lua_connector.c index f37657e822..f4065bb274 100644 --- a/tests/examples/lua/lua_connector.c +++ b/tests/examples/lua/lua_connector.c @@ -7,8 +7,15 @@ #include #include -static int l_connect(lua_State *L) -{ +struct cb_param{ + lua_State* state; + int callback; + void * stream; +}; + + + +static int l_connect(lua_State *L){ TAOS * taos; char *host = lua_tostring(L, 1); char *user = lua_tostring(L, 2); @@ -29,6 +36,7 @@ static int l_connect(lua_State *L) lua_pushstring(L, taos_errstr(taos)); lua_setfield(L, table_index, "error"); lua_pushlightuserdata(L,NULL); + lua_setfield(L, table_index, "conn"); }else{ printf("success to connect server\n"); lua_pushnumber(L, 0); @@ -49,7 +57,7 @@ static int l_query(lua_State *L){ lua_newtable(L); int table_index = lua_gettop(L); - printf("receive command:%s\r\n",s); + // printf("receive command:%s\r\n",s); if(taos_query(taos, s)!=0){ printf("failed, reason:%s\n", taos_errstr(taos)); lua_pushnumber(L, -1); @@ -78,8 +86,12 @@ static int l_query(lua_State *L){ TAOS_FIELD *fields = taos_fetch_fields(result); char temp[256]; + int affectRows = taos_affected_rows(taos); + // printf(" affect rows:%d\r\n", affectRows); lua_pushnumber(L, 0); lua_setfield(L, table_index, "code"); + lua_pushinteger(L, affectRows); + lua_setfield(L, table_index, "affected"); lua_newtable(L); while ((row = taos_fetch_row(result))) { @@ -95,7 +107,7 @@ static int l_query(lua_State *L){ } lua_pushstring(L,fields[i].name); - //printf("field name:%s,type:%d\n",fields[i].name,fields[i].type); + switch (fields[i].type) { case TSDB_DATA_TYPE_TINYINT: lua_pushinteger(L,*((char *)row[i])); @@ -142,6 +154,115 @@ static int l_query(lua_State *L){ return 1; } +void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ + + struct cb_param* p = (struct cb_param*) param; + TAOS_FIELD *fields = taos_fetch_fields(result); + int numFields = taos_num_fields(result); + + printf("\n\r-----------------------------------------------------------------------------------\n"); + + // printf("r:%d, L:%d\n",p->callback, p->state); + + lua_State *L = p->state; + lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback); + + lua_newtable(L); + + for (int i = 0; i < numFields; ++i) { + if (row[i] == NULL) { + continue; + } + + lua_pushstring(L,fields[i].name); + + switch (fields[i].type) { + case TSDB_DATA_TYPE_TINYINT: + lua_pushinteger(L,*((char *)row[i])); + break; + case TSDB_DATA_TYPE_SMALLINT: + lua_pushinteger(L,*((short *)row[i])); + break; + case TSDB_DATA_TYPE_INT: + lua_pushinteger(L,*((int *)row[i])); + break; + case TSDB_DATA_TYPE_BIGINT: + lua_pushinteger(L,*((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_FLOAT: + lua_pushnumber(L,*((float *)row[i])); + break; + case TSDB_DATA_TYPE_DOUBLE: + lua_pushnumber(L,*((double *)row[i])); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + lua_pushstring(L,(char *)row[i]); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + lua_pushinteger(L,*((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_BOOL: + lua_pushinteger(L,*((char *)row[i])); + break; + default: + lua_pushnil(L); + break; + } + + lua_settable(L, -3); + } + + lua_call(L, 1, 0); + + printf("-----------------------------------------------------------------------------------\n\r"); +} + +static int l_open_stream(lua_State *L){ + int r = luaL_ref(L, LUA_REGISTRYINDEX); + TAOS * taos = lua_topointer(L,1); + char * sqlstr = lua_tostring(L,2); + int stime = luaL_checknumber(L,3); + + lua_newtable(L); + int table_index = lua_gettop(L); + + struct cb_param *p = malloc(sizeof(struct cb_param)); + p->state = L; + p->callback=r; + // printf("r:%d, L:%d\n",r,L); + void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL); + if (s == NULL) { + printf("failed to open stream, reason:%s\n", taos_errstr(taos)); + free(p); + lua_pushnumber(L, -1); + lua_setfield(L, table_index, "code"); + lua_pushstring(L, taos_errstr(taos)); + lua_setfield(L, table_index, "error"); + lua_pushlightuserdata(L,NULL); + lua_setfield(L, table_index, "stream"); + }else{ + // printf("success to open stream\n"); + lua_pushnumber(L, 0); + lua_setfield(L, table_index, "code"); + lua_pushstring(L, taos_errstr(taos)); + lua_setfield(L, table_index, "error"); + p->stream = s; + lua_pushlightuserdata(L,p); + lua_setfield(L, table_index, "stream");//stream has different content in lua and c. + } + + return 1; +} + +static int l_close_stream(lua_State *L){ + //TODO:get stream and free cb_param + struct cb_param *p = lua_touserdata(L,1); + taos_close_stream(p->stream); + free(p); + return 0; +} + static int l_close(lua_State *L){ TAOS * taos= lua_topointer(L,1); lua_newtable(L); @@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = { {"connect", l_connect}, {"query", l_query}, {"close", l_close}, + {"open_stream", l_open_stream}, + {"close_stream", l_close_stream}, {NULL, NULL} }; diff --git a/tests/examples/lua/test.lua b/tests/examples/lua/test.lua index f644b82dd4..38ae1c82f2 100644 --- a/tests/examples/lua/test.lua +++ b/tests/examples/lua/test.lua @@ -35,10 +35,12 @@ if res.code ~=0 then return end -res = driver.query(conn,"insert into m1 values (1592222222222,0,'robotspace'), (1592222222223,1,'Hilink'),(1592222222224,2,'Harmony')") +res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')") if res.code ~=0 then print(res.error) return +else + print("insert successfully, affected:"..res.affected) end res = driver.query(conn,"select * from m1") @@ -55,4 +57,69 @@ else end end +res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)") +if res.code ~=0 then + print(res.error) + return +end +res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)") +if res.code ~=0 then + print(res.error) + return +end +res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)") + +if res.code ~=0 then + print(res.error) + return +else + print("insert successfully, affected:"..res.affected) +end + +res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type") +if res.code ~=0 then + print("select error:"..res.error) + return +else + print("in lua, result:") + for i = 1, #(res.item) do + print("res:"..res.item[i].count) + end +end + +function callback(t) + print("continuous query result:") + for key, value in pairs(t) do + print("key:"..key..", value:"..value) + end +end + +local stream +res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback) +if res.code ~=0 then + print("open stream error:"..res.error) + return +else + print("openstream ok") + stream = res.stream +end + +--From now on we begin continous query in an definite (infinite if you want) loop. +local loop_index = 0 +while loop_index < 20 do + local t = os.time()*1000 + local v = loop_index + res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v)) + + if res.code ~=0 then + print(res.error) + return + else + print("insert successfully, affected:"..res.affected) + end + os.execute("sleep " .. 1) + loop_index = loop_index + 1 +end + +driver.close_stream(stream) driver.close(conn) -- GitLab