Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
265e27ec
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
265e27ec
编写于
5月 31, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into enh/tsdb_optimize
上级
cf0228f8
8c240ced
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
49 addition
and
24 deletion
+49
-24
include/common/tglobal.h
include/common/tglobal.h
+1
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+10
-6
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+6
-5
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+19
-10
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+11
-0
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+1
-1
未找到文件。
include/common/tglobal.h
浏览文件 @
265e27ec
...
...
@@ -29,7 +29,6 @@ extern "C" {
#define SLOW_LOG_TYPE_OTHERS 0x4
#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF
// cluster
extern
char
tsFirst
[];
extern
char
tsSecond
[];
...
...
@@ -181,6 +180,7 @@ extern bool tsDisableStream;
extern
int64_t
tsStreamBufferSize
;
extern
int64_t
tsCheckpointInterval
;
extern
bool
tsFilterScalarMode
;
extern
int32_t
tsMaxStreamBackendCache
;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
...
...
source/common/src/tglobal.c
浏览文件 @
265e27ec
...
...
@@ -60,6 +60,7 @@ int32_t tsNumOfQnodeQueryThreads = 4;
int32_t
tsNumOfQnodeFetchThreads
=
1
;
int32_t
tsNumOfSnodeStreamThreads
=
4
;
int32_t
tsNumOfSnodeWriteThreads
=
1
;
int32_t
tsMaxStreamBackendCache
=
128
;
// M
// sync raft
int32_t
tsElectInterval
=
25
*
1000
;
...
...
@@ -105,7 +106,7 @@ int32_t tsQueryPolicy = 1;
int32_t
tsQueryRspPolicy
=
0
;
int64_t
tsQueryMaxConcurrentTables
=
200
;
// unit is TSDB_TABLE_NUM_UNIT
bool
tsEnableQueryHb
=
false
;
bool
tsEnableScience
=
false
;
// on taos-cli show float and doulbe with scientific notation if true
bool
tsEnableScience
=
false
;
// on taos-cli show float and doulbe with scientific notation if true
int32_t
tsQuerySmaOptimize
=
0
;
int32_t
tsQueryRsmaTolerance
=
1000
;
// the tolerance time (ms) to judge from which level to query rsma data.
bool
tsQueryPlannerTrace
=
false
;
...
...
@@ -117,8 +118,8 @@ int32_t tsRedirectFactor = 2;
int32_t
tsRedirectMaxPeriod
=
1000
;
int32_t
tsMaxRetryWaitTime
=
10000
;
bool
tsUseAdapter
=
false
;
int32_t
tsMetaCacheMaxSize
=
-
1
;
// MB
int32_t
tsSlowLogThreshold
=
3
;
// seconds
int32_t
tsMetaCacheMaxSize
=
-
1
;
// MB
int32_t
tsSlowLogThreshold
=
3
;
// seconds
int32_t
tsSlowLogScope
=
SLOW_LOG_TYPE_ALL
;
/*
...
...
@@ -349,7 +350,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"maxRetryWaitTime"
,
tsMaxRetryWaitTime
,
0
,
86400000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"useAdapter"
,
tsUseAdapter
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"crashReporting"
,
tsEnableCrashReport
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"metaCacheMaxSize"
,
tsMetaCacheMaxSize
,
-
1
,
INT32_MAX
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"slowLogThreshold"
,
tsSlowLogThreshold
,
0
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"slowLogScope"
,
""
,
true
)
!=
0
)
return
-
1
;
...
...
@@ -524,6 +526,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"cacheLazyLoadThreshold"
,
tsCacheLazyLoadThreshold
,
0
,
100000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"filterScalarMode"
,
tsFilterScalarMode
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxStreamBackendCache"
,
tsMaxStreamBackendCache
,
16
,
1024
,
0
)
!=
0
)
return
-
1
;
GRANT_CFG_ADD
;
return
0
;
...
...
@@ -781,7 +784,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsNumOfTaskQueueThreads
=
cfgGetItem
(
pCfg
,
"numOfTaskQueueThreads"
)
->
i32
;
tsQueryPolicy
=
cfgGetItem
(
pCfg
,
"queryPolicy"
)
->
i32
;
tsEnableQueryHb
=
cfgGetItem
(
pCfg
,
"enableQueryHb"
)
->
bval
;
tsEnableScience
=
cfgGetItem
(
pCfg
,
"enableScience"
)
->
bval
;
tsEnableScience
=
cfgGetItem
(
pCfg
,
"enableScience"
)
->
bval
;
tsQuerySmaOptimize
=
cfgGetItem
(
pCfg
,
"querySmaOptimize"
)
->
i32
;
tsQueryPlannerTrace
=
cfgGetItem
(
pCfg
,
"queryPlannerTrace"
)
->
bval
;
tsQueryNodeChunkSize
=
cfgGetItem
(
pCfg
,
"queryNodeChunkSize"
)
->
i32
;
...
...
@@ -902,7 +905,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsCheckpointInterval
=
cfgGetItem
(
pCfg
,
"checkpointInterval"
)
->
i64
;
tsFilterScalarMode
=
cfgGetItem
(
pCfg
,
"filterScalarMode"
)
->
bval
;
tsMaxStreamBackendCache
=
cfgGetItem
(
pCfg
,
"maxStreamBackendCache"
)
->
i32
;
GRANT_CFG_GET
;
return
0
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
265e27ec
...
...
@@ -1883,7 +1883,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
pRecoverRes
!=
NULL
)
{
pInfo
->
blockRecoverContiCnt
++
;
calBlockTbName
(
pInfo
,
pInfo
->
pRecoverRes
);
if
(
pInfo
->
pUpdateInfo
)
{
if
(
!
pInfo
->
igCheckUpdate
&&
pInfo
->
pUpdateInfo
)
{
if
(
pTaskInfo
->
streamInfo
.
recoverStep
==
STREAM_RECOVER_STEP__SCAN1
)
{
TSKEY
maxTs
=
pAPI
->
stateStore
.
updateInfoFillBlockData
(
pInfo
->
pUpdateInfo
,
pInfo
->
pRecoverRes
,
pInfo
->
primaryTsIndex
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
265e27ec
...
...
@@ -1623,7 +1623,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
windowSup
.
parentType
=
type
;
pScanInfo
->
windowSup
.
pIntervalAggSup
=
&
pInfo
->
aggSup
;
if
(
!
pScanInfo
->
igCheckUpdate
&&
!
pScanInfo
->
pUpdateInfo
)
{
if
(
!
pScanInfo
->
pUpdateInfo
)
{
pScanInfo
->
pUpdateInfo
=
pAPI
->
updateInfoInitP
(
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
}
...
...
@@ -2150,28 +2150,29 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
}
void
processPullOver
(
SSDataBlock
*
pBlock
,
SHashObj
*
pMap
,
SInterval
*
pInterval
)
{
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
CALCULATE_
START_TS_COLUMN_INDEX
);
TSKEY
*
tsData
=
(
TSKEY
*
)
pStartCol
->
pData
;
SColumnInfoData
*
pEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
CALCULATE_
END_TS_COLUMN_INDEX
);
TSKEY
*
tsEndData
=
(
TSKEY
*
)
pEndCol
->
pData
;
SColumnInfoData
*
pGroupCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
groupIdData
=
(
uint64_t
*
)
pGroupCol
->
pData
;
int32_t
chId
=
getChildIndex
(
pBlock
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
TSKEY
winTs
=
tsData
[
i
];
while
(
winTs
<
tsEndData
[
i
])
{
while
(
winTs
<
=
tsEndData
[
i
])
{
SWinKey
winRes
=
{.
ts
=
winTs
,
.
groupId
=
groupIdData
[
i
]};
void
*
chIds
=
taosHashGet
(
pMap
,
&
winRes
,
sizeof
(
SWinKey
));
if
(
chIds
)
{
SArray
*
chArray
=
*
(
SArray
**
)
chIds
;
int32_t
index
=
taosArraySearchIdx
(
chArray
,
&
chId
,
compareInt32Val
,
TD_EQ
);
if
(
index
!=
-
1
)
{
qDebug
(
"===stream===window %"
PRId64
" delete child id %d"
,
winRes
.
ts
,
chId
);
qDebug
(
"===stream===
retrive
window %"
PRId64
" delete child id %d"
,
winRes
.
ts
,
chId
);
taosArrayRemove
(
chArray
,
index
);
if
(
taosArrayGetSize
(
chArray
)
==
0
)
{
// pull data is over
taosArrayDestroy
(
chArray
);
taosHashRemove
(
pMap
,
&
winRes
,
sizeof
(
SWinKey
));
qDebug
(
"===stream===retrive pull data over.window %"
PRId64
,
winRes
.
ts
);
}
}
}
...
...
source/libs/stream/src/stream.c
浏览文件 @
265e27ec
...
...
@@ -132,8 +132,6 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
int32_t
code
=
tAppendDataToInputQueue
(
pTask
,
(
SStreamQueueItem
*
)
pBlock
);
// input queue is full, upstream is blocked now
status
=
(
code
==
TSDB_CODE_SUCCESS
)
?
TASK_INPUT_STATUS__NORMAL
:
TASK_INPUT_STATUS__BLOCKED
;
}
// rsp by input status
...
...
@@ -235,8 +233,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SStreamDispatchRsp
*
pRsp
,
int32_t
code
)
{
ASSERT
(
pRsp
->
inputStatus
==
TASK_OUTPUT_STATUS__NORMAL
||
pRsp
->
inputStatus
==
TASK_OUTPUT_STATUS__BLOCKED
);
qDebug
(
"s-task:%s receive dispatch rsp, code: %x"
,
pTask
->
id
.
idStr
,
code
);
qDebug
(
"s-task:%s receive dispatch rsp, status:%d code:%d"
,
pTask
->
id
.
idStr
,
pRsp
->
inputStatus
,
code
);
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
int32_t
leftRsp
=
atomic_sub_fetch_32
(
&
pTask
->
shuffleDispatcher
.
waitingRspCnt
,
1
);
...
...
@@ -248,13 +245,16 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int8_t
old
=
atomic_exchange_8
(
&
pTask
->
outputStatus
,
pRsp
->
inputStatus
);
ASSERT
(
old
==
TASK_OUTPUT_STATUS__WAIT
);
// the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp
// todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms.
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
ASSERT
(
0
);
qError
(
"s-task:%s inputQ of downstream task:0x%x is full, need to block output"
,
pTask
->
id
.
idStr
,
pRsp
->
downstreamTaskId
);
return
0
;
}
//
continue dispatch one block to down stream
in pipeline
//
otherwise, continue dispatch the first block to down stream task
in pipeline
streamDispatchStreamBlock
(
pTask
);
return
0
;
}
...
...
@@ -304,23 +304,32 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return
-
1
;
}
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
streamDataSubmitDestroy
(
px
);
taosFreeQitem
(
pItem
);
return
code
;
}
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
||
type
==
STREAM_INPUT__DATA_RETRIEVE
||
type
==
STREAM_INPUT__REF_DATA_BLOCK
)
{
if
(
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
&&
(
tInputQueueIsFull
(
pTask
)))
{
if
(
/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */
(
tInputQueueIsFull
(
pTask
)))
{
qError
(
"s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort"
,
pTask
->
id
.
idStr
,
STREAM_TASK_INPUT_QUEUEU_CAPACITY
,
STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE
,
total
,
size
);
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
taosFreeQitem
(
pItem
);
return
-
1
;
}
qDebug
(
"s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)"
,
pTask
->
id
.
idStr
,
total
,
size
);
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
return
code
;
}
}
else
if
(
type
==
STREAM_INPUT__CHECKPOINT
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
else
if
(
type
==
STREAM_INPUT__GET_RES
)
{
// use the default memory limit, refactor later.
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
qDebug
(
"s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)"
,
pTask
->
id
.
idStr
,
total
,
size
);
}
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
265e27ec
...
...
@@ -38,6 +38,15 @@ typedef struct {
rocksdb_comparator_t
**
pCompares
;
}
RocksdbCfInst
;
uint32_t
nextPow2
(
uint32_t
x
)
{
x
=
x
-
1
;
x
=
x
|
(
x
>>
1
);
x
=
x
|
(
x
>>
2
);
x
=
x
|
(
x
>>
4
);
x
=
x
|
(
x
>>
8
);
x
=
x
|
(
x
>>
16
);
return
x
+
1
;
}
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
...
...
@@ -92,6 +101,8 @@ void* streamBackendInit(const char* path) {
rocksdb_options_set_recycle_log_file_num
(
opts
,
6
);
rocksdb_options_set_max_write_buffer_number
(
opts
,
2
);
rocksdb_options_set_info_log_level
(
opts
,
0
);
uint32_t
dbLimit
=
nextPow2
(
tsMaxStreamBackendCache
);
rocksdb_options_set_db_write_buffer_size
(
opts
,
dbLimit
<<
20
);
pHandle
->
env
=
env
;
pHandle
->
dbOpt
=
opts
;
...
...
source/libs/stream/src/streamState.c
浏览文件 @
265e27ec
...
...
@@ -1062,7 +1062,7 @@ _end:
}
int32_t
streamStatePutParName
(
SStreamState
*
pState
,
int64_t
groupId
,
const
char
tbname
[
TSDB_TABLE_NAME_LEN
])
{
q
Warn
(
"try to write to cf parname"
);
q
Debug
(
"try to write to cf parname"
);
#ifdef USE_ROCKSDB
if
(
tSimpleHashGetSize
(
pState
->
parNameMap
)
>
MAX_TABLE_NAME_NUM
)
{
if
(
tSimpleHashGet
(
pState
->
parNameMap
,
&
groupId
,
sizeof
(
int64_t
))
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录