diff --git a/tests/examples/lua/build.sh b/tests/examples/lua/build.sh old mode 100644 new mode 100755 diff --git a/tests/examples/lua/lua_connector.c b/tests/examples/lua/lua_connector.c index f37657e822cf819a9a5877858a1c216f11ea4296..f4065bb27452fba0e4cccbd8d7024e68579f5637 100644 --- a/tests/examples/lua/lua_connector.c +++ b/tests/examples/lua/lua_connector.c @@ -7,8 +7,15 @@ #include #include -static int l_connect(lua_State *L) -{ +struct cb_param{ + lua_State* state; + int callback; + void * stream; +}; + + + +static int l_connect(lua_State *L){ TAOS * taos; char *host = lua_tostring(L, 1); char *user = lua_tostring(L, 2); @@ -29,6 +36,7 @@ static int l_connect(lua_State *L) lua_pushstring(L, taos_errstr(taos)); lua_setfield(L, table_index, "error"); lua_pushlightuserdata(L,NULL); + lua_setfield(L, table_index, "conn"); }else{ printf("success to connect server\n"); lua_pushnumber(L, 0); @@ -49,7 +57,7 @@ static int l_query(lua_State *L){ lua_newtable(L); int table_index = lua_gettop(L); - printf("receive command:%s\r\n",s); + // printf("receive command:%s\r\n",s); if(taos_query(taos, s)!=0){ printf("failed, reason:%s\n", taos_errstr(taos)); lua_pushnumber(L, -1); @@ -78,8 +86,12 @@ static int l_query(lua_State *L){ TAOS_FIELD *fields = taos_fetch_fields(result); char temp[256]; + int affectRows = taos_affected_rows(taos); + // printf(" affect rows:%d\r\n", affectRows); lua_pushnumber(L, 0); lua_setfield(L, table_index, "code"); + lua_pushinteger(L, affectRows); + lua_setfield(L, table_index, "affected"); lua_newtable(L); while ((row = taos_fetch_row(result))) { @@ -95,7 +107,7 @@ static int l_query(lua_State *L){ } lua_pushstring(L,fields[i].name); - //printf("field name:%s,type:%d\n",fields[i].name,fields[i].type); + switch (fields[i].type) { case TSDB_DATA_TYPE_TINYINT: lua_pushinteger(L,*((char *)row[i])); @@ -142,6 +154,115 @@ static int l_query(lua_State *L){ return 1; } +void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ + + struct cb_param* p = (struct cb_param*) param; + TAOS_FIELD *fields = taos_fetch_fields(result); + int numFields = taos_num_fields(result); + + printf("\n\r-----------------------------------------------------------------------------------\n"); + + // printf("r:%d, L:%d\n",p->callback, p->state); + + lua_State *L = p->state; + lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback); + + lua_newtable(L); + + for (int i = 0; i < numFields; ++i) { + if (row[i] == NULL) { + continue; + } + + lua_pushstring(L,fields[i].name); + + switch (fields[i].type) { + case TSDB_DATA_TYPE_TINYINT: + lua_pushinteger(L,*((char *)row[i])); + break; + case TSDB_DATA_TYPE_SMALLINT: + lua_pushinteger(L,*((short *)row[i])); + break; + case TSDB_DATA_TYPE_INT: + lua_pushinteger(L,*((int *)row[i])); + break; + case TSDB_DATA_TYPE_BIGINT: + lua_pushinteger(L,*((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_FLOAT: + lua_pushnumber(L,*((float *)row[i])); + break; + case TSDB_DATA_TYPE_DOUBLE: + lua_pushnumber(L,*((double *)row[i])); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + lua_pushstring(L,(char *)row[i]); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + lua_pushinteger(L,*((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_BOOL: + lua_pushinteger(L,*((char *)row[i])); + break; + default: + lua_pushnil(L); + break; + } + + lua_settable(L, -3); + } + + lua_call(L, 1, 0); + + printf("-----------------------------------------------------------------------------------\n\r"); +} + +static int l_open_stream(lua_State *L){ + int r = luaL_ref(L, LUA_REGISTRYINDEX); + TAOS * taos = lua_topointer(L,1); + char * sqlstr = lua_tostring(L,2); + int stime = luaL_checknumber(L,3); + + lua_newtable(L); + int table_index = lua_gettop(L); + + struct cb_param *p = malloc(sizeof(struct cb_param)); + p->state = L; + p->callback=r; + // printf("r:%d, L:%d\n",r,L); + void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL); + if (s == NULL) { + printf("failed to open stream, reason:%s\n", taos_errstr(taos)); + free(p); + lua_pushnumber(L, -1); + lua_setfield(L, table_index, "code"); + lua_pushstring(L, taos_errstr(taos)); + lua_setfield(L, table_index, "error"); + lua_pushlightuserdata(L,NULL); + lua_setfield(L, table_index, "stream"); + }else{ + // printf("success to open stream\n"); + lua_pushnumber(L, 0); + lua_setfield(L, table_index, "code"); + lua_pushstring(L, taos_errstr(taos)); + lua_setfield(L, table_index, "error"); + p->stream = s; + lua_pushlightuserdata(L,p); + lua_setfield(L, table_index, "stream");//stream has different content in lua and c. + } + + return 1; +} + +static int l_close_stream(lua_State *L){ + //TODO:get stream and free cb_param + struct cb_param *p = lua_touserdata(L,1); + taos_close_stream(p->stream); + free(p); + return 0; +} + static int l_close(lua_State *L){ TAOS * taos= lua_topointer(L,1); lua_newtable(L); @@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = { {"connect", l_connect}, {"query", l_query}, {"close", l_close}, + {"open_stream", l_open_stream}, + {"close_stream", l_close_stream}, {NULL, NULL} }; diff --git a/tests/examples/lua/test.lua b/tests/examples/lua/test.lua index f644b82dd409ea2408be9be7c194f965f96cc82a..38ae1c82f2b1bbcb473257200241fb20ab44878b 100644 --- a/tests/examples/lua/test.lua +++ b/tests/examples/lua/test.lua @@ -35,10 +35,12 @@ if res.code ~=0 then return end -res = driver.query(conn,"insert into m1 values (1592222222222,0,'robotspace'), (1592222222223,1,'Hilink'),(1592222222224,2,'Harmony')") +res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')") if res.code ~=0 then print(res.error) return +else + print("insert successfully, affected:"..res.affected) end res = driver.query(conn,"select * from m1") @@ -55,4 +57,69 @@ else end end +res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)") +if res.code ~=0 then + print(res.error) + return +end +res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)") +if res.code ~=0 then + print(res.error) + return +end +res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)") + +if res.code ~=0 then + print(res.error) + return +else + print("insert successfully, affected:"..res.affected) +end + +res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type") +if res.code ~=0 then + print("select error:"..res.error) + return +else + print("in lua, result:") + for i = 1, #(res.item) do + print("res:"..res.item[i].count) + end +end + +function callback(t) + print("continuous query result:") + for key, value in pairs(t) do + print("key:"..key..", value:"..value) + end +end + +local stream +res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback) +if res.code ~=0 then + print("open stream error:"..res.error) + return +else + print("openstream ok") + stream = res.stream +end + +--From now on we begin continous query in an definite (infinite if you want) loop. +local loop_index = 0 +while loop_index < 20 do + local t = os.time()*1000 + local v = loop_index + res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v)) + + if res.code ~=0 then + print(res.error) + return + else + print("insert successfully, affected:"..res.affected) + end + os.execute("sleep " .. 1) + loop_index = loop_index + 1 +end + +driver.close_stream(stream) driver.close(conn)