Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5c128e22
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5c128e22
编写于
3月 28, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
b22cc822
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
34 addition
and
100 deletion
+34
-100
include/util/tarray.h
include/util/tarray.h
+0
-16
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+8
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+12
-1
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+11
-8
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-1
source/util/src/tarray.c
source/util/src/tarray.c
+2
-73
未找到文件。
include/util/tarray.h
浏览文件 @
5c128e22
...
...
@@ -69,14 +69,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles);
*/
void
taosArrayRemoveDuplicate
(
SArray
*
pArray
,
__compar_fn_t
comparFn
,
void
(
*
fp
)(
void
*
));
/**
*
* @param pArray
* @param comparFn
* @param fp
*/
void
taosArrayRemoveDuplicateP
(
SArray
*
pArray
,
__compar_fn_t
comparFn
,
void
(
*
fp
)(
void
*
));
/**
* add all element from the source array list into the destination
* @param pArray
...
...
@@ -252,14 +244,6 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
int32_t
taosEncodeArray
(
void
**
buf
,
const
SArray
*
pArray
,
FEncode
encode
);
void
*
taosDecodeArray
(
const
void
*
buf
,
SArray
**
pArray
,
FDecode
decode
,
int32_t
dataSz
);
/**
* swap array
* @param a
* @param b
* @return
*/
void
taosArraySwap
(
SArray
*
a
,
SArray
*
b
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
5c128e22
...
...
@@ -601,6 +601,13 @@ static void* topicNameDup(void* p){
return
taosStrdup
((
char
*
)
p
);
}
static
void
freeItem
(
void
*
param
)
{
void
*
pItem
=
*
(
void
**
)
param
;
if
(
pItem
!=
NULL
)
{
taosMemoryFree
(
pItem
);
}
}
int32_t
mndProcessSubscribeReq
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
char
*
msgStr
=
pMsg
->
pCont
;
...
...
@@ -616,7 +623,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t
code
=
-
1
;
SArray
*
pTopicList
=
subscribe
.
topicNames
;
taosArraySort
(
pTopicList
,
taosArrayCompareString
);
taosArrayRemoveDuplicate
P
(
pTopicList
,
taosArrayCompareString
,
taosMemoryFree
);
taosArrayRemoveDuplicate
(
pTopicList
,
taosArrayCompareString
,
freeItem
);
int32_t
newTopicNum
=
taosArrayGetSize
(
pTopicList
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
5c128e22
...
...
@@ -885,6 +885,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
// kill executing task
qTaskInfo_t
pTaskInfo
=
pHandle
->
execHandle
.
task
;
if
(
pTaskInfo
!=
NULL
)
{
// qAsyncKillTask(pTaskInfo);
}
taosWLockLatch
(
&
pTq
->
lock
);
atomic_store_32
(
&
pHandle
->
epoch
,
-
1
);
...
...
@@ -895,7 +901,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_add_fetch_32
(
&
pHandle
->
epoch
,
1
);
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
qStreamCloseTsdbReader
(
pHandle
->
execHandle
.
task
);
qStreamCloseTsdbReader
(
pTaskInfo
);
}
// reset the error code.
if
(
pHandle
->
execHandle
.
task
!=
NULL
)
{
}
taosWUnLockLatch
(
&
pTq
->
lock
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
5c128e22
...
...
@@ -65,16 +65,17 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
const
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
qTaskInfo_t
task
=
pExec
->
task
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
if
(
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
)
<
0
)
{
tqDebug
(
"prepare scan failed, return
"
);
tqDebug
(
"prepare scan failed, return
, consumer:0x%"
PRIx64
,
pHandle
->
consumerId
);
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
pRsp
->
rspOffset
=
*
pOffset
;
return
0
;
}
else
{
tqOffsetResetToLog
(
pOffset
,
pHandle
->
snapshotVer
);
if
(
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
)
<
0
)
{
tqDebug
(
"prepare scan failed, return
"
);
tqDebug
(
"prepare scan failed, return
, consumer:0x%"
PRIx64
,
pHandle
->
consumerId
);
pRsp
->
rspOffset
=
*
pOffset
;
return
0
;
}
...
...
@@ -86,13 +87,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
tqDebug
(
"vgId:%d, tmq task start to execute
"
,
pTq
->
pVnode
->
config
.
vg
Id
);
tqDebug
(
"vgId:%d, tmq task start to execute
, consumer:0x%"
PRIx64
,
vgId
,
pHandle
->
consumer
Id
);
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
tqError
(
"vgId:%d, task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
tqError
(
"vgId:%d, task exec error since %s, consumer:0x%"
PRIx64
,
vgId
,
terrstr
(),
pHandle
->
consumerId
);
return
-
1
;
}
tqDebug
(
"consumer:0x%"
PRIx64
" vgId:%d, tmq task executed, get %p"
,
pHandle
->
consumerId
,
pTq
->
pVnode
->
config
.
vgId
,
pDataBlock
);
tqDebug
(
"consumer:0x%"
PRIx64
" vgId:%d, tmq task executed, get %p"
,
pHandle
->
consumerId
,
vgId
,
pDataBlock
);
// current scan should be stopped asap, since the rebalance occurs.
if
(
pDataBlock
==
NULL
)
{
...
...
@@ -115,15 +117,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
}
if
(
pRsp
->
rspOffset
.
type
==
0
)
{
tqError
(
"
expected rsp offset: type %d %"
PRId64
" %"
PRId64
" %"
PRId64
,
pRsp
->
rspOffset
.
type
,
pRsp
->
rspOffset
.
ts
,
pRsp
->
rspOffset
.
uid
,
pRsp
->
rspOffset
.
version
);
tqError
(
"
vgId:%d, expected rsp offset: type %d %"
PRId64
" %"
PRId64
" %"
PRId64
,
vgId
,
pRsp
->
rspOffset
.
type
,
pRsp
->
rspOffset
.
ts
,
pRsp
->
rspOffset
.
uid
,
pRsp
->
rspOffset
.
version
);
return
-
1
;
}
if
(
pRsp
->
withTbName
||
pRsp
->
withSchema
)
{
tqError
(
"
get column should not with meta:%d,%d"
,
pRsp
->
withTbName
,
pRsp
->
withSchema
);
tqError
(
"
vgId:%d, get column should not with meta:%d,%d"
,
vgId
,
pRsp
->
withTbName
,
pRsp
->
withSchema
);
return
-
1
;
}
return
0
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
5c128e22
...
...
@@ -633,7 +633,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
}
}
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
return
(
0
!=
pTaskInfo
->
code
)
?
true
:
false
;
}
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
return
(
0
!=
pTaskInfo
->
code
)
;
}
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
rspCode
)
{
pTaskInfo
->
code
=
rspCode
;
}
...
...
source/util/src/tarray.c
浏览文件 @
5c128e22
...
...
@@ -157,45 +157,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
pArray
->
size
=
pos
+
1
;
}
void
taosArrayRemoveDuplicateP
(
SArray
*
pArray
,
__compar_fn_t
comparFn
,
void
(
*
fp
)(
void
*
))
{
size_t
size
=
pArray
->
size
;
if
(
size
<=
1
)
{
return
;
}
int32_t
pos
=
0
;
for
(
int32_t
i
=
1
;
i
<
size
;
++
i
)
{
char
*
p1
=
taosArrayGet
(
pArray
,
pos
);
char
*
p2
=
taosArrayGet
(
pArray
,
i
);
if
(
comparFn
(
p1
,
p2
)
==
0
)
{
// do nothing
}
else
{
if
(
pos
+
1
!=
i
)
{
void
*
p
=
taosArrayGetP
(
pArray
,
pos
+
1
);
if
(
fp
!=
NULL
)
{
fp
(
p
);
}
taosArraySet
(
pArray
,
pos
+
1
,
p2
);
memset
(
TARRAY_GET_ELEM
(
pArray
,
i
),
0
,
pArray
->
elemSize
);
pos
+=
1
;
}
else
{
pos
+=
1
;
}
}
}
if
(
fp
!=
NULL
)
{
for
(
int32_t
i
=
pos
+
1
;
i
<
pArray
->
size
;
++
i
)
{
void
*
p
=
taosArrayGetP
(
pArray
,
i
);
fp
(
p
);
}
}
pArray
->
size
=
pos
+
1
;
}
void
*
taosArrayAddAll
(
SArray
*
pArray
,
const
SArray
*
pInput
)
{
if
(
pInput
)
{
return
taosArrayAddBatch
(
pArray
,
pInput
->
pData
,
(
int32_t
)
taosArrayGetSize
(
pInput
));
...
...
@@ -392,20 +353,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray
->
size
=
0
;
}
void
taosArrayClearP
(
SArray
*
pArray
,
FDelete
fp
)
{
if
(
pArray
==
NULL
)
return
;
if
(
fp
==
NULL
)
{
pArray
->
size
=
0
;
return
;
}
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
fp
(
*
(
void
**
)
TARRAY_GET_ELEM
(
pArray
,
i
));
}
pArray
->
size
=
0
;
}
void
*
taosArrayDestroy
(
SArray
*
pArray
)
{
if
(
pArray
)
{
taosMemoryFree
(
pArray
->
pData
);
...
...
@@ -495,6 +442,7 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
if
(
pArray
->
size
<=
1
)
{
return
;
}
for
(
int32_t
i
=
1
;
i
<=
pArray
->
size
-
1
;
++
i
)
{
for
(
int32_t
j
=
i
;
j
>
0
;
--
j
)
{
if
(
fn
(
taosArrayGetP
(
pArray
,
j
),
taosArrayGetP
(
pArray
,
j
-
1
),
param
)
==
-
1
)
{
...
...
@@ -507,7 +455,6 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
}
}
}
return
;
}
int32_t
taosEncodeArray
(
void
**
buf
,
const
SArray
*
pArray
,
FEncode
encode
)
{
...
...
@@ -539,21 +486,3 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
taosArrayGetSize
(
pArray
)
>
8
?
taosArrayQuickSort
(
pArray
,
fn
,
param
)
:
taosArrayInsertSort
(
pArray
,
fn
,
param
);
}
void
taosArraySwap
(
SArray
*
a
,
SArray
*
b
)
{
if
(
a
==
NULL
||
b
==
NULL
)
return
;
size_t
t
=
a
->
size
;
a
->
size
=
b
->
size
;
b
->
size
=
t
;
uint32_t
cap
=
a
->
capacity
;
a
->
capacity
=
b
->
capacity
;
b
->
capacity
=
cap
;
uint32_t
elem
=
a
->
elemSize
;
a
->
elemSize
=
b
->
elemSize
;
b
->
elemSize
=
elem
;
void
*
data
=
a
->
pData
;
a
->
pData
=
b
->
pData
;
b
->
pData
=
data
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录