Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9e13cced
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9e13cced
编写于
7月 23, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/3_liaohj
上级
e9936db9
621ef9f1
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
799 addition
and
114 deletion
+799
-114
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+2
-2
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+11
-1
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+30
-2
source/libs/planner/test/planPartByTest.cpp
source/libs/planner/test/planPartByTest.cpp
+2
-0
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+1
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+56
-33
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+9
-2
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+28
-26
tests/system-test/7-tmq/tmqAutoCreateTbl.py
tests/system-test/7-tmq/tmqAutoCreateTbl.py
+6
-6
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+21
-7
tests/system-test/7-tmq/tmqDropNtb.py
tests/system-test/7-tmq/tmqDropNtb.py
+2
-2
tests/system-test/7-tmq/tmqDropStbCtb.py
tests/system-test/7-tmq/tmqDropStbCtb.py
+9
-5
tests/system-test/7-tmq/tmqUpdateWithConsume.py
tests/system-test/7-tmq/tmqUpdateWithConsume.py
+190
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+5
-2
tools/shell/inc/shellInt.h
tools/shell/inc/shellInt.h
+26
-1
tools/shell/src/shellArguments.c
tools/shell/src/shellArguments.c
+47
-2
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+54
-20
tools/shell/src/shellMain.c
tools/shell/src/shellMain.c
+8
-1
tools/shell/src/shellUtil.c
tools/shell/src/shellUtil.c
+31
-1
tools/shell/src/shellWebsocket.c
tools/shell/src/shellWebsocket.c
+260
-0
tools/taosws-rs
tools/taosws-rs
+1
-1
未找到文件。
source/libs/function/src/builtinsimpl.c
浏览文件 @
9e13cced
...
...
@@ -3880,11 +3880,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
if
(
pCtx
->
end
.
key
!=
INT64_MIN
)
{
pInfo
->
min
=
pCtx
->
end
.
key
;
}
else
{
pInfo
->
min
=
ptsList
[
0
];
pInfo
->
min
=
ptsList
[
start
];
}
}
else
{
if
(
pCtx
->
start
.
key
==
INT64_MIN
)
{
pInfo
->
min
=
(
pInfo
->
min
>
ptsList
[
0
])
?
ptsList
[
0
]
:
pInfo
->
min
;
pInfo
->
min
=
(
pInfo
->
min
>
ptsList
[
start
])
?
ptsList
[
start
]
:
pInfo
->
min
;
}
else
{
pInfo
->
min
=
pCtx
->
start
.
key
;
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
9e13cced
...
...
@@ -787,6 +787,14 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return
code
;
}
static
bool
isPrimaryKeySort
(
SNodeList
*
pOrderByList
)
{
SNode
*
pExpr
=
((
SOrderByExprNode
*
)
nodesListGetNode
(
pOrderByList
,
0
))
->
pExpr
;
if
(
QUERY_NODE_COLUMN
!=
nodeType
(
pExpr
))
{
return
false
;
}
return
PRIMARYKEY_TIMESTAMP_COL_ID
==
((
SColumnNode
*
)
pExpr
)
->
colId
;
}
static
int32_t
createSortLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
if
(
NULL
==
pSelect
->
pOrderByList
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -800,7 +808,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pSort
->
groupSort
=
pSelect
->
groupSort
;
pSort
->
node
.
groupAction
=
pSort
->
groupSort
?
GROUP_ACTION_KEEP
:
GROUP_ACTION_CLEAR
;
pSort
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_NONE
;
pSort
->
node
.
resultDataOrder
=
pSort
->
groupSort
?
DATA_ORDER_LEVEL_IN_GROUP
:
DATA_ORDER_LEVEL_GLOBAL
;
pSort
->
node
.
resultDataOrder
=
isPrimaryKeySort
(
pSelect
->
pOrderByList
)
?
(
pSort
->
groupSort
?
DATA_ORDER_LEVEL_IN_GROUP
:
DATA_ORDER_LEVEL_GLOBAL
)
:
DATA_ORDER_LEVEL_NONE
;
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_ORDER_BY
,
NULL
,
COLLECT_COL_TYPE_ALL
,
&
pSort
->
node
.
pTargets
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
==
pSort
->
node
.
pTargets
)
{
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
9e13cced
...
...
@@ -215,6 +215,14 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
return
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pChild
)
&&
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pChild
));
}
static
bool
stbSplIsMultiTbScanChild
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
if
(
1
!=
LIST_LENGTH
(
pNode
->
pChildren
))
{
return
false
;
}
SNode
*
pChild
=
nodesListGetNode
(
pNode
->
pChildren
,
0
);
return
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pChild
)
&&
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pChild
));
}
static
bool
stbSplNeedSplitWindow
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
SWindowLogicNode
*
pWindow
=
(
SWindowLogicNode
*
)
pNode
;
if
(
WINDOW_TYPE_INTERVAL
==
pWindow
->
winType
)
{
...
...
@@ -247,7 +255,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
return
!
(((
SJoinLogicNode
*
)
pNode
)
->
isSingleTableJoin
);
case
QUERY_NODE_LOGIC_PLAN_PARTITION
:
return
stbSpl
HasMultiTbScan
(
streamQuery
,
pNode
);
return
stbSpl
IsMultiTbScanChild
(
streamQuery
,
pNode
);
case
QUERY_NODE_LOGIC_PLAN_AGG
:
return
!
stbSplHasGatherExecFunc
(((
SAggLogicNode
*
)
pNode
)
->
pAggFuncs
)
&&
stbSplHasMultiTbScan
(
streamQuery
,
pNode
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
...
...
@@ -969,8 +977,28 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return
code
;
}
static
int32_t
stbSplCreateMergeKeysForPartitionNode
(
SLogicNode
*
pPart
,
SNodeList
**
pMergeKeys
)
{
SNode
*
pPrimaryKey
=
nodesCloneNode
(
stbSplFindPrimaryKeyFromScan
((
SScanLogicNode
*
)
nodesListGetNode
(
pPart
->
pChildren
,
0
)));
if
(
NULL
==
pPrimaryKey
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
nodesListAppend
(
pPart
->
pTargets
,
pPrimaryKey
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeKeysByPrimaryKey
(
pPrimaryKey
,
pMergeKeys
);
}
return
code
;
}
static
int32_t
stbSplSplitPartitionNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
int32_t
code
=
stbSplCreateMergeNode
(
pCxt
,
pInfo
->
pSubplan
,
pInfo
->
pSplitNode
,
NULL
,
pInfo
->
pSplitNode
,
true
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNodeList
*
pMergeKeys
=
NULL
;
if
(
pInfo
->
pSplitNode
->
requireDataOrder
>=
DATA_ORDER_LEVEL_IN_GROUP
)
{
code
=
stbSplCreateMergeKeysForPartitionNode
(
pInfo
->
pSplitNode
,
&
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
pInfo
->
pSubplan
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pInfo
->
pSplitNode
,
true
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
(
SNode
*
)
splCreateScanSubplan
(
pCxt
,
pInfo
->
pSplitNode
,
SPLIT_FLAG_STABLE_SPLIT
));
...
...
source/libs/planner/test/planPartByTest.cpp
浏览文件 @
9e13cced
...
...
@@ -67,6 +67,8 @@ TEST_F(PlanPartitionByTest, withTimeLineFunc) {
useDb
(
"root"
,
"test"
);
run
(
"SELECT TWA(c1) FROM st1 PARTITION BY c1"
);
run
(
"SELECT MAVG(c1, 2) FROM st1 PARTITION BY c1"
);
}
TEST_F
(
PlanPartitionByTest
,
withSlimit
)
{
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
9e13cced
...
...
@@ -396,6 +396,7 @@ typedef struct SDelayQueue {
int
transDQCreate
(
uv_loop_t
*
loop
,
SDelayQueue
**
queue
);
void
transDQDestroy
(
SDelayQueue
*
queue
,
void
(
*
freeFunc
)(
void
*
arg
));
SDelayTask
*
transDQSched
(
SDelayQueue
*
queue
,
void
(
*
func
)(
void
*
arg
),
void
*
arg
,
uint64_t
timeoutMs
);
void
transDQCancel
(
SDelayQueue
*
queue
,
SDelayTask
*
task
);
bool
transEpSetIsEqual
(
SEpSet
*
a
,
SEpSet
*
b
);
/*
...
...
source/libs/transport/src/transCli.c
浏览文件 @
9e13cced
...
...
@@ -27,7 +27,6 @@ typedef struct SCliConn {
SConnBuffer
readBuf
;
STransQueue
cliMsgs
;
queue
q
;
uint64_t
expireTime
;
STransCtx
ctx
;
bool
broken
;
// link broken or not
...
...
@@ -37,6 +36,7 @@ typedef struct SCliConn {
char
*
ip
;
uint32_t
port
;
SDelayTask
*
task
;
// debug and log info
struct
sockaddr_in
addr
;
struct
sockaddr_in
localAddr
;
...
...
@@ -65,6 +65,7 @@ typedef struct SCliThrd {
queue
msg
;
TdThreadMutex
msgMtx
;
SDelayQueue
*
delayQueue
;
SDelayQueue
*
timeoutQueue
;
uint64_t
nextTimeout
;
// next timeout
void
*
pTransInst
;
//
...
...
@@ -92,9 +93,10 @@ static void* createConnPool(int size);
static
void
*
destroyConnPool
(
void
*
pool
);
static
SCliConn
*
getConnFromPool
(
void
*
pool
,
char
*
ip
,
uint32_t
port
);
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
);
static
void
doCloseIdleConn
(
void
*
param
);
// register timer in each thread to clear expire conn
static
void
cliTimeoutCb
(
uv_timer_t
*
handle
);
//
static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
// callback after read nbytes from socket
...
...
@@ -184,7 +186,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) (
para * 1000 * 10
)
#define CONN_PERSIST_TIME(para) (
(para) == 0 ? 3 * 1000 : (para)
)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
...
...
@@ -384,10 +386,6 @@ void cliHandleResp(SCliConn* conn) {
}
uv_read_start
((
uv_stream_t
*
)
conn
->
stream
,
cliAllocRecvBufferCb
,
cliRecvCb
);
// start thread's timer of conn pool if not active
if
(
!
uv_is_active
((
uv_handle_t
*
)
&
pThrd
->
timer
)
&&
pTransInst
->
idleTime
>
0
)
{
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
}
void
cliHandleExcept
(
SCliConn
*
pConn
)
{
...
...
@@ -441,30 +439,30 @@ void cliHandleExcept(SCliConn* pConn) {
transUnrefCliHandle
(
pConn
);
}
void
cliTimeoutCb
(
uv_timer_t
*
handle
)
{
SCliThrd
*
pThrd
=
handle
->
data
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
int64_t
currentTime
=
pThrd
->
nextTimeout
;
tTrace
(
"%s conn timeout, try to remove expire conn from conn pool"
,
pTransInst
->
label
);
SConnList
*
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
pool
,
NULL
);
while
(
p
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
p
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
p
->
conn
);
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
q
);
if
(
c
->
expireTime
<
currentTime
)
{
QUEUE_REMOVE
(
h
);
transUnrefCliHandle
(
c
);
}
else
{
break
;
}
}
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
pool
,
p
);
}
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
);
uv_timer_start
(
handle
,
cliTimeoutCb
,
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
)
/
2
,
0
);
}
//
void cliTimeoutCb(uv_timer_t* handle) {
//
SCliThrd* pThrd = handle->data;
//
STrans* pTransInst = pThrd->pTransInst;
//
int64_t currentTime = pThrd->nextTimeout;
//
tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
//
//
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
//
while (p != NULL) {
//
while (!QUEUE_IS_EMPTY(&p->conn)) {
//
queue* h = QUEUE_HEAD(&p->conn);
//
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
//
if (c->expireTime < currentTime) {
//
QUEUE_REMOVE(h);
//
transUnrefCliHandle(c);
//
} else {
//
break;
//
}
//
}
//
p = taosHashIterate((SHashObj*)pThrd->pool, p);
//
}
//
//
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
//
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
//
}
void
*
createConnPool
(
int
size
)
{
// thread local, no lock
...
...
@@ -506,6 +504,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
QUEUE_REMOVE
(
&
conn
->
q
);
QUEUE_INIT
(
&
conn
->
q
);
assert
(
h
==
&
conn
->
q
);
transDQCancel
(((
SCliThrd
*
)
conn
->
hostThrd
)
->
timeoutQueue
,
conn
->
task
);
conn
->
task
=
NULL
;
return
conn
;
}
static
int32_t
allocConnRef
(
SCliConn
*
conn
,
bool
update
)
{
...
...
@@ -537,6 +539,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
transReleaseExHandle
(
transGetRefMgt
(),
handle
);
return
0
;
}
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
)
{
if
(
conn
->
status
==
ConnInPool
)
{
return
;
...
...
@@ -547,7 +550,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
allocConnRef
(
conn
,
true
);
STrans
*
pTransInst
=
thrd
->
pTransInst
;
conn
->
expireTime
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
);
cliReleaseUnfinishedMsg
(
conn
);
transQueueClear
(
&
conn
->
cliMsgs
);
transCtxCleanup
(
&
conn
->
ctx
);
...
...
@@ -562,7 +564,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
assert
(
plist
!=
NULL
);
QUEUE_INIT
(
&
conn
->
q
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
q
);
assert
(
!
QUEUE_IS_EMPTY
(
&
plist
->
conn
));
STaskArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
STaskArg
));
arg
->
param1
=
conn
;
arg
->
param2
=
thrd
;
conn
->
task
=
transDQSched
(
thrd
->
timeoutQueue
,
doCloseIdleConn
,
arg
,
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
));
}
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SCliConn
*
conn
=
handle
->
data
;
...
...
@@ -631,6 +639,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
conn
->
refId
=
-
1
;
if
(
conn
->
task
!=
NULL
)
transDQCancel
(((
SCliThrd
*
)
conn
->
hostThrd
)
->
timeoutQueue
,
conn
->
task
);
if
(
clear
)
{
if
(
!
uv_is_closing
((
uv_handle_t
*
)
conn
->
stream
))
{
uv_read_stop
(
conn
->
stream
);
...
...
@@ -997,6 +1007,8 @@ static SCliThrd* createThrdObj() {
pThrd
->
pool
=
createConnPool
(
4
);
transDQCreate
(
pThrd
->
loop
,
&
pThrd
->
delayQueue
);
transDQCreate
(
pThrd
->
loop
,
&
pThrd
->
timeoutQueue
);
pThrd
->
quit
=
false
;
return
pThrd
;
}
...
...
@@ -1012,6 +1024,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transAsyncPoolDestroy
(
pThrd
->
asyncPool
);
transDQDestroy
(
pThrd
->
delayQueue
,
destroyCmsg
);
transDQDestroy
(
pThrd
->
timeoutQueue
,
NULL
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
);
}
...
...
@@ -1058,6 +1071,10 @@ static void doCloseIdleConn(void* param) {
STaskArg
*
arg
=
param
;
SCliConn
*
conn
=
arg
->
param1
;
SCliThrd
*
pThrd
=
arg
->
param2
;
tTrace
(
"%s conn %p idle, close it"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
conn
->
task
=
NULL
;
cliDestroyConn
(
conn
,
true
);
taosMemoryFree
(
arg
);
}
static
void
cliSchedMsgToNextNode
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
...
...
@@ -1248,11 +1265,17 @@ int transReleaseCliHandle(void* handle) {
if
(
pThrd
==
NULL
)
{
return
-
1
;
}
STransMsg
tmsg
=
{.
info
.
handle
=
handle
};
SCliMsg
*
cmsg
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliMsg
));
TRACE_SET_MSGID
(
&
tmsg
.
info
.
traceId
,
tGenIdPI64
());
SCliMsg
*
cmsg
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliMsg
));
cmsg
->
msg
=
tmsg
;
cmsg
->
type
=
Release
;
STraceId
*
trace
=
&
tmsg
.
info
.
traceId
;
tGDebug
(
"send release request at thread:%08"
PRId64
""
,
pThrd
->
pid
);
if
(
0
!=
transAsyncSend
(
pThrd
->
asyncPool
,
&
cmsg
->
q
))
{
return
-
1
;
}
...
...
source/libs/transport/src/transComm.c
浏览文件 @
9e13cced
...
...
@@ -480,7 +480,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
SDelayTask
*
task
=
container_of
(
minNode
,
SDelayTask
,
node
);
STaskArg
*
arg
=
task
->
arg
;
freeFunc
(
arg
->
param1
);
if
(
freeFunc
)
freeFunc
(
arg
->
param1
);
taosMemoryFree
(
arg
);
taosMemoryFree
(
task
);
...
...
@@ -491,9 +491,16 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
void
transDQCancel
(
SDelayQueue
*
queue
,
SDelayTask
*
task
)
{
uv_timer_stop
(
queue
->
timer
);
if
(
heapSize
(
queue
->
heap
)
<=
0
)
return
;
if
(
heapSize
(
queue
->
heap
)
<=
0
)
{
taosMemoryFree
(
task
->
arg
);
taosMemoryFree
(
task
);
return
;
}
heapRemove
(
queue
->
heap
,
&
task
->
node
);
taosMemoryFree
(
task
->
arg
);
taosMemoryFree
(
task
);
if
(
heapSize
(
queue
->
heap
)
!=
0
)
{
HeapNode
*
minNode
=
heapMin
(
queue
->
heap
);
if
(
minNode
!=
NULL
)
return
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
9e13cced
...
...
@@ -149,32 +149,34 @@ static void* transAcceptThread(void* arg);
static
bool
addHandleToWorkloop
(
SWorkThrd
*
pThrd
,
char
*
pipeName
);
static
bool
addHandleToAcceptloop
(
void
*
arg
);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
reallocConnRef(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
reallocConnRef(conn); \
tTrace("conn %p received release request", conn); \
\
STraceId traceId = head->traceId; \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
\
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = NULL}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
#define SRV_RELEASE_UV(loop) \
...
...
tests/system-test/7-tmq/tmqAutoCreateTbl.py
浏览文件 @
9e13cced
...
...
@@ -18,7 +18,7 @@ class TDTestCase:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
10
00
self
.
ctbNum
=
5
00
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
...
...
@@ -38,9 +38,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
00
,
'ctbNum'
:
5
00
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
4
00
,
'batchNum'
:
5
00
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
...
...
@@ -83,9 +83,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
00
,
'ctbNum'
:
5
00
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
10
00
,
'batchNum'
:
5
00
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
...
...
@@ -156,7 +156,7 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
00
,
'ctbNum'
:
5
00
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
1000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
...
...
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
9e13cced
...
...
@@ -539,16 +539,30 @@ class TMQCom:
insert_sql
=
f
'insert into
{
dbname
}
.
{
tbname_prefix
}{
tblIdx
+
tbname_index_start_num
}
values (
{
column_value_str
}
);'
tsql
.
execute
(
insert_sql
)
def
waitSubscriptionExit
(
self
,
tsql
,
max_wait_count
=
20
):
def
waitSubscriptionExit
(
self
,
tsql
,
topicName
):
wait_cnt
=
0
while
(
wait_cnt
<
max_wait_count
):
while
True
:
exit_flag
=
1
tsql
.
query
(
"show subscriptions"
)
if
tsql
.
getRows
()
==
0
:
break
else
:
time
.
sleep
(
2
)
wait_cnt
+=
1
rows
=
tsql
.
getRows
()
for
idx
in
range
(
rows
):
if
tsql
.
getData
(
idx
,
0
)
!=
topicName
:
continue
if
tsql
.
getData
(
idx
,
3
)
==
None
:
continue
else
:
time
.
sleep
(
0.5
)
wait_cnt
+=
1
exit_flag
=
0
break
if
exit_flag
==
1
:
break
tsql
.
query
(
"show subscriptions"
)
tdLog
.
info
(
"show subscriptions:"
)
tdLog
.
info
(
tsql
.
queryResult
)
tdLog
.
info
(
"wait subscriptions exit for %d s"
%
wait_cnt
)
def
close
(
self
):
...
...
tests/system-test/7-tmq/tmqDropNtb.py
浏览文件 @
9e13cced
...
...
@@ -103,7 +103,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
@@ -196,7 +196,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
tests/system-test/7-tmq/tmqDropStbCtb.py
浏览文件 @
9e13cced
...
...
@@ -51,7 +51,7 @@ class TDTestCase:
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
#
tmqCom.initConsumerTable()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
...
...
@@ -98,6 +98,8 @@ class TDTestCase:
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
# again create one new stb1
paraDict
[
"stbName"
]
=
'stb1'
paraDict
[
'ctbPrefix'
]
=
'ctb1n_'
...
...
@@ -157,7 +159,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
@@ -191,6 +193,8 @@ class TDTestCase:
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
# again create one new stb1
paraDict
[
"stbName"
]
=
'stb2'
paraDict
[
'ctbPrefix'
]
=
'ctb2n_'
...
...
@@ -209,9 +213,9 @@ class TDTestCase:
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
0
consumerId
=
2
elif
self
.
snapshot
==
1
:
consumerId
=
1
consumerId
=
3
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
)
topicList
=
topicFromDb
...
...
@@ -246,7 +250,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
tests/system-test/7-tmq/tmqUpdateWithConsume.py
0 → 100644
浏览文件 @
9e13cced
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
# update to half tables
paraDict
[
'rowsPerTbl'
]
=
int
(
self
.
rowsPerTbl
/
2
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# paraDict['ctbNum'] = self.ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
consumerId
=
0
if
self
.
snapshot
==
0
:
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
1
+
1
/
2
+
1
))
elif
self
.
snapshot
==
1
:
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
2
))
topicList
=
topicFromStb1
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
# update to half data
paraDict
[
"startTs"
]
=
paraDict
[
"startTs"
]
+
int
(
self
.
rowsPerTbl
/
2
)
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d, act insert rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
self
.
snapshot
==
0
:
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error!"
)
elif
self
.
snapshot
==
1
:
if
not
(
totalConsumeRows
<
expectrowcnt
and
totalConsumeRows
>=
totalRowsInserted
):
tdLog
.
exit
(
"tmq consume rows error!"
)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
ctbNum
=
1
self
.
snapshot
=
0
self
.
prepareTestEnv
()
self
.
tmqCase1
()
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
prepareTestEnv
()
self
.
snapshot
=
1
self
.
tmqCase1
()
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
ctbNum
=
100
self
.
snapshot
=
0
self
.
prepareTestEnv
()
self
.
tmqCase1
()
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
prepareTestEnv
()
self
.
snapshot
=
1
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
9e13cced
...
...
@@ -209,17 +209,20 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
#
python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
python3 ./test.py
-f
7-tmq/tmqUpdate-1ctb.py
python3 ./test.py
-f
7-tmq/tmqUpdateWithConsume.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/tmqDelete-1ctb.py
python3 ./test.py
-f
7-tmq/tmqDelete-multiCtb.py
python3 ./test.py
-f
7-tmq/tmqDropStb.py
python3 ./test.py
-f
7-tmq/tmqDropStbCtb.py
python3 ./test.py
-f
7-tmq/tmqDropNtb.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
#
python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/stbTagFilter-1ctb.py
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
...
...
tools/shell/inc/shellInt.h
浏览文件 @
9e13cced
...
...
@@ -26,6 +26,10 @@
#include "ttypes.h"
#include "tutil.h"
#ifdef WEBSOCKET
#include "taosws.h"
#endif
#define SHELL_MAX_HISTORY_SIZE 1000
#define SHELL_MAX_COMMAND_SIZE 1048586
#define SHELL_HISTORY_FILE ".taos_history"
...
...
@@ -67,6 +71,12 @@ typedef struct {
int32_t
pktNum
;
int32_t
displayWidth
;
int32_t
abort
;
#ifdef WEBSOCKET
bool
restful
;
bool
cloud
;
char
*
dsn
;
int32_t
timeout
;
#endif
}
SShellArgs
;
typedef
struct
{
...
...
@@ -85,6 +95,10 @@ typedef struct {
TAOS
*
conn
;
TdThread
pid
;
tsem_t
cancelSem
;
#ifdef WEBSOCKET
WS_TAOS
*
ws_conn
;
bool
stop_query
;
#endif
}
SShellObj
;
// shellArguments.c
...
...
@@ -95,7 +109,10 @@ int32_t shellReadCommand(char* command);
// shellEngine.c
int32_t
shellExecute
();
int32_t
shellCalcColWidth
(
TAOS_FIELD
*
field
,
int32_t
precision
);
void
shellPrintHeader
(
TAOS_FIELD
*
fields
,
int32_t
*
width
,
int32_t
num_fields
);
void
shellPrintField
(
const
char
*
val
,
TAOS_FIELD
*
field
,
int32_t
width
,
int32_t
length
,
int32_t
precision
);
void
shellDumpFieldToFile
(
TdFilePtr
pFile
,
const
char
*
val
,
TAOS_FIELD
*
field
,
int32_t
length
,
int32_t
precision
);
// shellUtil.c
int32_t
shellCheckIntSize
();
void
shellPrintVersion
();
...
...
@@ -109,6 +126,14 @@ void shellExit();
// shellNettest.c
void
shellTestNetWork
();
#ifdef WEBSOCKET
void
shellCheckConnectMode
();
// shellWebsocket.c
int
shell_conn_ws_server
(
bool
first
);
int32_t
shell_run_websocket
();
void
shellRunSingleCommandWebsocketImp
(
char
*
command
);
#endif
// shellMain.c
extern
SShellObj
shell
;
...
...
tools/shell/src/shellArguments.c
浏览文件 @
9e13cced
...
...
@@ -43,6 +43,12 @@
#define SHELL_VERSION "Print program version."
#define SHELL_EMAIL "<support@taosdata.com>"
#ifdef WEBSOCKET
#define SHELL_DSN "The dsn to use when connecting to cloud server."
#define SHELL_REST "Use restful mode when connecting."
#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 10."
#endif
static
int32_t
shellParseSingleOpt
(
int32_t
key
,
char
*
arg
);
void
shellPrintHelp
()
{
...
...
@@ -65,6 +71,11 @@ void shellPrintHelp() {
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-s,"
,
indent
,
SHELL_CMD
);
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-t,"
,
indent
,
SHELL_STARTUP
);
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-u,"
,
indent
,
SHELL_USER
);
#ifdef WEBSOCKET
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-E,"
,
indent
,
SHELL_DSN
);
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-R,"
,
indent
,
SHELL_REST
);
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-T,"
,
indent
,
SHELL_TIMEOUT
);
#endif
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-w,"
,
indent
,
SHELL_WIDTH
);
printf
(
"%s%s%s%s
\r\n
"
,
indent
,
"-V,"
,
indent
,
SHELL_VERSION
);
printf
(
"
\r\n\r\n
Report bugs to %s.
\r\n
"
,
SHELL_EMAIL
);
...
...
@@ -95,6 +106,11 @@ static struct argp_option shellOptions[] = {
{
"display-width"
,
'w'
,
"WIDTH"
,
0
,
SHELL_WIDTH
},
{
"netrole"
,
'n'
,
"NETROLE"
,
0
,
SHELL_NET_ROLE
},
{
"pktlen"
,
'l'
,
"PKTLEN"
,
0
,
SHELL_PKG_LEN
},
#ifdef WEBSOCKET
{
"dsn"
,
'E'
,
"DSN"
,
0
,
SHELL_DSN
},
{
"restful"
,
'R'
,
0
,
0
,
SHELL_REST
},
{
"timeout"
,
'T'
,
"SECONDS"
,
0
,
SHELL_TIMEOUT
},
#endif
{
"pktnum"
,
'N'
,
"PKTNUM"
,
0
,
SHELL_PKT_NUM
},
{
0
},
};
...
...
@@ -120,9 +136,15 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
switch
(
key
)
{
case
'h'
:
pArgs
->
host
=
arg
;
#ifdef WEBSOCKET
pArgs
->
cloud
=
false
;
#endif
break
;
case
'P'
:
pArgs
->
port
=
atoi
(
arg
);
#ifdef WEBSOCKET
pArgs
->
cloud
=
false
;
#endif
if
(
pArgs
->
port
==
0
)
pArgs
->
port
=
-
1
;
break
;
case
'u'
:
...
...
@@ -137,6 +159,9 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
pArgs
->
is_gen_auth
=
true
;
break
;
case
'c'
:
#ifdef WEBSOCKET
pArgs
->
cloud
=
false
;
#endif
pArgs
->
cfgdir
=
arg
;
break
;
case
'C'
:
...
...
@@ -172,6 +197,18 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
case
'N'
:
pArgs
->
pktNum
=
atoi
(
arg
);
break
;
#ifdef WEBSOCKET
case
'R'
:
pArgs
->
restful
=
true
;
break
;
case
'E'
:
pArgs
->
dsn
=
arg
;
pArgs
->
cloud
=
true
;
break
;
case
'T'
:
pArgs
->
timeout
=
atoi
(
arg
);
break
;
#endif
case
'V'
:
pArgs
->
is_version
=
true
;
break
;
...
...
@@ -208,7 +245,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
}
if
(
key
[
1
]
==
'h'
||
key
[
1
]
==
'P'
||
key
[
1
]
==
'u'
||
key
[
1
]
==
'a'
||
key
[
1
]
==
'c'
||
key
[
1
]
==
's'
||
key
[
1
]
==
'f'
||
key
[
1
]
==
'd'
||
key
[
1
]
==
'w'
||
key
[
1
]
==
'n'
||
key
[
1
]
==
'l'
||
key
[
1
]
==
'N'
)
{
key
[
1
]
==
'f'
||
key
[
1
]
==
'd'
||
key
[
1
]
==
'w'
||
key
[
1
]
==
'n'
||
key
[
1
]
==
'l'
||
key
[
1
]
==
'N'
#ifdef WEBSOCKET
||
key
[
1
]
==
'E'
||
key
[
1
]
==
'T'
#endif
)
{
if
(
i
+
1
>=
argc
)
{
fprintf
(
stderr
,
"option %s requires an argument
\r\n
"
,
key
);
return
-
1
;
...
...
@@ -221,7 +262,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
shellParseSingleOpt
(
key
[
1
],
val
);
i
++
;
}
else
if
(
key
[
1
]
==
'p'
||
key
[
1
]
==
'A'
||
key
[
1
]
==
'C'
||
key
[
1
]
==
'r'
||
key
[
1
]
==
'k'
||
key
[
1
]
==
't'
||
key
[
1
]
==
'V'
||
key
[
1
]
==
'?'
||
key
[
1
]
==
1
)
{
key
[
1
]
==
't'
||
key
[
1
]
==
'V'
||
key
[
1
]
==
'?'
||
key
[
1
]
==
1
#ifdef WEBSOCKET
||
key
[
1
]
==
'R'
#endif
)
{
shellParseSingleOpt
(
key
[
1
],
NULL
);
}
else
{
fprintf
(
stderr
,
"invalid option %s
\r\n
"
,
key
);
...
...
tools/shell/src/shellEngine.c
浏览文件 @
9e13cced
...
...
@@ -25,14 +25,9 @@ static int32_t shellRunSingleCommand(char *command);
static
int32_t
shellRunCommand
(
char
*
command
);
static
void
shellRunSingleCommandImp
(
char
*
command
);
static
char
*
shellFormatTimestamp
(
char
*
buf
,
int64_t
val
,
int32_t
precision
);
static
void
shellDumpFieldToFile
(
TdFilePtr
pFile
,
const
char
*
val
,
TAOS_FIELD
*
field
,
int32_t
length
,
int32_t
precision
);
static
int32_t
shellDumpResultToFile
(
const
char
*
fname
,
TAOS_RES
*
tres
);
static
void
shellPrintNChar
(
const
char
*
str
,
int32_t
length
,
int32_t
width
);
static
void
shellPrintField
(
const
char
*
val
,
TAOS_FIELD
*
field
,
int32_t
width
,
int32_t
length
,
int32_t
precision
);
static
int32_t
shellVerticalPrintResult
(
TAOS_RES
*
tres
,
const
char
*
sql
);
static
int32_t
shellCalcColWidth
(
TAOS_FIELD
*
field
,
int32_t
precision
);
static
void
shellPrintHeader
(
TAOS_FIELD
*
fields
,
int32_t
*
width
,
int32_t
num_fields
);
static
int32_t
shellHorizontalPrintResult
(
TAOS_RES
*
tres
,
const
char
*
sql
);
static
int32_t
shellDumpResult
(
TAOS_RES
*
tres
,
char
*
fname
,
int32_t
*
error_no
,
bool
vertical
,
const
char
*
sql
);
static
void
shellReadHistory
();
...
...
@@ -94,8 +89,15 @@ int32_t shellRunSingleCommand(char *command) {
shellSourceFile
(
c_ptr
);
return
0
;
}
shellRunSingleCommandImp
(
command
);
#ifdef WEBSOCKET
if
(
shell
.
args
.
restful
||
shell
.
args
.
cloud
)
{
shellRunSingleCommandWebsocketImp
(
command
);
}
else
{
#endif
shellRunSingleCommandImp
(
command
);
#ifdef WEBSOCKET
}
#endif
return
0
;
}
...
...
@@ -937,7 +939,16 @@ void *shellCancelHandler(void *arg) {
taosMsleep
(
10
);
continue
;
}
taos_kill_query
(
shell
.
conn
);
#ifdef WEBSOCKET
if
(
shell
.
args
.
restful
||
shell
.
args
.
cloud
)
{
shell
.
stop_query
=
true
;
}
else
{
#endif
taos_kill_query
(
shell
.
conn
);
#ifdef WEBSOCKET
}
#endif
#ifdef WINDOWS
printf
(
"
\n
%s"
,
shell
.
info
.
promptHeader
);
#endif
...
...
@@ -981,16 +992,26 @@ int32_t shellExecute() {
fflush
(
stdout
);
SShellArgs
*
pArgs
=
&
shell
.
args
;
if
(
shell
.
args
.
auth
==
NULL
)
{
shell
.
conn
=
taos_connect
(
pArgs
->
host
,
pArgs
->
user
,
pArgs
->
password
,
pArgs
->
database
,
pArgs
->
port
);
#ifdef WEBSOCKET
if
(
shell
.
args
.
restful
||
shell
.
args
.
cloud
)
{
if
(
shell_conn_ws_server
(
1
))
{
return
-
1
;
}
}
else
{
shell
.
conn
=
taos_connect_auth
(
pArgs
->
host
,
pArgs
->
user
,
pArgs
->
auth
,
pArgs
->
database
,
pArgs
->
port
);
}
if
(
shell
.
conn
==
NULL
)
{
fflush
(
stdout
);
return
-
1
;
#endif
if
(
shell
.
args
.
auth
==
NULL
)
{
shell
.
conn
=
taos_connect
(
pArgs
->
host
,
pArgs
->
user
,
pArgs
->
password
,
pArgs
->
database
,
pArgs
->
port
);
}
else
{
shell
.
conn
=
taos_connect_auth
(
pArgs
->
host
,
pArgs
->
user
,
pArgs
->
auth
,
pArgs
->
database
,
pArgs
->
port
);
}
if
(
shell
.
conn
==
NULL
)
{
fflush
(
stdout
);
return
-
1
;
}
#ifdef WEBSOCKET
}
#endif
shellReadHistory
();
...
...
@@ -1005,8 +1026,16 @@ int32_t shellExecute() {
if
(
pArgs
->
file
[
0
]
!=
0
)
{
shellSourceFile
(
pArgs
->
file
);
}
#ifdef WEBSOCKET
if
(
shell
.
args
.
restful
||
shell
.
args
.
cloud
)
{
ws_close
(
shell
.
ws_conn
);
}
else
{
#endif
taos_close
(
shell
.
conn
);
#ifdef WEBSOCKET
}
#endif
taos_close
(
shell
.
conn
);
shellWriteHistory
();
shellCleanupHistory
();
return
0
;
...
...
@@ -1026,10 +1055,15 @@ int32_t shellExecute() {
taosSetSignal
(
SIGINT
,
shellQueryInterruptHandler
);
shellGetGrantInfo
();
#ifdef WEBSOCKET
if
(
!
shell
.
args
.
restful
&&
!
shell
.
args
.
cloud
)
{
#endif
shellGetGrantInfo
();
#ifdef WEBSOCKET
}
#endif
while
(
1
)
{
taosThreadCreate
(
&
shell
.
pid
,
NULL
,
shellThreadLoop
,
shell
.
conn
);
taosThreadCreate
(
&
shell
.
pid
,
NULL
,
shellThreadLoop
,
NULL
);
taosThreadJoin
(
shell
.
pid
,
NULL
);
taosThreadClear
(
&
shell
.
pid
);
}
...
...
tools/shell/src/shellMain.c
浏览文件 @
9e13cced
...
...
@@ -19,6 +19,11 @@
SShellObj
shell
=
{
0
};
int
main
(
int
argc
,
char
*
argv
[])
{
#ifdef WEBSOCKET
shell
.
args
.
timeout
=
10
;
shell
.
args
.
cloud
=
true
;
#endif
if
(
shellCheckIntSize
()
!=
0
)
{
return
-
1
;
}
...
...
@@ -41,7 +46,9 @@ int main(int argc, char *argv[]) {
shellPrintHelp
();
return
0
;
}
#ifdef WEBSOCKET
shellCheckConnectMode
();
#endif
taos_init
();
if
(
shell
.
args
.
is_dump_config
)
{
...
...
tools/shell/src/shellUtil.c
浏览文件 @
9e13cced
...
...
@@ -121,6 +121,36 @@ void shellCheckServerStatus() {
}
}
while
(
1
);
}
#ifdef WEBSOCKET
void
shellCheckConnectMode
()
{
if
(
shell
.
args
.
dsn
)
{
shell
.
args
.
cloud
=
true
;
shell
.
args
.
restful
=
false
;
return
;
}
if
(
shell
.
args
.
cloud
)
{
shell
.
args
.
dsn
=
getenv
(
"TDENGINE_CLOUD_DSN"
);
if
(
shell
.
args
.
dsn
)
{
shell
.
args
.
cloud
=
true
;
shell
.
args
.
restful
=
false
;
return
;
}
if
(
shell
.
args
.
restful
)
{
if
(
!
shell
.
args
.
host
)
{
shell
.
args
.
host
=
"localhost"
;
}
if
(
!
shell
.
args
.
port
)
{
shell
.
args
.
port
=
6041
;
}
shell
.
args
.
dsn
=
taosMemoryCalloc
(
1
,
1024
);
snprintf
(
shell
.
args
.
dsn
,
1024
,
"ws://%s:%d/rest/ws"
,
shell
.
args
.
host
,
shell
.
args
.
port
);
}
shell
.
args
.
cloud
=
false
;
return
;
}
}
#endif
void
shellExit
()
{
if
(
shell
.
conn
!=
NULL
)
{
...
...
@@ -129,4 +159,4 @@ void shellExit() {
}
taos_cleanup
();
exit
(
EXIT_FAILURE
);
}
\ No newline at end of file
}
tools/shell/src/shellWebsocket.c
0 → 100644
浏览文件 @
9e13cced
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef WEBSOCKET
#include "taosws.h"
#include "shellInt.h"
int
shell_conn_ws_server
(
bool
first
)
{
shell
.
ws_conn
=
ws_connect_with_dsn
(
shell
.
args
.
dsn
);
if
(
!
shell
.
ws_conn
)
{
fprintf
(
stderr
,
"failed to connect %s, reason: %s
\n
"
,
shell
.
args
.
dsn
,
ws_errstr
(
NULL
));
return
-
1
;
}
if
(
first
&&
shell
.
args
.
restful
)
{
fprintf
(
stdout
,
"successfully connect to %s
\n\n
"
,
shell
.
args
.
dsn
);
}
else
if
(
first
&&
shell
.
args
.
cloud
)
{
fprintf
(
stdout
,
"successfully connect to cloud service
\n
"
);
}
return
0
;
}
static
int
horizontalPrintWebsocket
(
WS_RES
*
wres
)
{
const
void
*
data
=
NULL
;
int
rows
;
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
if
(
!
rows
)
{
return
0
;
}
int
num_fields
=
ws_field_count
(
wres
);
TAOS_FIELD
*
fields
=
(
TAOS_FIELD
*
)
ws_fetch_fields
(
wres
);
int
precision
=
ws_result_precision
(
wres
);
int
width
[
TSDB_MAX_COLUMNS
];
for
(
int
col
=
0
;
col
<
num_fields
;
col
++
)
{
width
[
col
]
=
shellCalcColWidth
(
fields
+
col
,
precision
);
}
shellPrintHeader
(
fields
,
width
,
num_fields
);
int
numOfRows
=
0
;
do
{
uint8_t
ty
;
uint32_t
len
;
for
(
int
i
=
0
;
i
<
rows
;
i
++
)
{
for
(
int
j
=
0
;
j
<
num_fields
;
j
++
)
{
putchar
(
' '
);
const
void
*
value
=
ws_get_value_in_block
(
wres
,
i
,
j
,
&
ty
,
&
len
);
shellPrintField
((
const
char
*
)
value
,
fields
+
j
,
width
[
j
],
len
,
precision
);
putchar
(
' '
);
putchar
(
'|'
);
}
putchar
(
'\r'
);
putchar
(
'\n'
);
}
numOfRows
+=
rows
;
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
}
while
(
rows
&&
!
shell
.
stop_query
);
return
numOfRows
;
}
static
int
verticalPrintWebsocket
(
WS_RES
*
wres
)
{
int
rows
=
0
;
const
void
*
data
=
NULL
;
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
if
(
!
rows
)
{
return
0
;
}
int
num_fields
=
ws_field_count
(
wres
);
TAOS_FIELD
*
fields
=
(
TAOS_FIELD
*
)
ws_fetch_fields
(
wres
);
int
precision
=
ws_result_precision
(
wres
);
int
maxColNameLen
=
0
;
for
(
int
col
=
0
;
col
<
num_fields
;
col
++
)
{
int
len
=
(
int
)
strlen
(
fields
[
col
].
name
);
if
(
len
>
maxColNameLen
)
{
maxColNameLen
=
len
;
}
}
int
numOfRows
=
0
;
do
{
uint8_t
ty
;
uint32_t
len
;
for
(
int
i
=
0
;
i
<
rows
;
i
++
)
{
printf
(
"*************************** %d.row ***************************
\n
"
,
numOfRows
+
1
);
for
(
int
j
=
0
;
j
<
num_fields
;
j
++
)
{
TAOS_FIELD
*
field
=
fields
+
j
;
int
padding
=
(
int
)(
maxColNameLen
-
strlen
(
field
->
name
));
printf
(
"%*.s%s: "
,
padding
,
" "
,
field
->
name
);
const
void
*
value
=
ws_get_value_in_block
(
wres
,
i
,
j
,
&
ty
,
&
len
);
shellPrintField
((
const
char
*
)
value
,
field
,
0
,
len
,
precision
);
putchar
(
'\n'
);
}
numOfRows
++
;
}
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
}
while
(
rows
&&
!
shell
.
stop_query
);
return
numOfRows
;
}
static
int
dumpWebsocketToFile
(
const
char
*
fname
,
WS_RES
*
wres
)
{
char
fullname
[
PATH_MAX
]
=
{
0
};
if
(
taosExpandDir
(
fname
,
fullname
,
PATH_MAX
)
!=
0
)
{
tstrncpy
(
fullname
,
fname
,
PATH_MAX
);
}
TdFilePtr
pFile
=
taosOpenFile
(
fullname
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
pFile
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file: %s
\r\n
"
,
fullname
);
return
-
1
;
}
int
rows
=
0
;
const
void
*
data
=
NULL
;
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
if
(
!
rows
)
{
taosCloseFile
(
&
pFile
);
return
0
;
}
int
numOfRows
=
0
;
TAOS_FIELD
*
fields
=
(
TAOS_FIELD
*
)
ws_fetch_fields
(
wres
);
int
num_fields
=
ws_field_count
(
wres
);
int
precision
=
ws_result_precision
(
wres
);
for
(
int
col
=
0
;
col
<
num_fields
;
col
++
)
{
if
(
col
>
0
)
{
taosFprintfFile
(
pFile
,
","
);
}
taosFprintfFile
(
pFile
,
"%s"
,
fields
[
col
].
name
);
}
taosFprintfFile
(
pFile
,
"
\r\n
"
);
do
{
uint8_t
ty
;
uint32_t
len
;
numOfRows
+=
rows
;
for
(
int
i
=
0
;
i
<
rows
;
i
++
)
{
for
(
int
j
=
0
;
j
<
num_fields
;
j
++
)
{
if
(
j
>
0
)
{
taosFprintfFile
(
pFile
,
","
);
}
const
void
*
value
=
ws_get_value_in_block
(
wres
,
i
,
j
,
&
ty
,
&
len
);
shellDumpFieldToFile
(
pFile
,
(
const
char
*
)
value
,
fields
+
j
,
len
,
precision
);
}
taosFprintfFile
(
pFile
,
"
\r\n
"
);
}
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
}
while
(
rows
&&
!
shell
.
stop_query
);
taosCloseFile
(
&
pFile
);
return
numOfRows
;
}
static
int
shellDumpWebsocket
(
WS_RES
*
wres
,
char
*
fname
,
int
*
error_no
,
bool
vertical
)
{
int
numOfRows
=
0
;
if
(
fname
!=
NULL
)
{
numOfRows
=
dumpWebsocketToFile
(
fname
,
wres
);
}
else
if
(
vertical
)
{
numOfRows
=
verticalPrintWebsocket
(
wres
);
}
else
{
numOfRows
=
horizontalPrintWebsocket
(
wres
);
}
*
error_no
=
ws_errno
(
wres
);
return
numOfRows
;
}
void
shellRunSingleCommandWebsocketImp
(
char
*
command
)
{
int64_t
st
,
et
;
char
*
sptr
=
NULL
;
char
*
cptr
=
NULL
;
char
*
fname
=
NULL
;
bool
printMode
=
false
;
if
((
sptr
=
strstr
(
command
,
">>"
))
!=
NULL
)
{
cptr
=
strstr
(
command
,
";"
);
if
(
cptr
!=
NULL
)
{
*
cptr
=
'\0'
;
}
fname
=
sptr
+
2
;
while
(
*
fname
==
' '
)
fname
++
;
*
sptr
=
'\0'
;
}
if
((
sptr
=
strstr
(
command
,
"
\\
G"
))
!=
NULL
)
{
cptr
=
strstr
(
command
,
";"
);
if
(
cptr
!=
NULL
)
{
*
cptr
=
'\0'
;
}
*
sptr
=
'\0'
;
printMode
=
true
;
// When output to a file, the switch does not work.
}
if
(
!
shell
.
ws_conn
&&
shell_conn_ws_server
(
0
))
{
return
;
}
shell
.
stop_query
=
false
;
st
=
taosGetTimestampUs
();
WS_RES
*
res
=
ws_query_timeout
(
shell
.
ws_conn
,
command
,
shell
.
args
.
timeout
);
int
code
=
ws_errno
(
res
);
if
(
code
!=
0
)
{
et
=
taosGetTimestampUs
();
fprintf
(
stderr
,
"
\n
DB: error: %s (%.6fs)
\n
"
,
ws_errstr
(
res
),
(
et
-
st
)
/
1E6
);
if
(
code
==
TSDB_CODE_WS_SEND_TIMEOUT
||
code
==
TSDB_CODE_WS_RECV_TIMEOUT
)
{
fprintf
(
stderr
,
"Hint: use -t to increase the timeout in seconds
\n
"
);
}
else
if
(
code
==
TSDB_CODE_WS_INTERNAL_ERRO
||
code
==
TSDB_CODE_WS_CLOSED
)
{
fprintf
(
stderr
,
"TDengine server is down, will try to reconnect
\n
"
);
shell
.
ws_conn
=
NULL
;
}
ws_free_result
(
res
);
return
;
}
if
(
shellRegexMatch
(
command
,
"^
\\
s*use
\\
s+[a-zA-Z0-9_]+
\\
s*;
\\
s*$"
,
REG_EXTENDED
|
REG_ICASE
))
{
fprintf
(
stdout
,
"Database changed.
\r\n\r\n
"
);
fflush
(
stdout
);
ws_free_result
(
res
);
return
;
}
int
numOfRows
=
0
;
if
(
ws_is_update_query
(
res
))
{
numOfRows
=
ws_affected_rows
(
res
);
et
=
taosGetTimestampUs
();
printf
(
"Query Ok, %d of %d row(s) in database (%.6fs)
\n
"
,
numOfRows
,
numOfRows
,
(
et
-
st
)
/
1E6
);
}
else
{
int
error_no
=
0
;
numOfRows
=
shellDumpWebsocket
(
res
,
fname
,
&
error_no
,
printMode
);
if
(
numOfRows
<
0
)
{
ws_free_result
(
res
);
return
;
}
et
=
taosGetTimestampUs
();
if
(
error_no
==
0
&&
!
shell
.
stop_query
)
{
printf
(
"Query OK, %d row(s) in set (%.6fs)
\n
"
,
numOfRows
,
(
et
-
st
)
/
1E6
);
}
else
{
printf
(
"Query interrupted, %d row(s) in set (%.6fs)
\n
"
,
numOfRows
,
(
et
-
st
)
/
1E6
);
}
}
printf
(
"
\n
"
);
ws_free_result
(
res
);
}
#endif
taosws-rs
@
9de599dc
比较
c5fded26
...
9de599dc
Subproject commit
c5fded266d3b10508e38bf3285bb7ecf798bc34
3
Subproject commit
9de599dc5293e9c90bc00bc4a03f8b91ba756bc
3
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录