未验证 提交 6c4626ab 编写于 作者: R robotspace 提交者: GitHub

Remove wrapper of stream call for continous query. (#11432)

上级 466689f3
...@@ -303,51 +303,6 @@ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ ...@@ -303,51 +303,6 @@ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
// printf("-----------------------------------------------------------------------------------\n\r"); // printf("-----------------------------------------------------------------------------------\n\r");
} }
static int l_open_stream(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct cb_param *p = malloc(sizeof(struct cb_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL);
if (s == NULL) {
printf("failed to open stream, reason:%s\n", taos_errstr(taos));
free(p);
lua_pushnumber(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "stream");
}else{
// printf("success to open stream\n");
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
p->stream = s;
lua_pushlightuserdata(L,p);
lua_setfield(L, table_index, "stream");//stream has different content in lua and c.
}
return 1;
}
static int l_close_stream(lua_State *L){
//TODO:get stream and free cb_param
struct cb_param *p = lua_touserdata(L,1);
taos_close_stream(p->stream);
free(p);
return 0;
}
static int l_close(lua_State *L){ static int l_close(lua_State *L){
TAOS *taos= (TAOS*)lua_topointer(L,1); TAOS *taos= (TAOS*)lua_topointer(L,1);
lua_newtable(L); lua_newtable(L);
...@@ -373,8 +328,6 @@ static const struct luaL_Reg lib[] = { ...@@ -373,8 +328,6 @@ static const struct luaL_Reg lib[] = {
{"query", l_query}, {"query", l_query},
{"query_a",l_async_query}, {"query_a",l_async_query},
{"close", l_close}, {"close", l_close},
{"open_stream", l_open_stream},
{"close_stream", l_close_stream},
{NULL, NULL} {NULL, NULL}
}; };
......
...@@ -241,112 +241,6 @@ static int l_async_query(lua_State *L){ ...@@ -241,112 +241,6 @@ static int l_async_query(lua_State *L){
return 1; return 1;
} }
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
struct cb_param* p = (struct cb_param*) param;
TAOS_FIELD *fields = taos_fetch_fields(result);
int numFields = taos_num_fields(result);
// printf("\nnumfields:%d\n", numFields);
//printf("\n\r-----------------------------------------------------------------------------------\n");
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
lua_newtable(L);
for (int i = 0; i < numFields; ++i) {
if (row[i] == NULL) {
continue;
}
lua_pushstring(L,fields[i].name);
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
lua_pushnumber(L,*((float *)row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
lua_pushstring(L,(char *)row[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
lua_pushinteger(L,*((char *)row[i]));
break;
default:
lua_pushnil(L);
break;
}
lua_settable(L, -3);
}
lua_call(L, 1, 0);
// printf("-----------------------------------------------------------------------------------\n\r");
}
static int l_open_stream(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct cb_param *p = malloc(sizeof(struct cb_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL);
if (s == NULL) {
printf("failed to open stream, reason:%s\n", taos_errstr(taos));
free(p);
lua_pushnumber(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "stream");
}else{
// printf("success to open stream\n");
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
p->stream = s;
lua_pushlightuserdata(L,p);
lua_setfield(L, table_index, "stream");//stream has different content in lua and c.
}
return 1;
}
static int l_close_stream(lua_State *L){
//TODO:get stream and free cb_param
struct cb_param *p = lua_touserdata(L,1);
taos_close_stream(p->stream);
free(p);
return 0;
}
static int l_close(lua_State *L){ static int l_close(lua_State *L){
TAOS *taos= (TAOS*)lua_topointer(L,1); TAOS *taos= (TAOS*)lua_topointer(L,1);
...@@ -373,8 +267,6 @@ static const struct luaL_Reg lib[] = { ...@@ -373,8 +267,6 @@ static const struct luaL_Reg lib[] = {
{"query", l_query}, {"query", l_query},
{"query_a",l_async_query}, {"query_a",l_async_query},
{"close", l_close}, {"close", l_close},
{"open_stream", l_open_stream},
{"close_stream", l_close_stream},
{NULL, NULL} {NULL, NULL}
}; };
......
...@@ -9,6 +9,50 @@ local config = { ...@@ -9,6 +9,50 @@ local config = {
max_packet_size = 1024 * 1024 max_packet_size = 1024 * 1024
} }
function dump(obj)
local getIndent, quoteStr, wrapKey, wrapVal, dumpObj
getIndent = function(level)
return string.rep("\t", level)
end
quoteStr = function(str)
return '"' .. string.gsub(str, '"', '\\"') .. '"'
end
wrapKey = function(val)
if type(val) == "number" then
return "[" .. val .. "]"
elseif type(val) == "string" then
return "[" .. quoteStr(val) .. "]"
else
return "[" .. tostring(val) .. "]"
end
end
wrapVal = function(val, level)
if type(val) == "table" then
return dumpObj(val, level)
elseif type(val) == "number" then
return val
elseif type(val) == "string" then
return quoteStr(val)
else
return tostring(val)
end
end
dumpObj = function(obj, level)
if type(obj) ~= "table" then
return wrapVal(obj)
end
level = level + 1
local tokens = {}
tokens[#tokens + 1] = "{"
for k, v in pairs(obj) do
tokens[#tokens + 1] = getIndent(level) .. wrapKey(k) .. " = " .. wrapVal(v, level) .. ","
end
tokens[#tokens + 1] = getIndent(level - 1) .. "}"
return table.concat(tokens, "\n")
end
return dumpObj(obj, 0)
end
local conn local conn
local res = driver.connect(config) local res = driver.connect(config)
if res.code ~=0 then if res.code ~=0 then
...@@ -75,14 +119,14 @@ else ...@@ -75,14 +119,14 @@ else
end end
res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)") res = driver.query(conn,"create table thermometer (ts timestamp, degree double) tags(location binary(20), type int)")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
return return
else else
print("create super table--- pass") print("create super table--- pass")
end end
res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)") res = driver.query(conn,"create table therm1 using thermometer tags ('beijing', 1)")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
return return
...@@ -90,7 +134,7 @@ else ...@@ -90,7 +134,7 @@ else
print("create table--- pass") print("create table--- pass")
end end
res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)") res = driver.query(conn,"insert into therm1 values ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)")
if res.code ~=0 then if res.code ~=0 then
print(res.error) print(res.error)
...@@ -103,7 +147,7 @@ else ...@@ -103,7 +147,7 @@ else
end end
end end
res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type") res = driver.query(conn,"select count(*) count, avg(degree) as av, max(degree), min(degree) from thermometer where location='beijing' or location='tianjin' group by location, type")
if res.code ~=0 then if res.code ~=0 then
print("select from super table--- failed:"..res.error) print("select from super table--- failed:"..res.error)
return return
...@@ -129,33 +173,16 @@ function async_query_callback(res) ...@@ -129,33 +173,16 @@ function async_query_callback(res)
end end
end end
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) driver.query_a(conn,"insert into therm1 values ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
function stream_callback(t)
print("------------------------")
print("continuous query result:")
for key, value in pairs(t) do
print("key:"..key..", value:"..value)
end
end
local stream res = driver.query(conn, "create table avg_degree as select avg(degree) from thermometer where ts > now and ts <= now + 1m interval(5s) sliding(1s)")
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0, stream_callback)
if res.code ~=0 then
print("open stream--- failed:"..res.error)
return
else
print("open stream--- pass")
stream = res.stream
end
print("From now on we start continous insert in an definite (infinite if you want) loop.") print("From now on we start continous insertion in an definite (infinite if you want) loop.")
local loop_index = 0 local loop_index = 0
while loop_index < 30 do while loop_index < 30 do
local t = os.time()*1000 local t = os.time()*1000
local v = loop_index local v = loop_index
res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v)) res = driver.query(conn,string.format("insert into therm1 values (%d, %d)",t,v))
if res.code ~=0 then if res.code ~=0 then
print("continous insertion--- failed:" .. res.error) print("continous insertion--- failed:" .. res.error)
...@@ -163,10 +190,17 @@ while loop_index < 30 do ...@@ -163,10 +190,17 @@ while loop_index < 30 do
else else
--print("insert successfully, affected:"..res.affected) --print("insert successfully, affected:"..res.affected)
end end
local res1 = driver.query(conn, string.format("select last(*) from avg_degree"))
if res1.code ~=0 then
print("select failed: "..res1.error)
return
else
-- print(dump(res1))
if(#res1.item > 0) then print("avg_degree: " .. res1.item[1]["last(avg_degree_)"]) end
end
os.execute("sleep " .. 1) os.execute("sleep " .. 1)
loop_index = loop_index + 1 loop_index = loop_index + 1
end end
driver.close_stream(stream)
driver.close(conn) driver.close(conn)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册