提交 79e1a7bc 编写于 作者: R robot

Add support for continuous query in Lua connector.

Malloc memory to store Lua state and callback function from Lua. Connector can support multiple connection by this way.
Memory will be freed when connection isn't established successfully or connection is closed by lua call. Stream content
in Lua is not the same one with stream content in connector.

A loop in test code do the job for continuous query. record is inserted between query actions.
上级 317c98a2
文件模式从 100644 更改为 100755
...@@ -7,8 +7,15 @@ ...@@ -7,8 +7,15 @@
#include <lualib.h> #include <lualib.h>
#include <taos.h> #include <taos.h>
static int l_connect(lua_State *L) struct cb_param{
{ lua_State* state;
int callback;
void * stream;
};
static int l_connect(lua_State *L){
TAOS * taos; TAOS * taos;
char *host = lua_tostring(L, 1); char *host = lua_tostring(L, 1);
char *user = lua_tostring(L, 2); char *user = lua_tostring(L, 2);
...@@ -29,6 +36,7 @@ static int l_connect(lua_State *L) ...@@ -29,6 +36,7 @@ static int l_connect(lua_State *L)
lua_pushstring(L, taos_errstr(taos)); lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error"); lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL); lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "conn");
}else{ }else{
printf("success to connect server\n"); printf("success to connect server\n");
lua_pushnumber(L, 0); lua_pushnumber(L, 0);
...@@ -49,7 +57,7 @@ static int l_query(lua_State *L){ ...@@ -49,7 +57,7 @@ static int l_query(lua_State *L){
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);
printf("receive command:%s\r\n",s); // printf("receive command:%s\r\n",s);
if(taos_query(taos, s)!=0){ if(taos_query(taos, s)!=0){
printf("failed, reason:%s\n", taos_errstr(taos)); printf("failed, reason:%s\n", taos_errstr(taos));
lua_pushnumber(L, -1); lua_pushnumber(L, -1);
...@@ -78,8 +86,12 @@ static int l_query(lua_State *L){ ...@@ -78,8 +86,12 @@ static int l_query(lua_State *L){
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256]; char temp[256];
int affectRows = taos_affected_rows(taos);
// printf(" affect rows:%d\r\n", affectRows);
lua_pushnumber(L, 0); lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code"); lua_setfield(L, table_index, "code");
lua_pushinteger(L, affectRows);
lua_setfield(L, table_index, "affected");
lua_newtable(L); lua_newtable(L);
while ((row = taos_fetch_row(result))) { while ((row = taos_fetch_row(result))) {
...@@ -95,7 +107,7 @@ static int l_query(lua_State *L){ ...@@ -95,7 +107,7 @@ static int l_query(lua_State *L){
} }
lua_pushstring(L,fields[i].name); lua_pushstring(L,fields[i].name);
//printf("field name:%s,type:%d\n",fields[i].name,fields[i].type);
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i])); lua_pushinteger(L,*((char *)row[i]));
...@@ -142,6 +154,115 @@ static int l_query(lua_State *L){ ...@@ -142,6 +154,115 @@ static int l_query(lua_State *L){
return 1; return 1;
} }
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
struct cb_param* p = (struct cb_param*) param;
TAOS_FIELD *fields = taos_fetch_fields(result);
int numFields = taos_num_fields(result);
printf("\n\r-----------------------------------------------------------------------------------\n");
// printf("r:%d, L:%d\n",p->callback, p->state);
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
lua_newtable(L);
for (int i = 0; i < numFields; ++i) {
if (row[i] == NULL) {
continue;
}
lua_pushstring(L,fields[i].name);
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
lua_pushnumber(L,*((float *)row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
lua_pushstring(L,(char *)row[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
lua_pushinteger(L,*((char *)row[i]));
break;
default:
lua_pushnil(L);
break;
}
lua_settable(L, -3);
}
lua_call(L, 1, 0);
printf("-----------------------------------------------------------------------------------\n\r");
}
static int l_open_stream(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = lua_topointer(L,1);
char * sqlstr = lua_tostring(L,2);
int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct cb_param *p = malloc(sizeof(struct cb_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL);
if (s == NULL) {
printf("failed to open stream, reason:%s\n", taos_errstr(taos));
free(p);
lua_pushnumber(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "stream");
}else{
// printf("success to open stream\n");
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
p->stream = s;
lua_pushlightuserdata(L,p);
lua_setfield(L, table_index, "stream");//stream has different content in lua and c.
}
return 1;
}
static int l_close_stream(lua_State *L){
//TODO:get stream and free cb_param
struct cb_param *p = lua_touserdata(L,1);
taos_close_stream(p->stream);
free(p);
return 0;
}
static int l_close(lua_State *L){ static int l_close(lua_State *L){
TAOS * taos= lua_topointer(L,1); TAOS * taos= lua_topointer(L,1);
lua_newtable(L); lua_newtable(L);
...@@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = { ...@@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = {
{"connect", l_connect}, {"connect", l_connect},
{"query", l_query}, {"query", l_query},
{"close", l_close}, {"close", l_close},
{"open_stream", l_open_stream},
{"close_stream", l_close_stream},
{NULL, NULL} {NULL, NULL}
}; };
......
...@@ -35,10 +35,12 @@ if res.code ~=0 then ...@@ -35,10 +35,12 @@ if res.code ~=0 then
return return
end end
res = driver.query(conn,"insert into m1 values (1592222222222,0,'robotspace'), (1592222222223,1,'Hilink'),(1592222222224,2,'Harmony')") res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
return return
else
print("insert successfully, affected:"..res.affected)
end end
res = driver.query(conn,"select * from m1") res = driver.query(conn,"select * from m1")
...@@ -55,4 +57,69 @@ else ...@@ -55,4 +57,69 @@ else
end end
end end
res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)")
if res.code ~=0 then
print(res.error)
return
end
res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)")
if res.code ~=0 then
print(res.error)
return
end
res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)")
if res.code ~=0 then
print(res.error)
return
else
print("insert successfully, affected:"..res.affected)
end
res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type")
if res.code ~=0 then
print("select error:"..res.error)
return
else
print("in lua, result:")
for i = 1, #(res.item) do
print("res:"..res.item[i].count)
end
end
function callback(t)
print("continuous query result:")
for key, value in pairs(t) do
print("key:"..key..", value:"..value)
end
end
local stream
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback)
if res.code ~=0 then
print("open stream error:"..res.error)
return
else
print("openstream ok")
stream = res.stream
end
--From now on we begin continous query in an definite (infinite if you want) loop.
local loop_index = 0
while loop_index < 20 do
local t = os.time()*1000
local v = loop_index
res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v))
if res.code ~=0 then
print(res.error)
return
else
print("insert successfully, affected:"..res.affected)
end
os.execute("sleep " .. 1)
loop_index = loop_index + 1
end
driver.close_stream(stream)
driver.close(conn) driver.close(conn)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册