Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7f77c924
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7f77c924
编写于
4月 29, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12066 from taosdata/feature/tq
fix: memory error
上级
b4a88b8a
de939d58
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
119 addition
and
54 deletion
+119
-54
example/src/tmq.c
example/src/tmq.c
+0
-2
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/inc/mndTopic.h
source/dnode/mnode/impl/inc/mndTopic.h
+2
-0
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+1
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+1
-9
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+35
-1
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+36
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+42
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+0
-41
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+1
-0
未找到文件。
example/src/tmq.c
浏览文件 @
7f77c924
...
...
@@ -14,9 +14,7 @@
*/
#include <assert.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
7f77c924
...
...
@@ -517,6 +517,7 @@ void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
typedef
struct
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
SRWLatch
lock
;
int64_t
dbUid
;
int32_t
vgNum
;
int8_t
subType
;
int8_t
withTbName
;
...
...
source/dnode/mnode/impl/inc/mndTopic.h
浏览文件 @
7f77c924
...
...
@@ -31,6 +31,8 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
SSdbRaw
*
mndTopicActionEncode
(
SMqTopicObj
*
pTopic
);
SSdbRow
*
mndTopicActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
mndDropTopicByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
7f77c924
...
...
@@ -241,7 +241,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
void
*
tDecodeSMqConsumerEp
(
const
void
*
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
taosDecodeArray
(
buf
,
&
pConsumerEp
->
vgs
,
(
FDecode
)
tDecodeSMqVgEp
,
sizeof
(
SMq
Sub
VgEp
));
buf
=
taosDecodeArray
(
buf
,
&
pConsumerEp
->
vgs
,
(
FDecode
)
tDecodeSMqVgEp
,
sizeof
(
SMqVgEp
));
#if 0
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
7f77c924
...
...
@@ -731,7 +731,6 @@ _OVER:
static
int32_t
mndProcessMCreateStbReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
code
=
-
1
;
SStbObj
*
pTopicStb
=
NULL
;
SStbObj
*
pStb
=
NULL
;
SDbObj
*
pDb
=
NULL
;
SUserObj
*
pUser
=
NULL
;
...
...
@@ -762,12 +761,6 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
goto
_OVER
;
}
pTopicStb
=
mndAcquireStb
(
pMnode
,
createReq
.
name
);
if
(
pTopicStb
!=
NULL
)
{
terrno
=
TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC
;
goto
_OVER
;
}
pDb
=
mndAcquireDbByStb
(
pMnode
,
createReq
.
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
...
...
@@ -785,7 +778,7 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
int32_t
numOfStbs
=
-
1
;
mndGetNumOfStbs
(
pMnode
,
pDb
->
name
,
&
numOfStbs
);
if
(
pDb
->
cfg
.
numOfStables
==
1
&&
numOfStbs
!=
0
)
{
if
(
pDb
->
cfg
.
numOfStables
==
1
&&
numOfStbs
!=
0
)
{
terrno
=
TSDB_CODE_MND_SINGLE_STB_MODE_DB
;
goto
_OVER
;
}
...
...
@@ -799,7 +792,6 @@ _OVER:
}
mndReleaseStb
(
pMnode
,
pStb
);
mndReleaseStb
(
pMnode
,
pTopicStb
);
mndReleaseDb
(
pMnode
,
pDb
);
mndReleaseUser
(
pMnode
,
pUser
);
tFreeSMCreateStbReq
(
&
createReq
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
7f77c924
...
...
@@ -80,6 +80,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pSub
->
dbUid
=
pTopic
->
dbUid
;
pSub
->
subType
=
pTopic
->
subType
;
pSub
->
withTbName
=
pTopic
->
withTbName
;
pSub
->
withSchema
=
pTopic
->
withSchema
;
...
...
@@ -593,7 +594,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int32_t
dataPos
=
0
;
int32_t
tlen
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
tlen
,
SUB_DECODE_OVER
);
buf
=
taosMemoryMalloc
(
tlen
+
1
);
buf
=
taosMemoryMalloc
(
tlen
);
if
(
buf
==
NULL
)
goto
SUB_DECODE_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
SUB_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_SUBSCRIBE_RESERVE_SIZE
,
SUB_DECODE_OVER
);
...
...
@@ -679,3 +680,36 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndSetDropSubCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqSubscribeObj
*
pSub
)
{
SSdbRaw
*
pCommitRaw
=
mndSubActionEncode
(
pSub
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
int32_t
mndDropSubByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
SMqSubscribeObj
*
pSub
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
pIter
,
(
void
**
)
&
pSub
);
if
(
pIter
==
NULL
)
break
;
if
(
pSub
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pSub
);
continue
;
}
if
(
mndSetDropSubCommitLogs
(
pMnode
,
pTrans
,
pSub
)
<
0
)
{
goto
END
;
}
}
code
=
0
;
END:
return
code
;
}
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
7f77c924
...
...
@@ -38,6 +38,8 @@ static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static
int32_t
mndRetrieveTopic
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextTopic
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndSetDropTopicCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqTopicObj
*
pTopic
);
int32_t
mndInitTopic
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_TOPIC
,
.
keyType
=
SDB_KEY_BINARY
,
...
...
@@ -553,7 +555,41 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
return
numOfRows
;
}
static
int32_t
mndSetDropTopicCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqTopicObj
*
pTopic
)
{
SSdbRaw
*
pCommitRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
void
mndCancelGetNextTopic
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
int32_t
mndDropTopicByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
SMqTopicObj
*
pTopic
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_TOPIC
,
pIter
,
(
void
**
)
&
pTopic
);
if
(
pIter
==
NULL
)
break
;
if
(
pTopic
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pTopic
);
continue
;
}
if
(
mndSetDropTopicCommitLogs
(
pMnode
,
pTrans
,
pTopic
)
<
0
)
{
goto
END
;
}
}
code
=
0
;
END:
return
code
;
}
source/dnode/vnode/src/tq/tq.c
浏览文件 @
7f77c924
...
...
@@ -58,6 +58,48 @@ void tqClose(STQ* pTq) {
// TODO
}
static
void
tdSRowDemo
()
{
#define DEMO_N_COLS 3
int16_t
schemaVersion
=
0
;
int32_t
numOfCols
=
DEMO_N_COLS
;
// ts + int
SRowBuilder
rb
=
{
0
};
SSchema
schema
[
DEMO_N_COLS
]
=
{
{.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
colId
=
1
,
.
name
=
"ts"
,
.
bytes
=
8
,
.
flags
=
SCHEMA_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
2
,
.
name
=
"c1"
,
.
bytes
=
4
,
.
flags
=
SCHEMA_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
3
,
.
name
=
"c2"
,
.
bytes
=
4
,
.
flags
=
SCHEMA_SMA_ON
}};
SSchema
*
pSchema
=
schema
;
STSchema
*
pTSChema
=
tdGetSTSChemaFromSSChema
(
&
pSchema
,
numOfCols
);
tdSRowInit
(
&
rb
,
schemaVersion
);
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
pTSChema
->
flen
);
int32_t
maxLen
=
TD_ROW_MAX_BYTES_FROM_SCHEMA
(
pTSChema
);
void
*
row
=
taosMemoryCalloc
(
1
,
maxLen
);
// make sure the buffer is enough
// set row buf
tdSRowResetBuf
(
&
rb
,
row
);
for
(
int32_t
idx
=
0
;
idx
<
pTSChema
->
numOfCols
;
++
idx
)
{
STColumn
*
pColumn
=
pTSChema
->
columns
+
idx
;
if
(
idx
==
0
)
{
int64_t
tsKey
=
1651234567
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
tsKey
,
true
,
pColumn
->
offset
,
idx
);
}
else
if
(
idx
==
1
)
{
int32_t
val1
=
10
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
val1
,
true
,
pColumn
->
offset
,
idx
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NONE
,
NULL
,
true
,
pColumn
->
offset
,
idx
);
}
}
// print
tdSRowPrint
(
row
,
pTSChema
,
__func__
);
taosMemoryFree
(
pTSChema
);
}
int32_t
tqPushMsgNew
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
pIter
=
NULL
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
7f77c924
...
...
@@ -45,47 +45,6 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#endif
return
0
;
}
static
void
tdSRowDemo
()
{
#define DEMO_N_COLS 3
int16_t
schemaVersion
=
0
;
int32_t
numOfCols
=
DEMO_N_COLS
;
// ts + int
SRowBuilder
rb
=
{
0
};
SSchema
schema
[
DEMO_N_COLS
]
=
{
{.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
colId
=
1
,
.
name
=
"ts"
,
.
bytes
=
8
,
.
flags
=
SCHEMA_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
2
,
.
name
=
"c1"
,
.
bytes
=
4
,
.
flags
=
SCHEMA_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
3
,
.
name
=
"c2"
,
.
bytes
=
4
,
.
flags
=
SCHEMA_SMA_ON
}};
SSchema
*
pSchema
=
schema
;
STSchema
*
pTSChema
=
tdGetSTSChemaFromSSChema
(
&
pSchema
,
numOfCols
);
tdSRowInit
(
&
rb
,
schemaVersion
);
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
pTSChema
->
flen
);
int32_t
maxLen
=
TD_ROW_MAX_BYTES_FROM_SCHEMA
(
pTSChema
);
void
*
row
=
taosMemoryCalloc
(
1
,
maxLen
);
// make sure the buffer is enough
// set row buf
tdSRowResetBuf
(
&
rb
,
row
);
for
(
int32_t
idx
=
0
;
idx
<
pTSChema
->
numOfCols
;
++
idx
)
{
STColumn
*
pColumn
=
pTSChema
->
columns
+
idx
;
if
(
idx
==
0
)
{
int64_t
tsKey
=
1651234567
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
tsKey
,
true
,
pColumn
->
offset
,
idx
);
}
else
if
(
idx
==
1
)
{
int32_t
val1
=
10
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
val1
,
true
,
pColumn
->
offset
,
idx
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NONE
,
NULL
,
true
,
pColumn
->
offset
,
idx
);
}
}
// print
tdSRowPrint
(
row
,
pTSChema
,
__func__
);
taosMemoryFree
(
pTSChema
);
}
int
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
)
{
void
*
ptr
=
NULL
;
...
...
source/libs/stream/src/tstream.c
浏览文件 @
7f77c924
...
...
@@ -28,6 +28,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
((
SMsgHead
*
)
buf
)
->
vgId
=
0
;
req
.
taskId
=
pTask
->
inplaceDispatcher
.
taskId
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录