From 6c4626abda0caa1a17bdeae66c230ce95cb1b503 Mon Sep 17 00:00:00 2001 From: robotspace Date: Wed, 13 Apr 2022 00:11:36 +0800 Subject: [PATCH] Remove wrapper of stream call for continous query. (#11432) --- examples/lua/OpenResty/so/luaconnector51.so | Bin 22472 -> 22376 bytes examples/lua/lua51/lua_connector51.c | 47 --------- examples/lua/lua_connector.c | 108 -------------------- examples/lua/test.lua | 88 +++++++++++----- 4 files changed, 61 insertions(+), 182 deletions(-) diff --git a/examples/lua/OpenResty/so/luaconnector51.so b/examples/lua/OpenResty/so/luaconnector51.so index 442de6e39f909e1aeb869988722b84795c048855..449466da2d3121380d070ffca1411162cbd0bd32 100755 GIT binary patch delta 4738 zcmZu#4OG-s7JqMsftlfB_!|B*d_yXM>3|!dC||;}6o>G}YHeDT-pCPh73J2)RmZ#rZ`|F&KOA0>v@3U){t1ka<%TH|w z8IFxO6AQ)SkHxi<5H(5UGGxg;SU!Y;cTOG3%pBc0=7e+909}f7a4XVj7MxQEbTz9X zU1^{lU{zMq6gZ(Yf!fai9+btSOn@>!7af30e#P`b-X|RjRGaA$Nb(ctmq3*Q-OKA! zOtRZQonD0u|4P{JZ&+A|D-ccs2~B4H5Kt?@)z&vPH`Z;a0hc;E(%RIzp}KNYi> zvf5f-UuCUc2ZN@Fa7G;i<(gG+o(gbLV}LI-(OA?;6jzlw z#p)N(t5DG)(h^;#Aw)r5g(R(DEQ=7uTcmANOQ=vN*P3v#I}x)=!gSw(NrXeF-4i8t z>Agcc11_QVi&#-RaEEp_garzk(L|{)SObOW{YylSodmY?@;5L^9trCM1>-O| z_U@jK0{bZZB{(R!yG+CcAc^WZ7+gc?4AAS=L49)n?$AJbq9-VHk7B;XnR6hQ5Q|gw z3M2Ezn)Mck${!E6WqWV4#yhA>=wJh#lqlj`IX<%yZimO>@i9cir)8puw9KrqY|VLh z0X8l6Z<>P;`d~I8?s;CX*ZnwFr&YnZuqL7;q!Ab87~30*z0ds(9F0he`*Gaswb;#L z7Kiz$#i>u9jCcvW5m_M~JZZj8tDq<{EpCI@$s=_-JQ)!V4n(Ag2)BgTI2rLFd>WZn zICz^I9%P7P=o0to5fmO}j5~v=QS~n4eEJV1Df2(!HstA(={l&=Pou|RhdzxSg~R$3 z-Jj96qx7s~dIz}mchkoqENT{=33*W|x~M7oAE6~Gjs6q5qKb1yf8ahbulG+ie4A-d z$_y{Tw%vT$;;3LikH!8kCVd{J33KROuu@1{?KTi%|CmV+iT%#Aw(UnY`upNAyw7JS zag^dnIIvUywy)Q3u-8$xPeW(%rs^|~vqeA@ALdE;N{FZJ@UxIaOCZs356y>CLxR6+ z%g{)qlPfNcvM@D4(^QGnQZEnJLN`}cj1X7|8BV>EmjtA&@1 zYX4`Ms0u}C{x;T}178~N_RnHXKMq_U7YOOmssJwf2Z1+@M)*@)KnY*oK!)3o_8oXC z(XQl+!`zL=9mG;SASr@A;7+oGOZzPjwYac+*=xWVo#tQvo!2|OV6wOEIZPBRBKzDX zH05H9wevf|-XH6!6XXh_&tL!Pd5;M7LcjV!7%D zswOr!SkI0`JG1fnsIejZiq;(5I2T<7VzQz#t(|AW%BMA{>?Fu5Dd4w#;KxwTVB$*1w_6rUT~ihqkBAwZ@!F`*xhvzy z_WSO&rdx0@-q8L$s&K`bm;F;$_GP9)DVy=l8~g$|&VJe4E0}pZ?it%RM+E{xS<_K5 zZf9Fofx;F(Ym5Dy#i7reBd*$4I0hSUt?qg+8}m4h@5twOK|jXYOZXZR*P=!??99~R zoEdfxUkP??Fz6N^biqwn7c(ooZDJC9=(d~iddxJ%+c#h!CQ{LP1HO%UIN()g&pZyF z7z9`yn-p>}Y$DJ<4FhLvx?*eujw3le0wb}r6@M6kq_}$&wIi@3ZZ;$BacPPLBk)FC zswNC4GhD-l>oi6}UR;RcZyuoWvyyjsq;TUA(K;fA+j(EOz21>g#NQZ(&f4k(O0AbfKzi_*BICy4m!#h<23T;;d6Y6e6$GVeBOeGaU?$(66RI zPx_!c5IS6m;4y}}a82we??ur8R8Lsg>FZWLVS&9}S>Gq8Sg7(rXFx#mT)Ms|Ke;y; z25IQ-^p5tHD_-yYSl)Acz1OfL4tc$0Sf0f4CYFC=i9tQ1cVd{VsbRgXxfY*niFHkh zn;RQ8)R2aCcm_X1xAtNA70$zkUSVpl-M>%u5^}1KZ8$dIX%cM6PsjhEmvawrK z`aH*JjB@^Bcz|K!bg2UNa$0F!eD>pWxt>RHNhta*@nf2Q{vB8pZ;Mg>-V94ADYpz zc=qYFKK&^a!!V@HiSF4p=c<;iutt7ThC*vjO142t2;@g(qQuM3#u*YXKNM$6{AeOR z_M>*^s>7ba$1D4o3+#IVjJXN42sCatjM{bwXm3W!3k^^!G%H-zDtsd^hzCZA+1Do zIdYMt7oION(K0xL^+WJOk;y1eN4A1Ngn2?B8Gr(_U<{SA09l)b4N9U&8Ei7MarT;v z>*P(IFQtk{q!HxJ=Eo3a?@4lb)A{j2+21AkXd*W73zGKyQ*2Yrx4NV(q70}x0bEKy z6(u^-4ZH47FwH2UvghU4e7=3)=6oMM(tll)^|(4z@8xw7tBvJ&?pgndv{Ur+5!m#AK%d~q&Afh!*TCz& zJR9!e^+{fT&gI{-+BhzaNu^U+AWc02c9jS;fz^sMCH%fnuOQjTnG}yoTrJb7TJaFD zAdaBGvHK)#50ozw6rXSoY8Dxxd(pALNr^A)T-*sUrFR9$8SdQDnb268J#7-kmmKpf zy1LP3ZMHRAtLkk%XG>?%j5|_mGAa9RY}h=Rm6CBVEa~idW67fw)a7evI@Fb$!sNR| zre$ydrE)0B^bic8RK6}``Y5C>HEBK4C4zTK)zbUIVdTkyS1Rt)^ez!?^}?l!VvOX}6()@w24!Gfk;kHYMV98FO(O2MaA`%JzH6b# g4M_KAG_o0xxpJm<<04Trq=Y9{-bYu#-jyl;2L!uZVgLXD delta 5087 zcmZu#e^gXe9=~^BaAp990fw2ugn=N0)B$(Z)Ibnt859UmEGvZ($KZ%U3TSXZeiFiW{X&zfGs=Ib}DpFMIPNVXj|Z~sBGVUzxTa24xD}G+MTh=fjjVaM z^Q=QZ9wt{N!Ne`GEKhM!rk1djZ#0$X$kkv8FZ18!_a3QO{N?8muip~?=~jQ*?7yu! z`Y!Rv{X9$@Qv@cqk1-vylO*tK`zAZyhk_IE5x2roO*UT!DPa~ELCKAhgXcpg_Gts<+g88aD5Q30-noLyNnly0*?!yA>6iJUiGncSA#Ct;}s~ z@@%h`?b((c9uGryb#r6OPS1|b?q;`4s%>c8=}}matG^$H^mh1ApNJ_IUe)WCanY|7 z7)ggP)EGHR){ z_Bb3fSXysav_sRh+7!7V|1>;P;g3wipGcDVqtoz(3O_InKW!KMJDC3-*^nmJ3|SQU zNC-X;wnbWU5<;|r5bYfy_;?7O*}Erlkn?ybjkZ8ZbX0Fs^n;wI!d5^q0 zpeb{CFE&RZb^cPu0-4xqU^8a#X@AOv{J2#!D{o`09|6hb9SnQ|2jjA4U%E*(=cLQ$ z>~nRPV~9b^ml#jOmvM_{`f-f}p<+mj&zikaHX2uqxlOa~3}cUFHlU zVB5aZ50s`Kf+6!9z7i(P^LQjACuA5Srs&T>RYDej1KJXbaxZ=_Vxp*zPHp%yX%MA` z9VmO97hJw7B96PfpAzbYaLWQ728%6O8;7inc|Rodm~4ln!GPxn^8U8$M%cWAh_4hq zq2mto^Fgb}(5jC{VeAjMF%3FzQYRp5gxqY0Q3|_D$!X8^}#7k5y#%fkX3yC&3W1BOhh{Veb4%uvAPK=T3 zCq+H(iZO=57QHr$c>4*r#>Ry7!6@RF9|FHEoj(hw6X&;Xxq(2mYUV=q0jl)VBBJ-- zOL~4_w{w#FoRiC(e&bX7X+ec@HX#QA(O>^2*^9G#_v3xcxM!}+6{<*deyjfz%rt5d z)W*lx;r#4*vGKAg4f6oO@i!nPDa~4TUG_e8is-JxhNSyv%tafY^CrAzwZJDyDdx=h zTU|UGWx0zh?9{~uc)_mI@4tpU^B4{{5V|%wR=iUsj z*XLb@L3@VgjVtikZAq=pE3^)7B4a4HKTIHoolQH#kla1=Nus`+A#12Fhx+MrOJFL3 zSB!H9#Bx$CP$!hvCqy?75pfC1Ugzj3Tp}#+mT=XIQBtJ7DrC@%8u3-#^p@*P!N94ze;m3*b7Bwr@^I*UyRPW(Vy57WCADIV! zBZnip=H$2}wPG)8|AcBZ5;1;;1YhZ>sCJm2!;iDP2eitDVh{M4a7=punutB16Omh1 zo}g8+Y&8_z`vOzkUx;Z{{o7G7_N*NJ&=%vjVydp5!ZrlmUd71Ow>&~kcteBHP6_6$ zQElSyvK4i<-{mdG-mV&H4S9*ODXpyvwze1T#6~v!P8EKeT)V%vyS$InpagRMKB7aNJORd(^w!C5XqH^V0i9$m>uG$x!B?Qx$WbB& z<14);svYLDGu4-Sn*tzC$HGU6j?^!wPOgc?c3W%1B}vM~^g%$9{);JlOp(xG7V$)V@dIhWBKs6@JP;5Z4J`9;ffM|1kB({l*) zdC7b&Wad?;jElRHa%Do@K80%#OvaAG%Xtny0%!A*Q$`|XP(A(5QApI+^z?vjd9p@l zfThcmOo#DSKo5TM8>x)OL+UI=)h_kusK*-@U`m;i}(N>LwPL>7dm)3m{yv@jI0~WY8xvH^`iQUXMXUm zbnqHDfq5N_t#sJb*Hihp=J3uG$;KhaX|ZK1W36fzqrh1Ls{yyu!JDAdS=6e|hh@rj z@wnnaofl%JB1WGQ@~qAWF)b1Mhk~EBqkxZ?j#!?>M0|BVMJco7E5tcoNW10xkOqU9 zSedN9C}1NSfcjr#v|bg)QA`~wtcvG*%*nV-xlAcAi$Ld`;oDB@IHPEECR4BaDnC{5 z&8$Yb?Wp*C8OIq^u0nF@f|Mc)ow)I8UM8;X>xH~k$d3rwOS$$LbU{lbj`s-NDUqKM zj1iGv6v$PP-xPV2_(d?AuJSxb$hnwXg|tTGwUpbMa1s5#EB>z_r`W>1g87`tUlRH2 zA|DWb|0(kGBEKPUEgV~AvCYD9pZa@1e&YpDvC6{ppna8t-zVgH;`0_E?}e{W?iTVf z!nFhFht@Pd{>0E-wxK$7pM@nQ?I!wxke^>vQoN$Ddcph!^B2O2 zk~Fwfk{%iIO93KF^X&AOp%!=aW>2$wQ-k>HFy$Xb@3zu;JX!qB*y3rft*hSA_)8|L zAtAu~W$h*bhDOr6`koCOj+DFk9MD%dOzM`UmUU2!sJhP8@;G=ARd*$|+z7)J4uekJ z$PA#VTx3%BHMQ)4;!1}}-Ti_kcq?0V-8j_rz_CgTWUMg-e~S$$+gleDuWH_#=-JOP>)v`tx4u*Rd@jfWHH{*Y# CnvRnI diff --git a/examples/lua/lua51/lua_connector51.c b/examples/lua/lua51/lua_connector51.c index 7aad42f293..ac840913f9 100644 --- a/examples/lua/lua51/lua_connector51.c +++ b/examples/lua/lua51/lua_connector51.c @@ -303,51 +303,6 @@ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ // printf("-----------------------------------------------------------------------------------\n\r"); } -static int l_open_stream(lua_State *L){ - int r = luaL_ref(L, LUA_REGISTRYINDEX); - TAOS * taos = (TAOS*)lua_topointer(L,1); - const char * sqlstr = lua_tostring(L,2); - int stime = luaL_checknumber(L,3); - - lua_newtable(L); - int table_index = lua_gettop(L); - - struct cb_param *p = malloc(sizeof(struct cb_param)); - p->state = L; - p->callback=r; - // printf("r:%d, L:%d\n",r,L); - void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL); - if (s == NULL) { - printf("failed to open stream, reason:%s\n", taos_errstr(taos)); - free(p); - lua_pushnumber(L, -1); - lua_setfield(L, table_index, "code"); - lua_pushstring(L, taos_errstr(taos)); - lua_setfield(L, table_index, "error"); - lua_pushlightuserdata(L,NULL); - lua_setfield(L, table_index, "stream"); - }else{ - // printf("success to open stream\n"); - lua_pushnumber(L, 0); - lua_setfield(L, table_index, "code"); - lua_pushstring(L, taos_errstr(taos)); - lua_setfield(L, table_index, "error"); - p->stream = s; - lua_pushlightuserdata(L,p); - lua_setfield(L, table_index, "stream");//stream has different content in lua and c. - } - - return 1; -} - -static int l_close_stream(lua_State *L){ - //TODO:get stream and free cb_param - struct cb_param *p = lua_touserdata(L,1); - taos_close_stream(p->stream); - free(p); - return 0; -} - static int l_close(lua_State *L){ TAOS *taos= (TAOS*)lua_topointer(L,1); lua_newtable(L); @@ -373,8 +328,6 @@ static const struct luaL_Reg lib[] = { {"query", l_query}, {"query_a",l_async_query}, {"close", l_close}, - {"open_stream", l_open_stream}, - {"close_stream", l_close_stream}, {NULL, NULL} }; diff --git a/examples/lua/lua_connector.c b/examples/lua/lua_connector.c index 035b17eb2a..ce13ab3829 100644 --- a/examples/lua/lua_connector.c +++ b/examples/lua/lua_connector.c @@ -241,112 +241,6 @@ static int l_async_query(lua_State *L){ return 1; } -void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ - struct cb_param* p = (struct cb_param*) param; - TAOS_FIELD *fields = taos_fetch_fields(result); - int numFields = taos_num_fields(result); - - // printf("\nnumfields:%d\n", numFields); - //printf("\n\r-----------------------------------------------------------------------------------\n"); - - lua_State *L = p->state; - lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback); - - lua_newtable(L); - - for (int i = 0; i < numFields; ++i) { - if (row[i] == NULL) { - continue; - } - - lua_pushstring(L,fields[i].name); - - switch (fields[i].type) { - case TSDB_DATA_TYPE_TINYINT: - lua_pushinteger(L,*((char *)row[i])); - break; - case TSDB_DATA_TYPE_SMALLINT: - lua_pushinteger(L,*((short *)row[i])); - break; - case TSDB_DATA_TYPE_INT: - lua_pushinteger(L,*((int *)row[i])); - break; - case TSDB_DATA_TYPE_BIGINT: - lua_pushinteger(L,*((int64_t *)row[i])); - break; - case TSDB_DATA_TYPE_FLOAT: - lua_pushnumber(L,*((float *)row[i])); - break; - case TSDB_DATA_TYPE_DOUBLE: - lua_pushnumber(L,*((double *)row[i])); - break; - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - lua_pushstring(L,(char *)row[i]); - break; - case TSDB_DATA_TYPE_TIMESTAMP: - lua_pushinteger(L,*((int64_t *)row[i])); - break; - case TSDB_DATA_TYPE_BOOL: - lua_pushinteger(L,*((char *)row[i])); - break; - default: - lua_pushnil(L); - break; - } - - lua_settable(L, -3); - } - - lua_call(L, 1, 0); - - // printf("-----------------------------------------------------------------------------------\n\r"); -} - -static int l_open_stream(lua_State *L){ - int r = luaL_ref(L, LUA_REGISTRYINDEX); - TAOS * taos = (TAOS*)lua_topointer(L,1); - const char * sqlstr = lua_tostring(L,2); - int stime = luaL_checknumber(L,3); - - lua_newtable(L); - int table_index = lua_gettop(L); - - struct cb_param *p = malloc(sizeof(struct cb_param)); - p->state = L; - p->callback=r; - // printf("r:%d, L:%d\n",r,L); - void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL); - if (s == NULL) { - printf("failed to open stream, reason:%s\n", taos_errstr(taos)); - free(p); - lua_pushnumber(L, -1); - lua_setfield(L, table_index, "code"); - lua_pushstring(L, taos_errstr(taos)); - lua_setfield(L, table_index, "error"); - lua_pushlightuserdata(L,NULL); - lua_setfield(L, table_index, "stream"); - }else{ - // printf("success to open stream\n"); - lua_pushnumber(L, 0); - lua_setfield(L, table_index, "code"); - lua_pushstring(L, taos_errstr(taos)); - lua_setfield(L, table_index, "error"); - p->stream = s; - lua_pushlightuserdata(L,p); - lua_setfield(L, table_index, "stream");//stream has different content in lua and c. - } - - return 1; -} - -static int l_close_stream(lua_State *L){ - //TODO:get stream and free cb_param - struct cb_param *p = lua_touserdata(L,1); - taos_close_stream(p->stream); - free(p); - return 0; -} static int l_close(lua_State *L){ TAOS *taos= (TAOS*)lua_topointer(L,1); @@ -373,8 +267,6 @@ static const struct luaL_Reg lib[] = { {"query", l_query}, {"query_a",l_async_query}, {"close", l_close}, - {"open_stream", l_open_stream}, - {"close_stream", l_close_stream}, {NULL, NULL} }; diff --git a/examples/lua/test.lua b/examples/lua/test.lua index c124b50a4d..a858dbb6ad 100644 --- a/examples/lua/test.lua +++ b/examples/lua/test.lua @@ -9,6 +9,50 @@ local config = { max_packet_size = 1024 * 1024 } +function dump(obj) + local getIndent, quoteStr, wrapKey, wrapVal, dumpObj + getIndent = function(level) + return string.rep("\t", level) + end + quoteStr = function(str) + return '"' .. string.gsub(str, '"', '\\"') .. '"' + end + wrapKey = function(val) + if type(val) == "number" then + return "[" .. val .. "]" + elseif type(val) == "string" then + return "[" .. quoteStr(val) .. "]" + else + return "[" .. tostring(val) .. "]" + end + end + wrapVal = function(val, level) + if type(val) == "table" then + return dumpObj(val, level) + elseif type(val) == "number" then + return val + elseif type(val) == "string" then + return quoteStr(val) + else + return tostring(val) + end + end + dumpObj = function(obj, level) + if type(obj) ~= "table" then + return wrapVal(obj) + end + level = level + 1 + local tokens = {} + tokens[#tokens + 1] = "{" + for k, v in pairs(obj) do + tokens[#tokens + 1] = getIndent(level) .. wrapKey(k) .. " = " .. wrapVal(v, level) .. "," + end + tokens[#tokens + 1] = getIndent(level - 1) .. "}" + return table.concat(tokens, "\n") + end + return dumpObj(obj, 0) +end + local conn local res = driver.connect(config) if res.code ~=0 then @@ -75,14 +119,14 @@ else end -res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)") +res = driver.query(conn,"create table thermometer (ts timestamp, degree double) tags(location binary(20), type int)") if res.code ~=0 then print(res.error) return else print("create super table--- pass") end -res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)") +res = driver.query(conn,"create table therm1 using thermometer tags ('beijing', 1)") if res.code ~=0 then print(res.error) return @@ -90,7 +134,7 @@ else print("create table--- pass") end -res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)") +res = driver.query(conn,"insert into therm1 values ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)") if res.code ~=0 then print(res.error) @@ -103,7 +147,7 @@ else end end -res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type") +res = driver.query(conn,"select count(*) count, avg(degree) as av, max(degree), min(degree) from thermometer where location='beijing' or location='tianjin' group by location, type") if res.code ~=0 then print("select from super table--- failed:"..res.error) return @@ -129,33 +173,16 @@ function async_query_callback(res) end end -driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) - - -function stream_callback(t) - print("------------------------") - print("continuous query result:") - for key, value in pairs(t) do - print("key:"..key..", value:"..value) - end -end +driver.query_a(conn,"insert into therm1 values ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) -local stream -res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0, stream_callback) -if res.code ~=0 then - print("open stream--- failed:"..res.error) - return -else - print("open stream--- pass") - stream = res.stream -end +res = driver.query(conn, "create table avg_degree as select avg(degree) from thermometer where ts > now and ts <= now + 1m interval(5s) sliding(1s)") -print("From now on we start continous insert in an definite (infinite if you want) loop.") +print("From now on we start continous insertion in an definite (infinite if you want) loop.") local loop_index = 0 while loop_index < 30 do local t = os.time()*1000 local v = loop_index - res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v)) + res = driver.query(conn,string.format("insert into therm1 values (%d, %d)",t,v)) if res.code ~=0 then print("continous insertion--- failed:" .. res.error) @@ -163,10 +190,17 @@ while loop_index < 30 do else --print("insert successfully, affected:"..res.affected) end + local res1 = driver.query(conn, string.format("select last(*) from avg_degree")) + if res1.code ~=0 then + print("select failed: "..res1.error) + return + else +-- print(dump(res1)) + if(#res1.item > 0) then print("avg_degree: " .. res1.item[1]["last(avg_degree_)"]) end + end + os.execute("sleep " .. 1) loop_index = loop_index + 1 end -driver.close_stream(stream) - driver.close(conn) -- GitLab