Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
03b1de01
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
“ca8b7a2760be58670ad9388fddc99a1f4bb0d88a”上不存在“projects/PoldiChen”
提交
03b1de01
编写于
4月 28, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: remove assert.
上级
f9a64cbc
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
10 addition
and
60 deletion
+10
-60
include/libs/executor/executor.h
include/libs/executor/executor.h
+0
-2
source/common/src/tname.c
source/common/src/tname.c
+0
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+3
-17
source/util/src/tarray.c
source/util/src/tarray.c
+4
-1
source/util/src/tcache.c
source/util/src/tcache.c
+0
-8
source/util/src/thash.c
source/util/src/thash.c
+0
-17
source/util/src/tlosertree.c
source/util/src/tlosertree.c
+0
-2
source/util/src/tsched.c
source/util/src/tsched.c
+0
-8
source/util/src/tskiplist.c
source/util/src/tskiplist.c
+3
-3
未找到文件。
include/libs/executor/executor.h
浏览文件 @
03b1de01
...
@@ -208,8 +208,6 @@ void* qExtractReaderFromStreamScanner(void* scanner);
...
@@ -208,8 +208,6 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t
qExtractStreamScanner
(
qTaskInfo_t
tinfo
,
void
**
scanner
);
int32_t
qExtractStreamScanner
(
qTaskInfo_t
tinfo
,
void
**
scanner
);
int32_t
qStreamInput
(
qTaskInfo_t
tinfo
,
void
*
pItem
);
int32_t
qStreamSetParamForRecover
(
qTaskInfo_t
tinfo
);
int32_t
qStreamSetParamForRecover
(
qTaskInfo_t
tinfo
);
int32_t
qStreamSourceRecoverStep1
(
qTaskInfo_t
tinfo
,
int64_t
ver
);
int32_t
qStreamSourceRecoverStep1
(
qTaskInfo_t
tinfo
,
int64_t
ver
);
int32_t
qStreamSourceRecoverStep2
(
qTaskInfo_t
tinfo
,
int64_t
ver
);
int32_t
qStreamSourceRecoverStep2
(
qTaskInfo_t
tinfo
,
int64_t
ver
);
...
...
source/common/src/tname.c
浏览文件 @
03b1de01
...
@@ -122,10 +122,8 @@ int32_t tNameLen(const SName* name) {
...
@@ -122,10 +122,8 @@ int32_t tNameLen(const SName* name) {
int32_t
len2
=
(
int32_t
)
strlen
(
name
->
tname
);
int32_t
len2
=
(
int32_t
)
strlen
(
name
->
tname
);
if
(
name
->
type
==
TSDB_DB_NAME_T
)
{
if
(
name
->
type
==
TSDB_DB_NAME_T
)
{
ASSERT
(
len2
==
0
);
return
len
+
len1
+
TSDB_NAME_DELIMITER_LEN
;
return
len
+
len1
+
TSDB_NAME_DELIMITER_LEN
;
}
else
{
}
else
{
ASSERT
(
len2
>
0
);
return
len
+
len1
+
len2
+
TSDB_NAME_DELIMITER_LEN
*
2
;
return
len
+
len1
+
len2
+
TSDB_NAME_DELIMITER_LEN
*
2
;
}
}
}
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
03b1de01
...
@@ -139,7 +139,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
...
@@ -139,7 +139,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
}
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
}
else
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
}
else
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
ASSERT
(
numOfBlocks
==
1
);
taosArrayPush
(
pInfo
->
pBlockLists
,
input
);
taosArrayPush
(
pInfo
->
pBlockLists
,
input
);
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
)
{
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
)
{
...
@@ -854,15 +853,6 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
...
@@ -854,15 +853,6 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
}
}
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
return 0;
}
#endif
int32_t
qStreamSourceRecoverStep1
(
qTaskInfo_t
tinfo
,
int64_t
ver
)
{
int32_t
qStreamSourceRecoverStep1
(
qTaskInfo_t
tinfo
,
int64_t
ver
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
...
@@ -897,8 +887,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
...
@@ -897,8 +887,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
||
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
||
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
);
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
);
ASSERT
(
pInfo
->
twAggSup
.
calTriggerSaved
==
0
);
ASSERT
(
pInfo
->
twAggSup
.
calTriggerSaved
==
0
&&
pInfo
->
twAggSup
.
deleteMarkSaved
==
0
);
ASSERT
(
pInfo
->
twAggSup
.
deleteMarkSaved
==
0
);
qInfo
(
"save stream param for interval: %d, %"
PRId64
,
pInfo
->
twAggSup
.
calTrigger
,
pInfo
->
twAggSup
.
deleteMark
);
qInfo
(
"save stream param for interval: %d, %"
PRId64
,
pInfo
->
twAggSup
.
calTrigger
,
pInfo
->
twAggSup
.
deleteMark
);
...
@@ -914,9 +903,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
...
@@ -914,9 +903,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
||
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
||
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
);
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
);
ASSERT
(
pInfo
->
twAggSup
.
calTriggerSaved
==
0
);
ASSERT
(
pInfo
->
twAggSup
.
deleteMarkSaved
==
0
);
ASSERT
(
pInfo
->
twAggSup
.
calTriggerSaved
==
0
&&
pInfo
->
twAggSup
.
deleteMarkSaved
==
0
);
qInfo
(
"save stream param for session: %d, %"
PRId64
,
pInfo
->
twAggSup
.
calTrigger
,
pInfo
->
twAggSup
.
deleteMark
);
qInfo
(
"save stream param for session: %d, %"
PRId64
,
pInfo
->
twAggSup
.
calTrigger
,
pInfo
->
twAggSup
.
deleteMark
);
pInfo
->
twAggSup
.
calTriggerSaved
=
pInfo
->
twAggSup
.
calTrigger
;
pInfo
->
twAggSup
.
calTriggerSaved
=
pInfo
->
twAggSup
.
calTrigger
;
...
@@ -929,8 +917,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
...
@@ -929,8 +917,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
||
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
||
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
);
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
);
ASSERT
(
pInfo
->
twAggSup
.
calTriggerSaved
==
0
);
ASSERT
(
pInfo
->
twAggSup
.
calTriggerSaved
==
0
&&
pInfo
->
twAggSup
.
deleteMarkSaved
==
0
);
ASSERT
(
pInfo
->
twAggSup
.
deleteMarkSaved
==
0
);
qInfo
(
"save stream param for state: %d, %"
PRId64
,
pInfo
->
twAggSup
.
calTrigger
,
pInfo
->
twAggSup
.
deleteMark
);
qInfo
(
"save stream param for state: %d, %"
PRId64
,
pInfo
->
twAggSup
.
calTrigger
,
pInfo
->
twAggSup
.
deleteMark
);
...
@@ -991,7 +978,6 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
...
@@ -991,7 +978,6 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
if
(
pOperator
->
numOfDownstream
!=
1
||
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
if
(
pOperator
->
numOfDownstream
!=
1
||
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
if
(
pOperator
->
numOfDownstream
>
1
)
{
if
(
pOperator
->
numOfDownstream
>
1
)
{
qError
(
"unexpected stream, multiple downstream"
);
qError
(
"unexpected stream, multiple downstream"
);
/*ASSERT(0);*/
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
...
...
source/util/src/tarray.c
浏览文件 @
03b1de01
...
@@ -255,7 +255,10 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
...
@@ -255,7 +255,10 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
}
}
void
taosArrayPopFrontBatch
(
SArray
*
pArray
,
size_t
cnt
)
{
void
taosArrayPopFrontBatch
(
SArray
*
pArray
,
size_t
cnt
)
{
ASSERT
(
cnt
<=
pArray
->
size
);
if
(
cnt
>
pArray
->
size
)
{
cnt
=
pArray
->
size
;
}
pArray
->
size
=
pArray
->
size
-
cnt
;
pArray
->
size
=
pArray
->
size
-
cnt
;
if
(
pArray
->
size
==
0
||
cnt
==
0
)
{
if
(
pArray
->
size
==
0
||
cnt
==
0
)
{
return
;
return
;
...
...
source/util/src/tcache.c
浏览文件 @
03b1de01
...
@@ -264,7 +264,6 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
...
@@ -264,7 +264,6 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
static
void
removeNodeInEntryList
(
SCacheEntry
*
pe
,
SCacheNode
*
prev
,
SCacheNode
*
pNode
)
{
static
void
removeNodeInEntryList
(
SCacheEntry
*
pe
,
SCacheNode
*
prev
,
SCacheNode
*
pNode
)
{
if
(
prev
==
NULL
)
{
if
(
prev
==
NULL
)
{
ASSERT
(
pe
->
next
==
pNode
);
pe
->
next
=
pNode
->
pNext
;
pe
->
next
=
pNode
->
pNext
;
}
else
{
}
else
{
prev
->
pNext
=
pNode
->
pNext
;
prev
->
pNext
=
pNode
->
pNext
;
...
@@ -464,7 +463,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
...
@@ -464,7 +463,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
SCacheNode
*
pNode
=
doSearchInEntryList
(
pe
,
key
,
keyLen
,
&
prev
);
SCacheNode
*
pNode
=
doSearchInEntryList
(
pe
,
key
,
keyLen
,
&
prev
);
if
(
pNode
!=
NULL
)
{
if
(
pNode
!=
NULL
)
{
int32_t
ref
=
T_REF_INC
(
pNode
);
int32_t
ref
=
T_REF_INC
(
pNode
);
ASSERT
(
ref
>
0
);
}
}
taosRUnLockLatch
(
&
pe
->
latch
);
taosRUnLockLatch
(
&
pe
->
latch
);
...
@@ -607,7 +605,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
...
@@ -607,7 +605,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
uDebug
(
"cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d"
,
pCacheObj
->
name
,
pNode
->
key
,
uDebug
(
"cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d"
,
pCacheObj
->
name
,
pNode
->
key
,
pNode
->
data
,
ref
);
pNode
->
data
,
ref
);
if
(
ref
>
0
)
{
if
(
ref
>
0
)
{
ASSERT
(
pNode
->
pTNodeHeader
==
NULL
);
taosAddToTrashcan
(
pCacheObj
,
pNode
);
taosAddToTrashcan
(
pCacheObj
,
pNode
);
}
else
{
// ref == 0
}
else
{
// ref == 0
atomic_sub_fetch_64
(
&
pCacheObj
->
sizeInBytes
,
pNode
->
size
);
atomic_sub_fetch_64
(
&
pCacheObj
->
sizeInBytes
,
pNode
->
size
);
...
@@ -916,7 +913,6 @@ void taosStopCacheRefreshWorker(void) {
...
@@ -916,7 +913,6 @@ void taosStopCacheRefreshWorker(void) {
size_t
taosCacheGetNumOfObj
(
const
SCacheObj
*
pCacheObj
)
{
return
pCacheObj
->
numOfElems
+
pCacheObj
->
numOfElemsInTrash
;
}
size_t
taosCacheGetNumOfObj
(
const
SCacheObj
*
pCacheObj
)
{
return
pCacheObj
->
numOfElems
+
pCacheObj
->
numOfElemsInTrash
;
}
SCacheIter
*
taosCacheCreateIter
(
const
SCacheObj
*
pCacheObj
)
{
SCacheIter
*
taosCacheCreateIter
(
const
SCacheObj
*
pCacheObj
)
{
ASSERT
(
pCacheObj
!=
NULL
);
SCacheIter
*
pIter
=
taosMemoryCalloc
(
1
,
sizeof
(
SCacheIter
));
SCacheIter
*
pIter
=
taosMemoryCalloc
(
1
,
sizeof
(
SCacheIter
));
pIter
->
pCacheObj
=
(
SCacheObj
*
)
pCacheObj
;
pIter
->
pCacheObj
=
(
SCacheObj
*
)
pCacheObj
;
pIter
->
entryIndex
=
-
1
;
pIter
->
entryIndex
=
-
1
;
...
@@ -966,12 +962,8 @@ bool taosCacheIterNext(SCacheIter *pIter) {
...
@@ -966,12 +962,8 @@ bool taosCacheIterNext(SCacheIter *pIter) {
SCacheNode
*
pNode
=
pEntry
->
next
;
SCacheNode
*
pNode
=
pEntry
->
next
;
for
(
int32_t
i
=
0
;
i
<
pEntry
->
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pEntry
->
num
;
++
i
)
{
ASSERT
(
pNode
!=
NULL
);
pIter
->
pCurrent
[
i
]
=
pNode
;
pIter
->
pCurrent
[
i
]
=
pNode
;
int32_t
ref
=
T_REF_INC
(
pIter
->
pCurrent
[
i
]);
int32_t
ref
=
T_REF_INC
(
pIter
->
pCurrent
[
i
]);
ASSERT
(
ref
>=
1
);
pNode
=
pNode
->
pNext
;
pNode
=
pNode
->
pNext
;
}
}
...
...
source/util/src/thash.c
浏览文件 @
03b1de01
...
@@ -259,8 +259,6 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
...
@@ -259,8 +259,6 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
pHashObj
->
freeFp
=
NULL
;
pHashObj
->
freeFp
=
NULL
;
pHashObj
->
callbackFp
=
NULL
;
pHashObj
->
callbackFp
=
NULL
;
ASSERT
((
pHashObj
->
capacity
&
(
pHashObj
->
capacity
-
1
))
==
0
);
pHashObj
->
hashList
=
(
SHashEntry
**
)
taosMemoryMalloc
(
pHashObj
->
capacity
*
sizeof
(
void
*
));
pHashObj
->
hashList
=
(
SHashEntry
**
)
taosMemoryMalloc
(
pHashObj
->
capacity
*
sizeof
(
void
*
));
if
(
pHashObj
->
hashList
==
NULL
)
{
if
(
pHashObj
->
hashList
==
NULL
)
{
taosMemoryFree
(
pHashObj
);
taosMemoryFree
(
pHashObj
);
...
@@ -343,7 +341,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
...
@@ -343,7 +341,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
while
(
pNode
)
{
while
(
pNode
)
{
if
((
pNode
->
keyLen
==
keyLen
)
&&
(
*
(
pHashObj
->
equalFp
))(
GET_HASH_NODE_KEY
(
pNode
),
key
,
keyLen
)
==
0
&&
if
((
pNode
->
keyLen
==
keyLen
)
&&
(
*
(
pHashObj
->
equalFp
))(
GET_HASH_NODE_KEY
(
pNode
),
key
,
keyLen
)
==
0
&&
pNode
->
removed
==
0
)
{
pNode
->
removed
==
0
)
{
ASSERT
(
pNode
->
hashVal
==
hashVal
);
break
;
break
;
}
}
...
@@ -701,8 +698,6 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
...
@@ -701,8 +698,6 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
void
pushfrontNodeInEntryList
(
SHashEntry
*
pEntry
,
SHashNode
*
pNode
)
{
void
pushfrontNodeInEntryList
(
SHashEntry
*
pEntry
,
SHashNode
*
pNode
)
{
pNode
->
next
=
pEntry
->
next
;
pNode
->
next
=
pEntry
->
next
;
pEntry
->
next
=
pNode
;
pEntry
->
next
=
pNode
;
ASSERT
(
pNode
->
next
!=
pNode
);
pEntry
->
num
+=
1
;
pEntry
->
num
+=
1
;
}
}
...
@@ -816,19 +811,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
...
@@ -816,19 +811,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
/*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/
/*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/
uint16_t
afterRef
=
atomic_add_fetch_16
(
&
pNode
->
refCount
,
1
);
uint16_t
afterRef
=
atomic_add_fetch_16
(
&
pNode
->
refCount
,
1
);
#if 0
ASSERT(prevRef < afterRef);
// the reference count value is overflow, which will cause the delete node operation immediately.
if (prevRef > afterRef) {
uError("hash entry ref count overflow, prev ref:%d, current ref:%d", prevRef, afterRef);
// restore the value
atomic_sub_fetch_16(&pNode->refCount, 1);
data = NULL;
} else {
data = GET_HASH_NODE_DATA(pNode);
}
#endif
data
=
GET_HASH_NODE_DATA
(
pNode
);
data
=
GET_HASH_NODE_DATA
(
pNode
);
if
(
afterRef
>=
MAX_WARNING_REF_COUNT
)
{
if
(
afterRef
>=
MAX_WARNING_REF_COUNT
)
{
...
...
source/util/src/tlosertree.c
浏览文件 @
03b1de01
...
@@ -115,8 +115,6 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
...
@@ -115,8 +115,6 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
}
}
void
tMergeTreeRebuild
(
SMultiwayMergeTreeInfo
*
pTree
)
{
void
tMergeTreeRebuild
(
SMultiwayMergeTreeInfo
*
pTree
)
{
ASSERT
((
pTree
->
totalSources
&
0x1
)
==
0
);
tMergeTreeInit
(
pTree
);
tMergeTreeInit
(
pTree
);
for
(
int32_t
i
=
pTree
->
totalSources
-
1
;
i
>=
pTree
->
numOfSources
;
i
--
)
{
for
(
int32_t
i
=
pTree
->
totalSources
-
1
;
i
>=
pTree
->
numOfSources
;
i
--
)
{
tMergeTreeAdjust
(
pTree
,
i
);
tMergeTreeAdjust
(
pTree
,
i
);
...
...
source/util/src/tsched.c
浏览文件 @
03b1de01
...
@@ -137,7 +137,6 @@ void *taosProcessSchedQueue(void *scheduler) {
...
@@ -137,7 +137,6 @@ void *taosProcessSchedQueue(void *scheduler) {
while
(
1
)
{
while
(
1
)
{
if
((
ret
=
tsem_wait
(
&
pSched
->
fullSem
))
!=
0
)
{
if
((
ret
=
tsem_wait
(
&
pSched
->
fullSem
))
!=
0
)
{
uFatal
(
"wait %s fullSem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"wait %s fullSem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
if
(
atomic_load_8
(
&
pSched
->
stop
))
{
if
(
atomic_load_8
(
&
pSched
->
stop
))
{
break
;
break
;
...
@@ -145,7 +144,6 @@ void *taosProcessSchedQueue(void *scheduler) {
...
@@ -145,7 +144,6 @@ void *taosProcessSchedQueue(void *scheduler) {
if
((
ret
=
taosThreadMutexLock
(
&
pSched
->
queueMutex
))
!=
0
)
{
if
((
ret
=
taosThreadMutexLock
(
&
pSched
->
queueMutex
))
!=
0
)
{
uFatal
(
"lock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"lock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
msg
=
pSched
->
queue
[
pSched
->
fullSlot
];
msg
=
pSched
->
queue
[
pSched
->
fullSlot
];
...
@@ -154,12 +152,10 @@ void *taosProcessSchedQueue(void *scheduler) {
...
@@ -154,12 +152,10 @@ void *taosProcessSchedQueue(void *scheduler) {
if
((
ret
=
taosThreadMutexUnlock
(
&
pSched
->
queueMutex
))
!=
0
)
{
if
((
ret
=
taosThreadMutexUnlock
(
&
pSched
->
queueMutex
))
!=
0
)
{
uFatal
(
"unlock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"unlock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
if
((
ret
=
tsem_post
(
&
pSched
->
emptySem
))
!=
0
)
{
if
((
ret
=
tsem_post
(
&
pSched
->
emptySem
))
!=
0
)
{
uFatal
(
"post %s emptySem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"post %s emptySem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
if
(
msg
.
fp
)
if
(
msg
.
fp
)
...
@@ -187,12 +183,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
...
@@ -187,12 +183,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if
((
ret
=
tsem_wait
(
&
pSched
->
emptySem
))
!=
0
)
{
if
((
ret
=
tsem_wait
(
&
pSched
->
emptySem
))
!=
0
)
{
uFatal
(
"wait %s emptySem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"wait %s emptySem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
if
((
ret
=
taosThreadMutexLock
(
&
pSched
->
queueMutex
))
!=
0
)
{
if
((
ret
=
taosThreadMutexLock
(
&
pSched
->
queueMutex
))
!=
0
)
{
uFatal
(
"lock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"lock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
pSched
->
queue
[
pSched
->
emptySlot
]
=
*
pMsg
;
pSched
->
queue
[
pSched
->
emptySlot
]
=
*
pMsg
;
...
@@ -200,12 +194,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
...
@@ -200,12 +194,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if
((
ret
=
taosThreadMutexUnlock
(
&
pSched
->
queueMutex
))
!=
0
)
{
if
((
ret
=
taosThreadMutexUnlock
(
&
pSched
->
queueMutex
))
!=
0
)
{
uFatal
(
"unlock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"unlock %s queueMutex failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
if
((
ret
=
tsem_post
(
&
pSched
->
fullSem
))
!=
0
)
{
if
((
ret
=
tsem_post
(
&
pSched
->
fullSem
))
!=
0
)
{
uFatal
(
"post %s fullSem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
uFatal
(
"post %s fullSem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
ASSERT
(
0
);
}
}
return
ret
;
return
ret
;
}
}
...
...
source/util/src/tskiplist.c
浏览文件 @
03b1de01
...
@@ -268,8 +268,9 @@ SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) {
...
@@ -268,8 +268,9 @@ SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) {
}
}
SSkipListIterator
*
tSkipListCreateIterFromVal
(
SSkipList
*
pSkipList
,
const
char
*
val
,
int32_t
type
,
int32_t
order
)
{
SSkipListIterator
*
tSkipListCreateIterFromVal
(
SSkipList
*
pSkipList
,
const
char
*
val
,
int32_t
type
,
int32_t
order
)
{
ASSERT
(
order
==
TSDB_ORDER_ASC
||
order
==
TSDB_ORDER_DESC
);
if
(
order
!=
TSDB_ORDER_ASC
&&
order
!=
TSDB_ORDER_DESC
)
{
ASSERT
(
pSkipList
!=
NULL
);
return
NULL
;
}
SSkipListIterator
*
iter
=
doCreateSkipListIterator
(
pSkipList
,
order
);
SSkipListIterator
*
iter
=
doCreateSkipListIterator
(
pSkipList
,
order
);
if
(
val
==
NULL
)
{
if
(
val
==
NULL
)
{
...
@@ -585,7 +586,6 @@ static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) {
...
@@ -585,7 +586,6 @@ static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) {
}
}
}
}
ASSERT
(
level
<=
pSkipList
->
maxLevel
);
return
level
;
return
level
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录