Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8e409f81
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8e409f81
编写于
3月 25, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add backend
上级
b17f99de
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
219 addition
and
26 deletion
+219
-26
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+6
-5
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+178
-10
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+21
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-1
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+3
-1
source/libs/stream/src/streamStateRocksdb.c
source/libs/stream/src/streamStateRocksdb.c
+8
-6
tests/script/tsim/stream/basic0.sim
tests/script/tsim/stream/basic0.sim
+1
-1
tests/script/tsim/stream/basic1.sim
tests/script/tsim/stream/basic1.sim
+1
-1
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
8e409f81
...
...
@@ -15,6 +15,7 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
#include "executor.h"
#include "function.h"
#include "nodes.h"
#include "plannodes.h"
...
...
@@ -22,7 +23,6 @@
#include "tpagedbuf.h"
#include "tsimplehash.h"
#include "vnode.h"
#include "executor.h"
#define T_LONG_JMP(_obj, _c) \
do { \
...
...
@@ -37,7 +37,7 @@
memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
} while (0)
#define GET_RES_WINDOW_KEY_LEN(_l)
((_l) + sizeof(uint64_t))
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
...
...
@@ -67,7 +67,7 @@ typedef struct SResultRowPosition {
typedef
struct
SResKeyPos
{
SResultRowPosition
pos
;
uint64_t
groupId
;
char
key
[];
char
key
[];
}
SResKeyPos
;
typedef
struct
SResultRowInfo
{
...
...
@@ -88,12 +88,13 @@ typedef struct SColMatchInfo {
int32_t
matchType
;
// determinate the source according to col id or slot id
}
SColMatchInfo
;
typedef
struct
SExecTaskInfo
SExecTaskInfo
;
typedef
struct
SExecTaskInfo
SExecTaskInfo
;
typedef
struct
STableListInfo
STableListInfo
;
struct
SqlFunctionCtx
;
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
SExecTaskInfo
*
pTaskInfo
);
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
SExecTaskInfo
*
pTaskInfo
);
STableListInfo
*
tableListCreate
();
void
*
tableListDestroy
(
STableListInfo
*
pTableListInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8e409f81
...
...
@@ -1065,8 +1065,12 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr
bool
returnNotNull
=
false
;
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
SResultRowEntryInfo
*
pResInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowEntryOffset
);
qWarn
(
"offset: idx: %d, val: %d"
,
j
,
rowEntryOffset
[
j
]);
if
(
!
isRowEntryInitialized
(
pResInfo
))
{
qWarn
(
"no result"
);
continue
;
}
else
{
qWarn
(
"has result"
);
}
if
(
pRow
->
numOfRows
<
pResInfo
->
numOfRes
)
{
...
...
@@ -2569,6 +2573,158 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
return
TSDB_CODE_SUCCESS
;
}
void
resultRowToString
(
void
*
row
,
int32_t
size
,
char
*
buf
)
{
SResultRow
*
p
=
row
;
int32_t
len
=
0
;
len
+=
sprintf
(
buf
+
len
,
"pageId:%d, offset:%d, startInterp:%d, endInterp:%d, closed:%d, numOfRows:%d, skey:%"
PRId64
", ekey:%"
PRId64
,
p
->
pageId
,
p
->
offset
,
p
->
startInterp
,
p
->
endInterp
,
p
->
closed
,
p
->
numOfRows
,
p
->
win
.
skey
,
p
->
win
.
ekey
);
int32_t
numOfEntryInfo
=
(
size
-
sizeof
(
SResultRow
))
/
sizeof
(
struct
SResultRowEntryInfo
);
len
+=
sprintf
(
buf
+
len
,
", entryInfo size:%d"
,
numOfEntryInfo
);
for
(
int
i
=
0
;
i
<
numOfEntryInfo
;
i
++
)
{
SResultRowEntryInfo
*
pInfo
=
&
p
->
pEntryInfo
[
i
];
if
(
len
>=
200
*
size
-
64
||
i
>=
5
)
{
break
;
}
len
+=
sprintf
(
buf
+
len
,
"[inited:%d, complete:%d, nullRes:%d, numOfRes:%d]"
,
pInfo
->
initialized
,
pInfo
->
complete
,
pInfo
->
isNullRes
,
pInfo
->
numOfRes
);
}
}
/*
*
*/
int32_t
resultRowEncode
(
void
*
k
,
int32_t
*
size
,
char
*
buf
)
{
// SResultRow* key = k;
// int len = 0;
// int struLen = *size;
// len += taosEncodeFixedI32((void**)&buf, key->pageId);
// uint32_t offset = key->offset;
// len += taosEncodeFixedU32((void**)&buf, offset);
// len += taosEncodeFixedI8((void**)&buf, key->startInterp);
// len += taosEncodeFixedI8((void**)&buf, key->endInterp);
// len += taosEncodeFixedI8((void**)&buf, key->closed);
// len += taosEncodeFixedU32((void**)&buf, key->numOfRows);
// len += taosEncodeFixedI64((void**)&buf, key->win.skey);
// len += taosEncodeFixedI64((void**)&buf, key->win.ekey);
// int32_t numOfEntryInfo = (struLen - sizeof(SResultRow)) / sizeof(struct SResultRowEntryInfo);
// len += taosEncodeFixedI32((void**)&buf, numOfEntryInfo);
// for (int i = 0; i < numOfEntryInfo; i++) {
// SResultRowEntryInfo* p = &key->pEntryInfo[i];
// uint8_t value = p->initialized ? 1 : 0;
// len += taosEncodeFixedU8((void**)&buf, value);
// value = p->complete ? 1 : 0;
// len += taosEncodeFixedU8((void**)&buf, value);
// value = p->isNullRes;
// len += taosEncodeFixedU8((void**)&buf, value);
// len += taosEncodeFixedU16((void**)&buf, p->numOfRes);
// }
// {
// char* strBuf = taosMemoryCalloc(1, *size * 100);
// resultRowToString(key, *size, strBuf);
// qWarn("encode result row:%s", strBuf);
// }
// return len;
return
0
;
}
int32_t
resultRowDecode
(
void
**
k
,
size_t
size
,
char
*
buf
)
{
// char* p1 = buf;
// int32_t numOfEntryInfo = 0;
// uint32_t entryOffset = sizeof(int32_t) + sizeof(uint32_t) + sizeof(int8_t) + sizeof(int8_t) + sizeof(int8_t) +
// sizeof(uint32_t) + sizeof(int64_t) + sizeof(int64_t);
// taosDecodeFixedI32(p1 + entryOffset, &numOfEntryInfo);
// char* p = buf;
// size = sizeof(SResultRow) + numOfEntryInfo * sizeof(SResultRowEntryInfo);
// SResultRow* key = taosMemoryCalloc(1, size);
// p = taosDecodeFixedI32(p, (int32_t*)&key->pageId);
// uint32_t offset = 0;
// p = taosDecodeFixedU32(p, &offset);
// key->offset = offset;
// p = taosDecodeFixedI8(p, (int8_t*)(&key->startInterp));
// p = taosDecodeFixedI8(p, (int8_t*)(&key->endInterp));
// p = taosDecodeFixedI8(p, (int8_t*)&key->closed);
// p = taosDecodeFixedU32(p, &key->numOfRows);
// p = taosDecodeFixedI64(p, &key->win.skey);
// p = taosDecodeFixedI64(p, &key->win.ekey);
// p = taosDecodeFixedI32(p, &numOfEntryInfo);
// for (int i = 0; i < numOfEntryInfo; i++) {
// SResultRowEntryInfo* pInfo = &key->pEntryInfo[i];
// uint8_t value = 0;
// p = taosDecodeFixedU8(p, &value);
// pInfo->initialized = (value == 1) ? true : false;
// p = taosDecodeFixedU8(p, &value);
// pInfo->complete = (value == 1) ? true : false;
// p = taosDecodeFixedU8(p, &value);
// pInfo->isNullRes = value;
// p = taosDecodeFixedU16(p, &pInfo->numOfRes);
// }
// *k = key;
// {
// char* strBuf = taosMemoryCalloc(1, size * 100);
// resultRowToString(key, size, strBuf);
// qWarn("decode result row:%s", strBuf);
// }
// return size;
return
0
;
}
int32_t
saveOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
,
int32_t
resSize
)
{
// char* buf = taosMemoryCalloc(1, resSize * 10);
// int len = resultRowEncode((void*)pResult, &resSize, buf);
// char* buf = taosMemoryCalloc(1, resSize);
// memcpy(buf, pResult, resSize);
streamStatePut
(
pState
,
pKey
,
(
char
*
)
pResult
,
resSize
);
// taosMemoryFree(buf);
return
TSDB_CODE_SUCCESS
;
}
int32_t
getOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
**
pResult
,
int32_t
*
resSize
)
{
char
*
pVal
=
NULL
;
int32_t
size
=
0
;
int32_t
code
=
streamStateGet
(
pState
,
pKey
,
(
void
**
)
&
pVal
,
&
size
);
if
(
code
!=
0
)
{
return
0
;
}
*
pResult
=
(
SResultRow
*
)
pVal
;
// memcpy((char*)*pResult, (char*)pVal, size);
// int tlen = resultRowDecode((void**)pResult, size, pVal);
*
resSize
=
size
;
return
code
;
}
int32_t
streamStateAddIfNotExist2
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
qWarn
(
"streamStateAddIfNotExist"
);
char
*
tVal
=
NULL
;
int32_t
size
=
0
;
int32_t
code
=
streamStateGet
(
pState
,
key
,
(
void
**
)
&
tVal
,
&
size
);
if
(
code
!=
0
)
{
*
pVal
=
taosMemoryCalloc
(
1
,
*
pVLen
);
}
else
{
*
pVal
=
(
void
*
)
tVal
;
// resultRowDecode((void**)pVal, size, tVal);
*
pVLen
=
size
;
}
return
0
;
}
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
)
{
...
...
@@ -2579,8 +2735,10 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
char
*
value
=
NULL
;
int32_t
size
=
pAggSup
->
resultRowSize
;
if
(
streamStateAddIfNotExist
(
pState
,
&
key
,
(
void
**
)
&
value
,
&
size
)
<
0
)
{
if
(
streamStateAddIfNotExist
2
(
pState
,
&
key
,
(
void
**
)
&
value
,
&
size
)
<
0
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
// getOutputBuf(pState, &key, (SResultRow**)&value, &size);
}
*
pResult
=
(
SResultRow
*
)
value
;
// set time window for current result
...
...
@@ -2594,12 +2752,6 @@ int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResul
return
TSDB_CODE_SUCCESS
;
}
int32_t
saveOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
,
int32_t
resSize
)
{
qWarn
(
"write to stream state"
);
streamStatePut
(
pState
,
pKey
,
pResult
,
resSize
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildDataBlockFromGroupRes
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -2614,30 +2766,38 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
SWinKey
*
pKey
=
taosArrayGet
(
pGroupResInfo
->
pRows
,
i
);
int32_t
size
=
0
;
void
*
pVal
=
NULL
;
int32_t
code
=
streamStateGet
(
pState
,
pKey
,
&
pVal
,
&
size
);
int32_t
code
=
getOutputBuf
(
pState
,
pKey
,
(
SResultRow
**
)
&
pVal
,
&
size
);
// streamStateGet(pState, pKey, &pVal, &size);
ASSERT
(
code
==
0
);
SResultRow
*
pRow
=
(
SResultRow
*
)
pVal
;
doUpdateNumOfRows
(
pCtx
,
pRow
,
numOfExprs
,
rowEntryOffset
);
// no results, continue to check the next one
qWarn
(
"indx 1"
);
if
(
pRow
->
numOfRows
==
0
)
{
pGroupResInfo
->
index
+=
1
;
qWarn
(
"indx 2"
);
releaseOutputBuf
(
pState
,
pKey
,
pRow
);
continue
;
}
qWarn
(
"indx 3"
);
if
(
pBlock
->
info
.
id
.
groupId
==
0
)
{
pBlock
->
info
.
id
.
groupId
=
pKey
->
groupId
;
void
*
tbname
=
NULL
;
qWarn
(
"indx 4"
);
if
(
streamStateGetParName
(
pTaskInfo
->
streamInfo
.
pState
,
pBlock
->
info
.
id
.
groupId
,
&
tbname
)
<
0
)
{
qWarn
(
"indx 5"
);
pBlock
->
info
.
parTbName
[
0
]
=
0
;
}
else
{
qWarn
(
"indx 6"
);
memcpy
(
pBlock
->
info
.
parTbName
,
tbname
,
TSDB_TABLE_NAME_LEN
);
}
qWarn
(
"indx 7"
);
streamFreeVal
(
tbname
);
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
id
.
groupId
!=
pKey
->
groupId
)
{
releaseOutputBuf
(
pState
,
pKey
,
pRow
);
qWarn
(
"indx 8"
);
break
;
}
}
...
...
@@ -2647,28 +2807,36 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
releaseOutputBuf
(
pState
,
pKey
,
pRow
);
break
;
}
qWarn
(
"indx 10"
);
pGroupResInfo
->
index
+=
1
;
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
int32_t
slotId
=
pExprInfo
[
j
].
base
.
resSchema
.
slotId
;
qWarn
(
"indx 10"
);
pCtx
[
j
].
resultInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowEntryOffset
);
SResultRowEntryInfo
*
pEnryInfo
=
pCtx
[
j
].
resultInfo
;
qWarn
(
"initd:%d, complete:%d, null:%d, res:%d"
,
pEnryInfo
->
initialized
,
pEnryInfo
->
complete
,
pEnryInfo
->
isNullRes
,
pEnryInfo
->
numOfRes
);
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
qWarn
(
"indx 14"
);
int32_t
code1
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code1
))
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code1
));
T_LONG_JMP
(
pTaskInfo
->
env
,
code1
);
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
qWarn
(
"indx 11"
);
// do nothing, todo refactor
}
else
{
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
qWarn
(
"indx 12"
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataSetVal
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
qWarn
(
"indx 13"
);
}
}
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
8e409f81
...
...
@@ -16,10 +16,12 @@
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "query.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "tlog.h"
#include "ttime.h"
#define IS_FINAL_OP(op) ((op)->isFinal)
...
...
@@ -2152,6 +2154,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
if
(
num
==
0
)
{
int32_t
code
=
setOutputBuf
(
pInfo
->
pState
,
&
parentWin
,
&
pCurResult
,
pWinRes
->
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
);
ASSERT
(
pCurResult
!=
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pCurResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_OUT_OF_MEMORY
);
}
...
...
@@ -2183,6 +2186,7 @@ bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pS
if
(
pWin
->
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
deleteMark
)
{
SWinKey
key
=
{.
ts
=
pWin
->
skey
,
.
groupId
=
groupId
};
if
(
streamStateGet
(
pState
,
&
key
,
NULL
,
0
)
==
TSDB_CODE_SUCCESS
)
{
qWarn
(
"get from dele"
);
return
false
;
}
return
true
;
...
...
@@ -2349,6 +2353,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
SResultRow
*
pResult
=
NULL
;
int32_t
forwardRows
=
0
;
int
stepTrace
=
0
;
qWarn
(
"step1 %d"
,
stepTrace
++
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
...
...
@@ -2361,14 +2367,17 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
nextWin
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
TSDB_ORDER_ASC
);
}
while
(
1
)
{
qWarn
(
"step1 %d"
,
stepTrace
++
);
bool
isClosed
=
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
);
if
((
pInfo
->
ignoreExpiredData
&&
isClosed
)
||
!
inSlidingWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pSDataBlock
->
info
))
{
startPos
=
getNexWindowPos
(
&
pInfo
->
interval
,
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
&
nextWin
);
if
(
startPos
<
0
)
{
qWarn
(
"step1 %d"
,
stepTrace
++
);
break
;
}
continue
;
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
if
(
IS_FINAL_OP
(
pInfo
)
&&
isClosed
&&
pInfo
->
pChildren
)
{
bool
ignore
=
true
;
...
...
@@ -2399,6 +2408,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
ignore
=
false
;
}
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
if
(
ignore
)
{
startPos
=
getNexWindowPos
(
&
pInfo
->
interval
,
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
&
nextWin
);
...
...
@@ -2408,23 +2418,27 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
continue
;
}
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
int32_t
code
=
setOutputBuf
(
pInfo
->
pState
,
&
nextWin
,
&
pResult
,
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
qWarn
(
"step1 %d"
,
stepTrace
++
);
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_OUT_OF_MEMORY
);
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
if
(
IS_FINAL_OP
(
pInfo
))
{
forwardRows
=
1
;
}
else
{
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdatedMap
)
{
saveWinResultInfo
(
pResult
->
win
.
skey
,
groupId
,
pUpdatedMap
);
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
SWinKey
key
=
{
.
ts
=
pResult
->
win
.
skey
,
...
...
@@ -2432,6 +2446,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
};
tSimpleHashPut
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
key
,
sizeof
(
SWinKey
),
NULL
,
0
);
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
applyAggFunctionOnPartialTuples
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pSDataBlock
->
info
.
rows
,
numOfOutput
);
...
...
@@ -2439,6 +2455,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
.
ts
=
nextWin
.
skey
,
.
groupId
=
groupId
,
};
qWarn
(
"step1 %d"
,
stepTrace
++
);
saveOutputBuf
(
pInfo
->
pState
,
&
key
,
pResult
,
pInfo
->
aggSup
.
resultRowSize
);
releaseOutputBuf
(
pInfo
->
pState
,
&
key
,
pResult
);
if
(
pInfo
->
delKey
.
ts
>
key
.
ts
)
{
...
...
@@ -2457,6 +2475,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
pSDataBlock
->
info
.
id
.
uid
,
pSDataBlock
->
info
.
window
.
skey
,
pSDataBlock
->
info
.
window
.
ekey
);
}
}
qWarn
(
"step1 %d"
,
stepTrace
++
);
if
(
IS_FINAL_OP
(
pInfo
))
{
startPos
=
getNextQualifiedFinalWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pSDataBlock
->
info
,
tsCols
,
prevEndPos
);
...
...
@@ -2465,6 +2484,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pSDataBlock
->
info
,
tsCols
,
prevEndPos
,
TSDB_ORDER_ASC
);
}
if
(
startPos
<
0
)
{
qWarn
(
"step1 %d"
,
stepTrace
++
);
break
;
}
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
8e409f81
...
...
@@ -881,7 +881,7 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
}
if
(
pCtx
->
saveHandle
.
pState
)
{
tdbFree
((
void
*
)
p
);
streamFreeVal
((
void
*
)
p
);
}
}
...
...
source/libs/stream/src/streamState.c
浏览文件 @
8e409f81
...
...
@@ -18,6 +18,7 @@
#include <string.h>
#include "executor.h"
#include "osMemory.h"
#include "query.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "streamInc.h"
...
...
@@ -288,7 +289,7 @@ int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const voi
}
int32_t
streamStateFuncGet
(
SStreamState
*
pState
,
const
STupleKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
#ifdef USE_ROCKSDB
return
streamStateFuncGet
(
pState
,
key
,
pVal
,
pVLen
);
return
streamStateFuncGet
_rocksdb
(
pState
,
key
,
pVal
,
pVLen
);
#else
return
tdbTbGet
(
pState
->
pTdbState
->
pFuncStateDb
,
key
,
sizeof
(
STupleKey
),
pVal
,
pVLen
);
#endif
...
...
@@ -398,6 +399,7 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
int32_t
streamStateReleaseBuf
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
*
pVal
)
{
// todo refactor
qWarn
(
"streamStateReleaseBuf"
);
if
(
!
pVal
)
{
return
0
;
}
...
...
source/libs/stream/src/streamStateRocksdb.c
浏览文件 @
8e409f81
...
...
@@ -395,12 +395,14 @@ int streamGetInit(const char* funcName) {
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)vLen, &err); \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)&len, &err); \
if (val == NULL) { \
qWarn("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
code = -1; \
} else { \
*pVal = val; \
if (pVal != NULL) *pVal = val; \
if (vLen != NULL) *vLen = len; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
...
...
@@ -754,9 +756,8 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
if
(
!
pCur
)
{
return
-
1
;
}
SStateSessionKey
ktmp
=
{
0
};
SStateSessionKey
*
pKTmp
=
&
ktmp
;
int32_t
kLen
,
vLen
;
SStateSessionKey
ktmp
=
{
0
};
int32_t
kLen
,
vLen
;
if
(
!
rocksdb_iter_valid
(
pCur
->
iter
))
{
return
-
1
;
...
...
@@ -764,7 +765,8 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
const
char
*
curKey
=
rocksdb_iter_key
(
pCur
->
iter
,
(
size_t
*
)
&
kLen
);
stateSessionKeyDecode
((
void
*
)
&
ktmp
,
(
char
*
)
curKey
);
const
char
*
val
=
rocksdb_iter_value
(
pCur
->
iter
,
(
size_t
*
)
&
vLen
);
SStateSessionKey
*
pKTmp
=
&
ktmp
;
const
char
*
val
=
rocksdb_iter_value
(
pCur
->
iter
,
(
size_t
*
)
&
vLen
);
if
(
pVal
!=
NULL
)
*
pVal
=
(
char
*
)
val
;
if
(
pVLen
!=
NULL
)
*
pVLen
=
vLen
;
...
...
tests/script/tsim/stream/basic0.sim
浏览文件 @
8e409f81
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c
debugflag -v 131
system sh/cfg.sh -n dnode1 -c
qDebugflag -v 143
system sh/exec.sh -n dnode1 -s start
sql connect
...
...
tests/script/tsim/stream/basic1.sim
浏览文件 @
8e409f81
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录