Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a696ed58
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a696ed58
编写于
5月 18, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test(stream): _wstartts should be reverse quoted
上级
9f599c23
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
195 addition
and
183 deletion
+195
-183
example/src/tmq.c
example/src/tmq.c
+2
-2
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+11
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+41
-44
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+77
-74
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+51
-53
tests/script/tsim/tstream/basic1.sim
tests/script/tsim/tstream/basic1.sim
+8
-8
未找到文件。
example/src/tmq.c
浏览文件 @
a696ed58
...
...
@@ -106,8 +106,8 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as abc1"
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
a696ed58
...
...
@@ -206,7 +206,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static
int32_t
mndDoRebalance
(
SMnode
*
pMnode
,
const
SMqRebInputObj
*
pInput
,
SMqRebOutputObj
*
pOutput
)
{
int32_t
totalVgNum
=
pOutput
->
pSub
->
vgNum
;
mInfo
(
"mq rebalance subscription: %s, vgNum: %d"
,
pOutput
->
pSub
->
key
,
pOutput
->
pSub
->
vgNum
);
mInfo
(
"mq rebalance
:
subscription: %s, vgNum: %d"
,
pOutput
->
pSub
->
key
,
pOutput
->
pSub
->
vgNum
);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj
*
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -231,6 +231,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from consumer %ld"
,
pVgEp
->
vgId
,
consumerId
);
}
taosHashRemove
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
));
// put into removed
...
...
@@ -250,6 +251,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
rebOutput
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from unassigned"
,
pVgEp
->
vgId
);
}
}
...
...
@@ -263,6 +265,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
minVgCnt
=
totalVgNum
/
afterRebConsumerNum
;
imbConsumerNum
=
totalVgNum
%
afterRebConsumerNum
;
}
mInfo
(
"mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg"
,
afterRebConsumerNum
,
minVgCnt
,
imbConsumerNum
);
// 4. first scan: remove consumer more than wanted, put to remove hash
int32_t
imbCnt
=
0
;
...
...
@@ -290,6 +294,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from consumer %ld (first scan)"
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
imbCnt
++
;
}
...
...
@@ -303,6 +308,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from consumer %ld (first scan)"
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
}
...
...
@@ -319,6 +325,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
taosHashPut
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
),
&
newConsumerEp
,
sizeof
(
SMqConsumerEp
));
taosArrayPush
(
pOutput
->
newConsumers
,
&
consumerId
);
mInfo
(
"mq rebalance: add new consumer %ld"
,
consumerId
);
}
}
...
...
@@ -343,6 +350,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush
(
pConsumerEp
->
vgs
,
&
pRebVg
->
pVgEp
);
pRebVg
->
newConsumerId
=
pConsumerEp
->
consumerId
;
taosArrayPush
(
pOutput
->
rebVgs
,
pRebVg
);
mInfo
(
"mq rebalance: add vg %d to consumer %ld (second scan)"
,
pRebVg
->
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
...
...
@@ -360,6 +368,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush
(
pConsumerEp
->
vgs
,
&
pRebVg
->
pVgEp
);
pRebVg
->
newConsumerId
=
pConsumerEp
->
consumerId
;
taosArrayPush
(
pOutput
->
rebVgs
,
pRebVg
);
mInfo
(
"mq rebalance: add vg %d to consumer %ld (second scan)"
,
pRebVg
->
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
else
{
// if all consumer is removed, put all vg into unassigned
...
...
@@ -372,6 +381,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
ASSERT
(
pRebOutput
->
newConsumerId
==
-
1
);
taosArrayPush
(
pOutput
->
pSub
->
unassignedVgs
,
&
pRebOutput
->
pVgEp
);
taosArrayPush
(
pOutput
->
rebVgs
,
pRebOutput
);
mInfo
(
"mq rebalance: unassign vg %d (second scan)"
,
pRebOutput
->
pVgEp
->
vgId
);
}
}
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
a696ed58
...
...
@@ -179,6 +179,7 @@ struct STQ {
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
// TDB* pTdb;
};
typedef
struct
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a696ed58
...
...
@@ -32,6 +32,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq
->
path
=
strdup
(
path
);
pTq
->
pVnode
=
pVnode
;
pTq
->
pWal
=
pWal
;
/*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/
/*ASSERT(0);*/
/*}*/
#if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
...
...
source/libs/executor/src/executor.c
浏览文件 @
a696ed58
...
...
@@ -106,7 +106,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
pMsg->contentLen = pMsg->contentLen;
#endif
qDebugL
(
"stream task string %s"
,
(
const
char
*
)
msg
);
/*qDebugL("stream task string %s", (const char*)msg);*/
struct
SSubplan
*
plan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
a696ed58
...
...
@@ -15,10 +15,10 @@
#include <vnode.h>
#include "dataSinkMgt.h"
#include "texception.h"
#include "os.h"
#include "tarray.h"
#include "tcache.h"
#include "texception.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tudf.h"
...
...
@@ -32,15 +32,15 @@
typedef
struct
STaskMgmt
{
TdThreadMutex
lock
;
SCacheObj
*
qinfoPool
;
// query handle pool
int32_t
vgId
;
bool
closed
;
SCacheObj
*
qinfoPool
;
// query handle pool
int32_t
vgId
;
bool
closed
;
}
STaskMgmt
;
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
EOPTR_EXEC_MODEL
model
)
{
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
EOPTR_EXEC_MODEL
model
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -56,46 +56,46 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
handle
);
}
_error:
_error:
// if failed to add ref for all tables in this query, abort current query
return
code
;
}
#ifdef TEST_IMPL
// wait moment
int
waitMoment
(
SQInfo
*
pQInfo
)
{
if
(
pQInfo
->
sql
)
{
int
ms
=
0
;
char
*
pcnt
=
strstr
(
pQInfo
->
sql
,
" count(*)"
);
if
(
pcnt
)
return
0
;
char
*
pos
=
strstr
(
pQInfo
->
sql
,
" t_"
);
if
(
pos
)
{
int
waitMoment
(
SQInfo
*
pQInfo
)
{
if
(
pQInfo
->
sql
)
{
int
ms
=
0
;
char
*
pcnt
=
strstr
(
pQInfo
->
sql
,
" count(*)"
);
if
(
pcnt
)
return
0
;
char
*
pos
=
strstr
(
pQInfo
->
sql
,
" t_"
);
if
(
pos
)
{
pos
+=
3
;
ms
=
atoi
(
pos
);
while
(
*
pos
>=
'0'
&&
*
pos
<=
'9'
)
{
pos
++
;
while
(
*
pos
>=
'0'
&&
*
pos
<=
'9'
)
{
pos
++
;
}
char
unit_char
=
*
pos
;
if
(
unit_char
==
'h'
)
{
ms
*=
3600
*
1000
;
}
else
if
(
unit_char
==
'm'
)
{
ms
*=
60
*
1000
;
}
else
if
(
unit_char
==
's'
)
{
if
(
unit_char
==
'h'
)
{
ms
*=
3600
*
1000
;
}
else
if
(
unit_char
==
'm'
)
{
ms
*=
60
*
1000
;
}
else
if
(
unit_char
==
's'
)
{
ms
*=
1000
;
}
}
if
(
ms
==
0
)
return
0
;
if
(
ms
==
0
)
return
0
;
printf
(
"test wait sleep %dms. sql=%s ...
\n
"
,
ms
,
pQInfo
->
sql
);
if
(
ms
<
1000
)
{
if
(
ms
<
1000
)
{
taosMsleep
(
ms
);
}
else
{
int
used_ms
=
0
;
while
(
used_ms
<
ms
)
{
while
(
used_ms
<
ms
)
{
taosMsleep
(
1000
);
used_ms
+=
1000
;
if
(
isTaskKilled
(
pQInfo
))
{
if
(
isTaskKilled
(
pQInfo
))
{
printf
(
"test check query is canceled, sleep break.%s
\n
"
,
pQInfo
->
sql
);
break
;
}
...
...
@@ -106,15 +106,14 @@ int waitMoment(SQInfo* pQInfo){
}
#endif
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
,
uint64_t
*
useconds
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
,
uint64_t
*
useconds
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int64_t
threadId
=
taosGetSelfPthreadId
();
*
pRes
=
NULL
;
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"%s-%p execTask is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
qError
(
"%s-%p execTask is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
pTaskInfo
->
code
;
}
...
...
@@ -133,12 +132,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
publishQueryAbortEvent
(
pTaskInfo
,
ret
);
pTaskInfo
->
code
=
ret
;
qDebug
(
"%s task abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
qDebug
(
"%s task abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
return
pTaskInfo
->
code
;
}
qDebug
(
"%s execTask is launched"
,
GET_TASKID
(
pTaskInfo
));
/*qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));*/
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
...
...
@@ -154,12 +152,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
*
useconds
=
pTaskInfo
->
cost
.
elapsedTime
;
}
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
pTaskInfo
->
totalRows
+=
current
;
cleanUpUdfs
();
qDebug
(
"%s task suspended, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d, elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
current
,
pTaskInfo
->
totalRows
,
0
,
el
/
1000
.
0
);
/*qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",*/
/*GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);*/
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
...
...
@@ -208,18 +206,17 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
qDebug
(
"%s execTask completed, numOfRows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
totalRows
);
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
qDebug
(
"%s execTask completed, numOfRows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
totalRows
);
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
doDestroyTask
(
pTaskInfo
);
}
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
int32_t
*
resNum
,
SExplainExecInfo
**
pRes
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
capacity
=
0
;
int32_t
capacity
=
0
;
return
getOperatorExplainExecInfo
(
pTaskInfo
->
pRoot
,
pRes
,
&
capacity
,
resNum
);
return
getOperatorExplainExecInfo
(
pTaskInfo
->
pRoot
,
pRes
,
&
capacity
,
resNum
);
}
source/libs/executor/src/executorimpl.c
浏览文件 @
a696ed58
...
...
@@ -106,7 +106,7 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static
SColumnInfo
*
extractColumnFilterInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
*
numOfFilterCols
);
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
int32_t
getNumOfScanTimes
(
STaskAttr
*
pQueryAttr
);
...
...
@@ -154,8 +154,9 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
)
{}
static
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
);
static
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
);
static
void
initCtxOutputBuffer
(
SqlFunctionCtx
*
pCtx
,
int32_t
size
);
static
void
setResultBufSize
(
STaskAttr
*
pQueryAttr
,
SResultInfo
*
pResultInfo
);
...
...
@@ -342,14 +343,12 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
return
pResultRow
;
}
void
doClearWindow
(
SIntervalAggOperatorInfo
*
pInfo
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
int32_t
numOfOutput
)
{
void
doClearWindow
(
SIntervalAggOperatorInfo
*
pInfo
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
int32_t
numOfOutput
)
{
SAggSupporter
*
pSup
=
&
pInfo
->
aggSup
;
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
SResultRow
*
pResult
=
getResultRowByPos
(
pSup
->
pResultBuf
,
p1
);
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
SResultRow
*
pResult
=
getResultRowByPos
(
pSup
->
pResultBuf
,
p1
);
SqlFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
pCtx
[
i
].
resultInfo
=
getResultCell
(
pResult
,
i
,
pInfo
->
binfo
.
rowCellInfoOffset
);
...
...
@@ -599,8 +598,9 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64
(
pColData
,
4
,
&
pQueryWindow
->
ekey
);
}
void
doApplyFunctions
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
)
{
void
doApplyFunctions
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
// keep it temporarily
bool
hasAgg
=
pCtx
[
k
].
input
.
colDataAggIsSet
;
...
...
@@ -683,8 +683,8 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
}
}
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
if
(
pBlock
->
pBlockAgg
!=
NULL
)
{
doSetInputDataBlockInfo
(
pOperator
,
pCtx
,
pBlock
,
order
);
}
else
{
...
...
@@ -735,7 +735,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
}
static
int32_t
doSetInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
int32_t
scanFlag
,
bool
createDummyCol
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfExprs
;
++
i
)
{
...
...
@@ -743,7 +743,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pCtx
[
i
].
input
.
numOfRows
=
pBlock
->
info
.
rows
;
pCtx
[
i
].
pSrcBlock
=
pBlock
;
pCtx
[
i
].
scanFlag
=
scanFlag
;
pCtx
[
i
].
scanFlag
=
scanFlag
;
SInputColumnInfoData
*
pInput
=
&
pCtx
[
i
].
input
;
pInput
->
uid
=
pBlock
->
info
.
uid
;
...
...
@@ -1003,14 +1003,14 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return
false
;
}
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// in the reverse table scan, only the following functions need to be executed
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
...
...
@@ -1128,16 +1128,16 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
}
else
if
(
fmIsAggFunc
(
pCtx
[
i
].
functionId
))
{
p
=
&
pCtx
[
i
];
}
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
// tagLen += pCtx[i].resDataInfo.bytes;
// pTagCtx[num++] = &pCtx[i];
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
// // tag function may be the group by tag column
// // ts may be the required primary timestamp column
// continue;
// } else {
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
// }
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
// tagLen += pCtx[i].resDataInfo.bytes;
// pTagCtx[num++] = &pCtx[i];
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
// // tag function may be the group by tag column
// // ts may be the required primary timestamp column
// continue;
// } else {
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
// }
}
if
(
p
!=
NULL
)
{
...
...
@@ -2015,7 +2015,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
}
static
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
SArray
*
pColMatchInfo
)
{
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
SArray
*
pColMatchInfo
)
{
if
(
pFilterNode
==
NULL
)
{
return
;
}
...
...
@@ -2133,8 +2133,9 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
* @param pQInfo
* @param result
*/
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
start
=
pGroupResInfo
->
index
;
...
...
@@ -2172,11 +2173,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
}
else
{
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
}
}
...
...
@@ -2192,11 +2193,12 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
return
0
;
}
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
)
{
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
)
{
assert
(
pGroupResInfo
->
currentGroup
<=
pGroupResInfo
->
totalGroup
);
SExprInfo
*
pExprInfo
=
pOperator
->
pExpr
;
int32_t
numOfExprs
=
pOperator
->
numOfExprs
;
SExprInfo
*
pExprInfo
=
pOperator
->
pExpr
;
int32_t
numOfExprs
=
pOperator
->
numOfExprs
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
*
rowCellOffset
=
pbInfo
->
rowCellInfoOffset
;
...
...
@@ -3215,7 +3217,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SExchangeInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExchangeInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -3328,7 +3330,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
static
void
doMergeResultImpl
(
SSortedMergeOperatorInfo
*
pInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
,
int32_t
rowIndex
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
// TODO set row index
// pCtx[j].startRow = rowIndex;
// pCtx[j].startRow = rowIndex;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
...
...
@@ -3379,7 +3381,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
SqlFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
// pCtx[i].size = 1;
// pCtx[i].size = 1;
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
...
...
@@ -3605,7 +3607,7 @@ _error:
return
NULL
;
}
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
)
{
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
)
{
// todo add more information about exchange operation
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
)
{
*
order
=
TSDB_ORDER_ASC
;
...
...
@@ -3635,7 +3637,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SAggOperatorInfo
*
pAggInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pInfo
=
&
pAggInfo
->
binfo
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
...
...
@@ -3975,7 +3977,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -4225,7 +4228,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableGroupInfo
->
pGroupList
);
++
i
)
{
SArray
*
pa
=
taosArrayGetP
(
pTableGroupInfo
->
pGroupList
,
i
);
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pa
);
++
j
)
{
STableKeyInfo
*
pk
=
taosArrayGet
(
pa
,
j
);
STableKeyInfo
*
pk
=
taosArrayGet
(
pa
,
j
);
STableQueryInfo
*
pTQueryInfo
=
&
pTableQueryInfo
[
index
++
];
pTQueryInfo
->
lastKey
=
pk
->
lastKey
;
}
...
...
@@ -4361,9 +4364,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
goto
_error
;
}
pInfo
->
limit
=
*
pLimit
;
pInfo
->
slimit
=
*
pSlimit
;
pInfo
->
curOffset
=
pLimit
->
offset
;
pInfo
->
limit
=
*
pLimit
;
pInfo
->
slimit
=
*
pSlimit
;
pInfo
->
curOffset
=
pLimit
->
offset
;
pInfo
->
curSOffset
=
pSlimit
->
offset
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
...
...
@@ -4503,10 +4506,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa
}
pCol
->
slotId
=
slotId
;
pCol
->
colId
=
colId
;
pCol
->
bytes
=
pType
->
bytes
;
pCol
->
type
=
pType
->
type
;
pCol
->
scale
=
pType
->
scale
;
pCol
->
colId
=
colId
;
pCol
->
bytes
=
pType
->
bytes
;
pCol
->
type
=
pType
->
type
;
pCol
->
scale
=
pType
->
scale
;
pCol
->
precision
=
pType
->
precision
;
pCol
->
dataBlockId
=
blockId
;
...
...
@@ -4581,10 +4584,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
if
(
strcmp
(
pExp
->
pExpr
->
_function
.
functionName
,
"tbname"
)
==
0
)
{
pFuncNode
->
pParameterList
=
nodesMakeList
();
ASSERT
(
LIST_LENGTH
(
pFuncNode
->
pParameterList
)
==
0
);
SValueNode
*
res
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
if
(
NULL
==
res
)
{
// todo handle error
SValueNode
*
res
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
if
(
NULL
==
res
)
{
// todo handle error
}
else
{
res
->
node
.
resType
=
(
SDataType
)
{.
bytes
=
sizeof
(
int64_t
),
.
type
=
TSDB_DATA_TYPE_BIGINT
};
res
->
node
.
resType
=
(
SDataType
){.
bytes
=
sizeof
(
int64_t
),
.
type
=
TSDB_DATA_TYPE_BIGINT
};
nodesListAppend
(
pFuncNode
->
pParameterList
,
res
);
}
}
...
...
@@ -4677,7 +4680,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pExchange
->
node
.
pOutputDataBlockDesc
);
return
createExchangeOperatorInfo
(
pHandle
->
pMsgCb
->
clientRpc
,
pExchange
->
pSrcEndPoints
,
pResBlock
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
==
type
)
{
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
// simple child table.
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
// simple child table.
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
int32_t
numOfCols
=
0
;
...
...
@@ -4686,27 +4689,27 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if
(
pHandle
->
vnode
)
{
pDataReader
=
doCreateDataReader
(
pTableScanNode
,
pHandle
,
pTableGroupInfo
,
(
uint64_t
)
queryId
,
taskId
);
}
else
{
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
}
if
(
pDataReader
==
NULL
&&
terrno
!=
0
)
{
qDebug
(
"pDataReader is NULL"
);
/*qDebug("pDataReader is NULL");*/
// return NULL;
}
else
{
qDebug
(
"pDataReader is not NULL"
);
/*qDebug("pDataReader is not NULL");*/
}
SDataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
SOperatorInfo
*
pOperatorDumy
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
SOperatorInfo
*
pOperatorDumy
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
SArray
*
tableIdList
=
extractTableIdList
(
pTableGroupInfo
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
SArray
*
pCols
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
->
reader
,
pDataReader
,
pHandle
,
pScanPhyNode
->
uid
,
pResBlock
,
pCols
,
tableIdList
,
pTaskInfo
,
pScanPhyNode
->
node
.
pConditions
,
pOperatorDumy
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
->
reader
,
pDataReader
,
pHandle
,
pScanPhyNode
->
uid
,
pResBlock
,
pCols
,
tableIdList
,
pTaskInfo
,
pScanPhyNode
->
node
.
pConditions
,
pOperatorDumy
);
taosArrayDestroy
(
tableIdList
);
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
...
...
@@ -4855,7 +4858,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t
tsSlotId
=
((
SColumnNode
*
)
pStateNode
->
window
.
pTspk
)
->
slotId
;
SColumnNode
*
pColNode
=
(
SColumnNode
*
)((
STargetNode
*
)
pStateNode
->
pStateKey
)
->
pExpr
;
SColumn
col
=
extractColumnFromColumnNode
(
pColNode
);
SColumn
col
=
extractColumnFromColumnNode
(
pColNode
);
pOptr
=
createStatewindowOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
as
,
tsSlotId
,
&
col
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_JOIN
==
type
)
{
SJoinPhysiNode
*
pJoinNode
=
(
SJoinPhysiNode
*
)
pPhyNode
;
...
...
@@ -4923,11 +4926,11 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
SColumn
extractColumnFromColumnNode
(
SColumnNode
*
pColNode
)
{
SColumn
c
=
{
0
};
c
.
slotId
=
pColNode
->
slotId
;
c
.
colId
=
pColNode
->
colId
;
c
.
type
=
pColNode
->
node
.
resType
.
type
;
c
.
bytes
=
pColNode
->
node
.
resType
.
bytes
;
c
.
scale
=
pColNode
->
node
.
resType
.
scale
;
c
.
slotId
=
pColNode
->
slotId
;
c
.
colId
=
pColNode
->
colId
;
c
.
type
=
pColNode
->
node
.
resType
.
type
;
c
.
bytes
=
pColNode
->
node
.
resType
.
bytes
;
c
.
scale
=
pColNode
->
node
.
resType
.
scale
;
c
.
precision
=
pColNode
->
node
.
resType
.
precision
;
return
c
;
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
a696ed58
...
...
@@ -9,11 +9,11 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
true
};
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
true
};
SQWorkerMgmt
gQwMgmt
=
{
.
lock
=
0
,
.
qwRef
=
-
1
,
.
qwNum
=
0
,
.
lock
=
0
,
.
qwRef
=
-
1
,
.
qwNum
=
0
,
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
...
...
@@ -110,9 +110,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_LOCK
(
QW_READ
,
&
mgmt
->
schLock
);
QW_DUMP
(
"total remain schduler num:%d"
,
taosHashGetSize
(
mgmt
->
schHash
));
/*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/
void
*
key
=
NULL
;
void
*
key
=
NULL
;
size_t
keyLen
=
0
;
int32_t
i
=
0
;
SQWSchStatus
*
sch
=
NULL
;
...
...
@@ -127,7 +127,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_UNLOCK
(
QW_READ
,
&
mgmt
->
schLock
);
QW_DUMP
(
"total remain ctx num:%d"
,
taosHashGetSize
(
mgmt
->
ctxHash
));
/*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/
}
char
*
qwPhaseStr
(
int32_t
phase
)
{
...
...
@@ -462,7 +462,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
}
int32_t
qwDropTaskStatus
(
QW_FPARAMS_DEF
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -499,7 +499,7 @@ _return:
}
int32_t
qwUpdateTaskStatus
(
QW_FPARAMS_DEF
,
int8_t
status
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -550,11 +550,11 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t
qwExecTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
queryEnd
)
{
int32_t
code
=
0
;
bool
qcontinue
=
true
;
SSDataBlock
*
pRes
=
NULL
;
SSDataBlock
*
pRes
=
NULL
;
uint64_t
useconds
=
0
;
int32_t
i
=
0
;
int32_t
execNum
=
0
;
qTaskInfo_t
*
taskHandle
=
&
ctx
->
taskHandle
;
qTaskInfo_t
*
taskHandle
=
&
ctx
->
taskHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
while
(
true
)
{
...
...
@@ -632,7 +632,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
void
*
key
=
NULL
;
void
*
key
=
NULL
;
size_t
keyLen
=
0
;
int32_t
i
=
0
;
STaskStatus
status
=
{
0
};
...
...
@@ -719,8 +719,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
}
int32_t
qwHandlePrePhaseEvents
(
QW_FPARAMS_DEF
,
int8_t
phase
,
SQWPhaseInput
*
input
,
SQWPhaseOutput
*
output
)
{
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
SRpcHandleInfo
*
dropConnection
=
NULL
;
SRpcHandleInfo
*
cancelConnection
=
NULL
;
...
...
@@ -925,13 +925,13 @@ _return:
}
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
int8_t
taskType
,
int8_t
explain
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
SQWPhaseInput
input
=
{
0
};
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
SQWPhaseInput
input
=
{
0
};
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_ERR_JRET
(
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
...
...
@@ -944,7 +944,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
QW_TASK_DLOGL
(
"subplan json string, len:%d, %s"
,
qwMsg
->
msgLen
,
qwMsg
->
msg
);
/*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
code
=
qStringToSubplan
(
qwMsg
->
msg
,
&
plan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -1055,10 +1055,10 @@ _return:
}
int32_t
qwProcessCQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
SQWTaskCtx
*
ctx
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
SQWPhaseInput
input
=
{
0
};
void
*
rsp
=
NULL
;
void
*
rsp
=
NULL
;
int32_t
dataLen
=
0
;
bool
queryEnd
=
false
;
...
...
@@ -1138,8 +1138,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t
code
=
0
;
int32_t
dataLen
=
0
;
bool
locked
=
false
;
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
SQWPhaseInput
input
=
{
0
};
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_FETCH
,
&
input
,
NULL
));
...
...
@@ -1274,7 +1274,7 @@ _return:
int32_t
qwProcessHbLinkBroken
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
QW_ERR_RET
(
qwAcquireAddScheduler
(
mgmt
,
req
->
sId
,
QW_READ
,
&
sch
));
...
...
@@ -1300,7 +1300,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
int32_t
qwProcessHb
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
if
(
qwMsg
->
code
)
{
QW_RET
(
qwProcessHbLinkBroken
(
mgmt
,
qwMsg
,
req
));
...
...
@@ -1338,28 +1338,28 @@ _return:
qwMsg
->
connInfo
.
handle
=
NULL
;
}
QW_DLOG
(
"hb rsp send, handle:%p, code:%x - %s"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
/*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
QW_RET
(
TSDB_CODE_SUCCESS
);
}
void
qwProcessHbTimerEvent
(
void
*
param
,
void
*
tmrId
)
{
SQWHbParam
*
hbParam
=
(
SQWHbParam
*
)
param
;
SQWHbParam
*
hbParam
=
(
SQWHbParam
*
)
param
;
if
(
hbParam
->
qwrId
!=
atomic_load_32
(
&
gQwMgmt
.
qwRef
))
{
return
;
}
int64_t
refId
=
hbParam
->
refId
;
int64_t
refId
=
hbParam
->
refId
;
SQWorker
*
mgmt
=
qwAcquire
(
refId
);
if
(
NULL
==
mgmt
)
{
QW_DLOG
(
"qwAcquire %"
PRIx64
"failed"
,
refId
);
taosMemoryFree
(
param
);
return
;
}
SQWSchStatus
*
sch
=
NULL
;
int32_t
taskNum
=
0
;
SQWHbInfo
*
rspList
=
NULL
;
SQWHbInfo
*
rspList
=
NULL
;
int32_t
code
=
0
;
qwDbgDumpMgmtInfo
(
mgmt
);
...
...
@@ -1383,7 +1383,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
return
;
}
void
*
key
=
NULL
;
void
*
key
=
NULL
;
size_t
keyLen
=
0
;
int32_t
i
=
0
;
...
...
@@ -1413,29 +1413,27 @@ _return:
for
(
int32_t
j
=
0
;
j
<
i
;
++
j
)
{
qwBuildAndSendHbRsp
(
&
rspList
[
j
].
connInfo
,
&
rspList
[
j
].
rsp
,
code
);
QW_DLOG
(
"hb rsp send, handle:%p, code:%x - %s, taskNum:%d"
,
rspList
[
j
].
connInfo
.
handle
,
code
,
tstrerror
(
code
),
(
rspList
[
j
].
rsp
.
taskStatus
?
(
int32_t
)
taosArrayGetSize
(
rspList
[
j
].
rsp
.
taskStatus
)
:
0
));
/*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
/*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
tFreeSSchedulerHbRsp
(
&
rspList
[
j
].
rsp
);
}
taosMemoryFreeClear
(
rspList
);
taosTmrReset
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
param
,
mgmt
->
timer
,
&
mgmt
->
hbTimer
);
qwRelease
(
refId
);
qwRelease
(
refId
);
}
void
qwCloseRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
atomic_load_32
(
&
gQwMgmt
.
qwNum
)
<=
0
&&
gQwMgmt
.
qwRef
>=
0
)
{
taosCloseRef
(
gQwMgmt
.
qwRef
);
gQwMgmt
.
qwRef
=
-
1
;
gQwMgmt
.
qwRef
=
-
1
;
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
}
void
qwDestroySchStatus
(
SQWSchStatus
*
pStatus
)
{
taosHashCleanup
(
pStatus
->
tasksHash
);
}
void
qwDestroySchStatus
(
SQWSchStatus
*
pStatus
)
{
taosHashCleanup
(
pStatus
->
tasksHash
);
}
void
qwDestroyImpl
(
void
*
pMgmt
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
...
...
@@ -1454,12 +1452,12 @@ void qwDestroyImpl(void *pMgmt) {
SQWSchStatus
*
sch
=
(
SQWSchStatus
*
)
pIter
;
qwDestroySchStatus
(
sch
);
pIter
=
taosHashIterate
(
mgmt
->
schHash
,
pIter
);
}
}
taosHashCleanup
(
mgmt
->
schHash
);
taosMemoryFree
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
qwCloseRef
();
}
...
...
@@ -1467,7 +1465,7 @@ void qwDestroyImpl(void *pMgmt) {
int32_t
qwOpenRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
gQwMgmt
.
qwRef
=
taosOpenRef
(
100
,
qwDestroyImpl
);
gQwMgmt
.
qwRef
=
taosOpenRef
(
100
,
qwDestroyImpl
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
qError
(
"init qworker ref failed"
);
...
...
@@ -1475,14 +1473,14 @@ int32_t qwOpenRef(void) {
}
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
return
TSDB_CODE_SUCCESS
;
}
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
)
{
int32_t
paramIdx
=
0
;
int32_t
newParamIdx
=
0
;
while
(
true
)
{
paramIdx
=
atomic_load_32
(
&
gQwMgmt
.
paramIdx
);
if
(
paramIdx
==
tListLen
(
gQwMgmt
.
param
))
{
...
...
@@ -1490,7 +1488,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
}
else
{
newParamIdx
=
paramIdx
+
1
;
}
if
(
paramIdx
==
atomic_val_compare_exchange_32
(
&
gQwMgmt
.
paramIdx
,
paramIdx
,
newParamIdx
))
{
break
;
}
...
...
@@ -1577,12 +1575,12 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
SQWHbParam
*
param
=
NULL
;
qwSetHbParam
(
mgmt
->
refId
,
&
param
);
mgmt
->
hbTimer
=
taosTmrStart
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
(
void
*
)
param
,
mgmt
->
timer
);
mgmt
->
hbTimer
=
taosTmrStart
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
(
void
*
)
param
,
mgmt
->
timer
);
if
(
NULL
==
mgmt
->
hbTimer
)
{
qError
(
"start hb timer failed"
);
QW_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
*
qWorkerMgmt
=
mgmt
;
qDebug
(
"qworker initialized for node, type:%d, id:%d, handle:%p"
,
mgmt
->
nodeType
,
mgmt
->
nodeId
,
mgmt
);
...
...
@@ -1599,9 +1597,9 @@ _return:
taosTmrCleanUp
(
mgmt
->
timer
);
taosMemoryFreeClear
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
}
QW_RET
(
code
);
}
...
...
@@ -1678,7 +1676,7 @@ int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64
}
int32_t
qwGetTaskStatus
(
SQWorker
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int8_t
*
taskStatus
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -1705,7 +1703,7 @@ int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
}
int32_t
qwCancelTask
(
SQWorker
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
tests/script/tsim/tstream/basic1.sim
浏览文件 @
a696ed58
...
...
@@ -24,7 +24,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
...
...
@@ -137,7 +137,7 @@ endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
...
...
@@ -250,7 +250,7 @@ endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
...
...
@@ -280,7 +280,7 @@ endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
...
...
@@ -312,7 +312,7 @@ sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
...
...
@@ -344,7 +344,7 @@ sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
...
...
@@ -374,7 +374,7 @@ endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
...
...
@@ -404,7 +404,7 @@ endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录