Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
64c2b51d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
64c2b51d
编写于
3月 28, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/tdb
上级
78313467
460ae120
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
231 addition
and
184 deletion
+231
-184
example/src/tstream.c
example/src/tstream.c
+1
-1
include/client/taos.h
include/client/taos.h
+2
-2
include/common/tmsgdef.h
include/common/tmsgdef.h
+0
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+9
-3
include/util/tprocess.h
include/util/tprocess.h
+1
-0
source/client/src/tmq.c
source/client/src/tmq.c
+14
-11
source/dnode/mgmt/container/inc/dndInt.h
source/dnode/mgmt/container/inc/dndInt.h
+0
-1
source/dnode/mgmt/container/src/dndExec.c
source/dnode/mgmt/container/src/dndExec.c
+16
-12
source/dnode/mgmt/container/src/dndFile.c
source/dnode/mgmt/container/src/dndFile.c
+14
-14
source/dnode/mgmt/container/src/dndMsg.c
source/dnode/mgmt/container/src/dndMsg.c
+11
-7
source/dnode/mgmt/container/src/dndObj.c
source/dnode/mgmt/container/src/dndObj.c
+1
-1
source/dnode/mgmt/container/src/dndTransport.c
source/dnode/mgmt/container/src/dndTransport.c
+1
-1
source/dnode/mgmt/vnode/src/vmMsg.c
source/dnode/mgmt/vnode/src/vmMsg.c
+0
-1
source/dnode/mgmt/vnode/src/vmWorker.c
source/dnode/mgmt/vnode/src/vmWorker.c
+19
-1
source/dnode/mnode/impl/inc/mndScheduler.h
source/dnode/mnode/impl/inc/mndScheduler.h
+1
-1
source/dnode/mnode/impl/inc/mndStream.h
source/dnode/mnode/impl/inc/mndStream.h
+1
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+9
-8
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+6
-1
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+4
-3
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+3
-3
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+4
-4
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+4
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+10
-7
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+5
-2
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+5
-0
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+24
-6
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+5
-5
source/libs/wal/src/walSeek.c
source/libs/wal/src/walSeek.c
+2
-3
source/util/src/tprocess.c
source/util/src/tprocess.c
+49
-44
tests/script/tsim/tmq/basic.sim
tests/script/tsim/tmq/basic.sim
+1
-1
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+9
-38
未找到文件。
example/src/tstream.c
浏览文件 @
64c2b51d
...
...
@@ -25,7 +25,7 @@ int32_t init_env() {
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
1
"
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
2
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/client/taos.h
浏览文件 @
64c2b51d
...
...
@@ -217,7 +217,6 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new1
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
);
DLL_EXPORT
const
char
*
tmq_err2str
(
tmq_resp_err_t
);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
...
...
@@ -258,7 +257,8 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_schema
(
tmq_t
*
tmq
,
const
char
*
topic
);
DLL_EXPORT
void
*
tmq_get_topic_schema
(
tmq_t
*
tmq
,
const
char
*
topic
);
DLL_EXPORT
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
DLL_EXPORT
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
...
...
include/common/tmsgdef.h
浏览文件 @
64c2b51d
...
...
@@ -192,7 +192,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_DEPLOY
,
"vnode-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_EXEC
,
"vnode-task-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_PIPE_EXEC
,
"vnode-task-pipe-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_MERGE_EXEC
,
"vnode-task-merge-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_WRITE_EXEC
,
"vnode-task-write-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
...
...
include/libs/stream/tstream.h
浏览文件 @
64c2b51d
...
...
@@ -62,10 +62,11 @@ typedef struct {
}
STaskExec
;
typedef
struct
{
int
8_t
reserve
d
;
int
32_t
taskI
d
;
}
STaskDispatcherInplace
;
typedef
struct
{
int32_t
taskId
;
int32_t
nodeId
;
SEpSet
epSet
;
}
STaskDispatcherFixedEp
;
...
...
@@ -81,8 +82,12 @@ typedef struct {
SHashObj
*
pHash
;
// groupId to tbuid
}
STaskSinkTb
;
typedef
void
FSmaHandle
(
void
*
vnode
,
int64_t
smaId
,
const
SArray
*
data
);
typedef
struct
{
int8_t
reserved
;
int64_t
smaId
;
// following are not applicable to encoder and decoder
FSmaHandle
*
smaHandle
;
}
STaskSinkSma
;
typedef
struct
{
...
...
@@ -155,7 +160,8 @@ typedef struct {
STaskDispatcherShuffle
shuffleDispatcher
;
};
// state storage
// application storage
void
*
ahandle
;
}
SStreamTask
;
...
...
include/util/tprocess.h
浏览文件 @
64c2b51d
...
...
@@ -51,6 +51,7 @@ void taosProcCleanup(SProcObj *pProc);
int32_t
taosProcRun
(
SProcObj
*
pProc
);
void
taosProcStop
(
SProcObj
*
pProc
);
bool
taosProcIsChild
(
SProcObj
*
pProc
);
int32_t
taosProcChildId
(
SProcObj
*
pProc
);
int32_t
taosProcPutToChildQueue
(
SProcObj
*
pProc
,
void
*
pHead
,
int32_t
headLen
,
void
*
pBody
,
int32_t
bodyLen
);
int32_t
taosProcPutToParentQueue
(
SProcObj
*
pProc
,
void
*
pHead
,
int32_t
headLen
,
void
*
pBody
,
int32_t
bodyLen
);
...
...
source/client/src/tmq.c
浏览文件 @
64c2b51d
...
...
@@ -186,23 +186,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if
(
strcmp
(
key
,
"
connection
.ip"
)
==
0
)
{
if
(
strcmp
(
key
,
"
td.connect
.ip"
)
==
0
)
{
conf
->
ip
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"
connection
.user"
)
==
0
)
{
if
(
strcmp
(
key
,
"
td.connect
.user"
)
==
0
)
{
conf
->
user
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"
connection
.pass"
)
==
0
)
{
if
(
strcmp
(
key
,
"
td.connect
.pass"
)
==
0
)
{
conf
->
pass
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"
connection
.port"
)
==
0
)
{
if
(
strcmp
(
key
,
"
td.connect
.port"
)
==
0
)
{
conf
->
port
=
atoi
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"
connection
.db"
)
==
0
)
{
if
(
strcmp
(
key
,
"
td.connect
.db"
)
==
0
)
{
conf
->
db
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
...
...
@@ -223,13 +223,13 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
}
void
tmq_list_destroy
(
tmq_list_t
*
list
)
{
SArray
*
container
=
(
SArray
*
)
list
;
SArray
*
container
=
&
list
->
container
;
/*taosArrayDestroy(container);*/
taosArrayDestroyEx
(
container
,
(
void
(
*
)(
void
*
))
taosMemoryFree
);
}
void
tmqClearUnhandleMsg
(
tmq_t
*
tmq
)
{
tmq_message_t
*
msg
;
tmq_message_t
*
msg
=
NULL
;
while
(
1
)
{
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
msg
);
if
(
msg
)
...
...
@@ -807,7 +807,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg
*
pVg
=
pParam
->
pVg
;
tmq_t
*
tmq
=
pParam
->
tmq
;
if
(
code
!=
0
)
{
printf
(
"msg discard
%x
\n
"
,
code
);
printf
(
"msg discard
, code:
%x
\n
"
,
code
);
goto
WRITE_QUEUE_FAIL
;
}
...
...
@@ -877,10 +877,10 @@ WRITE_QUEUE_FAIL:
}
bool
tmqUpdateEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqCMGetSubEpRsp
*
pRsp
)
{
/*printf("call update ep %d\n", epoch);*/
bool
set
=
false
;
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
topics
);
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
SArray
*
newTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
pRsp
->
topics
,
i
);
...
...
@@ -899,8 +899,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
}
taosArrayPush
(
tmq
->
client
Topics
,
&
topic
);
taosArrayPush
(
new
Topics
,
&
topic
);
}
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
newTopics
;
atomic_store_32
(
&
tmq
->
epoch
,
epoch
);
return
set
;
}
...
...
@@ -1219,6 +1221,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
if
(
rspMsg
->
msg
.
head
.
epoch
==
atomic_load_32
(
&
tmq
->
epoch
))
{
/*printf("epoch match\n");*/
SMqClientVg
*
pVg
=
rspMsg
->
vg
;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg
->
currentOffset
=
rspMsg
->
msg
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
return
rspMsg
;
...
...
source/dnode/mgmt/container/inc/dndInt.h
浏览文件 @
64c2b51d
...
...
@@ -56,7 +56,6 @@ void dndCleanupServer(SDnode *pDnode);
int32_t
dndInitClient
(
SDnode
*
pDnode
);
void
dndCleanupClient
(
SDnode
*
pDnode
);
int32_t
dndInitMsgHandle
(
SDnode
*
pDnode
);
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRsp
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/container/src/dndExec.c
浏览文件 @
64c2b51d
...
...
@@ -20,7 +20,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) {
char
logname
[
24
]
=
{
0
};
snprintf
(
logname
,
sizeof
(
logname
),
"%slog"
,
pMgmt
->
name
);
dInfo
(
"node:%s, reset log to %s"
,
pMgmt
->
name
,
logname
);
dInfo
(
"node:%s, reset log to %s
in child process
"
,
pMgmt
->
name
,
logname
);
taosCloseLog
();
taosInitLog
(
logname
,
1
);
}
...
...
@@ -51,6 +51,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
void
dndCloseNode
(
SMgmtWrapper
*
pWrapper
)
{
dDebug
(
"node:%s, start to close"
,
pWrapper
->
name
);
pWrapper
->
required
=
false
;
taosWLockLatch
(
&
pWrapper
->
latch
);
if
(
pWrapper
->
deployed
)
{
(
*
pWrapper
->
fp
.
closeFp
)(
pWrapper
);
...
...
@@ -138,7 +139,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
static
void
dndConsumeParentQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRsp
,
int32_t
msgLen
,
void
*
pCont
,
int32_t
contLen
)
{
dTrace
(
"msg:%p, get from parent queue"
,
pRsp
);
pRsp
->
pCont
=
pCont
;
dndSendR
pcR
sp
(
pWrapper
,
pRsp
);
dndSendRsp
(
pWrapper
,
pRsp
);
taosMemoryFree
(
pRsp
);
}
...
...
@@ -178,7 +179,6 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
testFlag
=
0
,
.
pParent
=
pWrapper
,
.
name
=
pWrapper
->
name
};
SProcObj
*
pProc
=
taosProcInit
(
&
cfg
);
...
...
@@ -200,7 +200,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo
(
"node:%s, will be initialized in child process"
,
pWrapper
->
name
);
dndOpenNode
(
pWrapper
);
}
else
{
dInfo
(
"node:%s, will not start in parent process
"
,
pWrapper
->
name
);
dInfo
(
"node:%s, will not start in parent process
, child pid:%d"
,
pWrapper
->
name
,
taosProcChildId
(
pProc
)
);
pWrapper
->
procType
=
PROC_PARENT
;
}
...
...
@@ -210,16 +210,20 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
}
}
#if 0
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) {
dndReleaseWrapper(pWrapper);
dError("failed to start dnode worker since %s", terrstr());
return -1;
dndSetStatus
(
pDnode
,
DND_STAT_RUNNING
);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pWrapper
->
fp
.
startFp
==
NULL
)
continue
;
if
(
pWrapper
->
procType
==
PROC_PARENT
&&
n
!=
DNODE
)
continue
;
if
(
pWrapper
->
procType
==
PROC_CHILD
&&
n
==
DNODE
)
continue
;
if
((
*
pWrapper
->
fp
.
startFp
)(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
}
dndReleaseWrapper(pWrapper);
#endif
return
0
;
}
...
...
source/dnode/mgmt/container/src/dndFile.c
浏览文件 @
64c2b51d
...
...
@@ -16,14 +16,16 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#define MAXLEN 1024
int32_t
dndReadFile
(
SMgmtWrapper
*
pWrapper
,
bool
*
pDeployed
)
{
int32_t
code
=
TSDB_CODE_NODE_PARSE_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
1024
;
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
)
;
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
];
TdFilePtr
pFile
=
NULL
;
int32_t
code
=
TSDB_CODE_NODE_PARSE_FILE_ERROR
;
int32_t
len
=
0
;
const
int32_t
maxLen
=
MAXLEN
;
char
content
[
MAXLEN
+
1
]
=
{
0
}
;
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
];
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%s%s.json"
,
pWrapper
->
path
,
TD_DIRSEP
,
pWrapper
->
name
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
...
...
@@ -57,7 +59,6 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
dDebug
(
"succcessed to read file %s, deployed:%d"
,
file
,
*
pDeployed
);
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
...
...
@@ -66,7 +67,7 @@ _OVER:
}
int32_t
dndWriteFile
(
SMgmtWrapper
*
pWrapper
,
bool
deployed
)
{
char
file
[
PATH_MAX
];
char
file
[
PATH_MAX
]
=
{
0
}
;
snprintf
(
file
,
sizeof
(
file
),
"%s%s%s.json"
,
pWrapper
->
path
,
TD_DIRSEP
,
pWrapper
->
name
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
...
...
@@ -76,9 +77,9 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
1024
;
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
)
;
int32_t
len
=
0
;
const
int32_t
maxLen
=
MAXLEN
;
char
content
[
MAXLEN
+
1
]
=
{
0
}
;
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d
\n
"
,
deployed
);
...
...
@@ -87,9 +88,8 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
taosWriteFile
(
pFile
,
content
,
len
);
taosFsyncFile
(
pFile
);
taosCloseFile
(
&
pFile
);
taosMemoryFree
(
content
);
char
realfile
[
PATH_MAX
];
char
realfile
[
PATH_MAX
]
=
{
0
}
;
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%s%s.json"
,
pWrapper
->
path
,
TD_DIRSEP
,
pWrapper
->
name
);
if
(
taosRenameFile
(
file
,
realfile
)
!=
0
)
{
...
...
source/dnode/mgmt/container/src/dndMsg.c
浏览文件 @
64c2b51d
...
...
@@ -43,36 +43,40 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
memcpy
(
pMsg
->
user
,
connInfo
.
user
,
TSDB_USER_LEN
);
memcpy
(
&
pMsg
->
rpcMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
return
0
;
}
void
dndProcessRpcMsg
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
,
SEpSet
*
pEpSet
)
{
if
(
pEpSet
&&
pEpSet
->
numOfEps
>
0
&&
pRpc
->
msgType
==
TDMT_MND_STATUS_RSP
)
{
dndUpdateMnodeEpSet
(
pWrapper
->
pDnode
,
pEpSet
);
}
int32_t
code
=
-
1
;
SNodeMsg
*
pMsg
=
NULL
;
NodeMsgFp
msgFp
=
NULL
;
if
(
pEpSet
&&
pEpSet
->
numOfEps
>
0
&&
pRpc
->
msgType
==
TDMT_MND_STATUS_RSP
)
{
dndUpdateMnodeEpSet
(
pWrapper
->
pDnode
,
pEpSet
);
}
if
(
dndMarkWrapper
(
pWrapper
)
!=
0
)
goto
_OVER
;
if
((
msgFp
=
dndGetMsgFp
(
pWrapper
,
pRpc
))
==
NULL
)
goto
_OVER
;
if
((
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)))
==
NULL
)
goto
_OVER
;
if
(
dndBuildMsg
(
pMsg
,
pRpc
)
!=
0
)
goto
_OVER
;
dTrace
(
"msg:%p, is created, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
,
pMsg
->
user
);
if
(
pWrapper
->
procType
==
PROC_SINGLE
)
{
dTrace
(
"msg:%p, is created, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
,
pMsg
->
user
);
code
=
(
*
msgFp
)(
pWrapper
->
pMgmt
,
pMsg
);
}
else
if
(
pWrapper
->
procType
==
PROC_PARENT
)
{
dTrace
(
"msg:%p, is created and will put into child queue, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
,
pMsg
->
user
);
code
=
taosProcPutToChildQueue
(
pWrapper
->
pProc
,
pMsg
,
sizeof
(
SNodeMsg
),
pRpc
->
pCont
,
pRpc
->
contLen
);
}
else
{
dTrace
(
"msg:%p, should not processed in child process, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
,
pMsg
->
user
);
ASSERT
(
1
);
}
_OVER:
if
(
code
==
0
)
{
if
(
pWrapper
->
procType
==
PROC_PARENT
)
{
dTrace
(
"msg:%p, is freed"
,
pMsg
);
dTrace
(
"msg:%p, is freed
in parent process
"
,
pMsg
);
taosFreeQitem
(
pMsg
);
rpcFreeCont
(
pRpc
->
pCont
);
}
...
...
source/dnode/mgmt/container/src/dndObj.c
浏览文件 @
64c2b51d
...
...
@@ -175,7 +175,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t
code
=
0
;
taosRLockLatch
(
&
pWrapper
->
latch
);
if
(
pWrapper
->
deployed
)
{
if
(
pWrapper
->
deployed
||
(
pWrapper
->
procType
==
PROC_PARENT
&&
pWrapper
->
required
)
)
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pWrapper
->
refCount
,
1
);
dTrace
(
"node:%s, is marked, refCount:%d"
,
pWrapper
->
name
,
refCount
);
}
else
{
...
...
source/dnode/mgmt/container/src/dndTransport.c
浏览文件 @
64c2b51d
...
...
@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
}
}
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRsp
)
{
static
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRsp
)
{
if
(
pRsp
->
code
==
TSDB_CODE_APP_NOT_READY
)
{
SMgmtWrapper
*
pDnodeWrapper
=
dndAcquireWrapper
(
pWrapper
->
pDnode
,
DNODE
);
if
(
pDnodeWrapper
!=
NULL
)
{
...
...
source/dnode/mgmt/vnode/src/vmMsg.c
浏览文件 @
64c2b51d
...
...
@@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_HEARTBEAT
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_EXEC
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_PIPE_EXEC
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_MERGE_EXEC
,
(
NodeMsgFp
)
vmProcessMergeMsg
,
VND_VGID
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_STREAM_TRIGGER
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
VND_VGID
);
...
...
source/dnode/mgmt/vnode/src/vmWorker.c
浏览文件 @
64c2b51d
...
...
@@ -160,6 +160,24 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
}
}
static
void
vmProcessMergeQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SNodeMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
dTrace
(
"msg:%p, will be processed in vnode-merge queue"
,
pMsg
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
&
pMsg
->
rpcMsg
);
if
(
code
!=
0
)
{
vmSendRsp
(
pVnode
->
pWrapper
,
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
}
}
}
static
int32_t
vmPutNodeMsgToQueue
(
SVnodesMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
,
EQueueType
qtype
)
{
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
int32_t
code
=
-
1
;
...
...
@@ -308,7 +326,7 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t
vmAllocQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
pVnode
->
pWriteQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessWriteQueue
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessApplyQueue
);
pVnode
->
pMergeQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
mergePool
,
pVnode
,
(
FItems
)
vmProcessMerge
Msg
);
pVnode
->
pMergeQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
mergePool
,
pVnode
,
(
FItems
)
vmProcessMerge
Queue
);
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
pFetchQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItem
)
vmProcessFetchQueue
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
...
...
source/dnode/mnode/impl/inc/mndScheduler.h
浏览文件 @
64c2b51d
...
...
@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t
mndSchedInitSubEp
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
);
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
);
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
,
int64_t
smaId
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndStream.h
浏览文件 @
64c2b51d
...
...
@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw
*
mndStreamActionEncode
(
SStreamObj
*
pStream
);
SSdbRow
*
mndStreamActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
,
int64_t
smaId
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
64c2b51d
...
...
@@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI32
(
pEncoder
,
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pArray
=
taosArrayGet
(
pObj
->
tasks
,
i
);
SArray
*
pArray
=
taosArrayGet
P
(
pObj
->
tasks
,
i
);
int32_t
innerSz
=
taosArrayGetSize
(
pArray
);
if
(
tEncodeI32
(
pEncoder
,
innerSz
)
<
0
)
return
-
1
;
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGet
(
pArray
,
j
);
SStreamTask
*
pTask
=
taosArrayGet
P
(
pArray
,
j
);
if
(
tEncodeSStreamTask
(
pEncoder
,
pTask
)
<
0
)
return
-
1
;
}
}
...
...
@@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t
sz
;
if
(
tDecodeI32
(
pDecoder
,
&
sz
)
<
0
)
return
-
1
;
if
(
sz
!=
0
)
{
pObj
->
tasks
=
taosArrayInit
(
sz
,
sizeof
(
SArray
));
pObj
->
tasks
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
innerSz
;
if
(
tDecodeI32
(
pDecoder
,
&
innerSz
)
<
0
)
return
-
1
;
SArray
*
pArray
=
taosArrayInit
(
innerSz
,
sizeof
(
SStreamTask
));
SArray
*
pArray
=
taosArrayInit
(
innerSz
,
sizeof
(
void
*
));
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
task
;
if
(
tDecodeSStreamTask
(
pDecoder
,
&
task
)
<
0
)
return
-
1
;
taosArrayPush
(
pArray
,
&
task
);
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
return
-
1
;
if
(
tDecodeSStreamTask
(
pDecoder
,
pTask
)
<
0
)
return
-
1
;
taosArrayPush
(
pArray
,
&
pTask
);
}
taosArrayPush
(
pObj
->
tasks
,
pArray
);
taosArrayPush
(
pObj
->
tasks
,
&
pArray
);
}
}
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
64c2b51d
...
...
@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return
pVgroup
;
}
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
)
{
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
,
int64_t
smaId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SQueryPlan
*
pPlan
=
qStringToQueryPlan
(
pStream
->
physicalPlan
);
if
(
pPlan
==
NULL
)
{
...
...
@@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// only for inplace
pTask
->
sinkType
=
TASK_SINK__SHOW
;
pTask
->
showSink
.
reserved
=
0
;
if
(
smaId
!=
-
1
)
{
pTask
->
sinkType
=
TASK_SINK__SMA
;
pTask
->
smaSink
.
smaId
=
smaId
;
}
}
else
{
pTask
->
sinkType
=
TASK_SINK__NONE
;
}
...
...
@@ -185,6 +189,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask
->
dispatchMsgType
=
TDMT_VND_TASK_MERGE_EXEC
;
pTask
->
dispatchType
=
TASK_DISPATCH__FIXED
;
pTask
->
fixedEpDispatcher
.
taskId
=
lastLevelTask
->
taskId
;
pTask
->
fixedEpDispatcher
.
nodeId
=
lastLevelTask
->
nodeId
;
pTask
->
fixedEpDispatcher
.
epSet
=
lastLevelTask
->
epSet
;
}
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
64c2b51d
...
...
@@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {}
static
SSdbRaw
*
mndSmaActionEncode
(
SSmaObj
*
pSma
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
size
=
sizeof
(
SSmaObj
)
+
pSma
->
exprLen
+
pSma
->
tagsFilterLen
+
pSma
->
sqlLen
+
pSma
->
astLen
+
TSDB_SMA_RESERVE_SIZE
;
int32_t
size
=
sizeof
(
SSmaObj
)
+
pSma
->
exprLen
+
pSma
->
tagsFilterLen
+
pSma
->
sqlLen
+
pSma
->
astLen
+
TSDB_SMA_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_SMA
,
TSDB_SMA_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
_OVER
;
...
...
@@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if
(
mndSetCreateSmaRedoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaCommitLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
,
smaObj
.
uid
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
...
...
@@ -491,7 +492,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
mError
(
"sma:%s, failed to create since stb:%s not exist"
,
createReq
.
name
,
createReq
.
stb
);
goto
_OVER
;
}
pStream
=
mndAcquireStream
(
pMnode
,
createReq
.
name
);
if
(
pStream
!=
NULL
)
{
mError
(
"sma:%s, failed to create since stream:%s already exist"
,
createReq
.
name
,
createReq
.
name
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
64c2b51d
...
...
@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return
code
;
}
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
)
{
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
,
int64_t
smaId
)
{
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
...
...
@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return
-
1
;
}
if
(
mndScheduleStream
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
if
(
mndScheduleStream
(
pMnode
,
pTrans
,
pStream
,
smaId
)
<
0
)
{
mError
(
"stream:%ld, schedule stream since %s"
,
pStream
->
uid
,
terrstr
());
return
-
1
;
}
...
...
@@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
}
mDebug
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
pCreate
->
name
);
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
{
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
,
-
1
)
!=
0
)
{
mError
(
"trans:%d, failed to add stream since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
64c2b51d
...
...
@@ -21,7 +21,7 @@
#include "mndTrans.h"
#include "tbase64.h"
#define TSDB_USER_VER_NUMBER 1
#define TSDB_USER_VER_NUMBER
1
#define TSDB_USER_RESERVE_SIZE 64
static
int32_t
mndCreateDefaultUsers
(
SMnode
*
pMnode
);
...
...
@@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
userObj
.
updateTime
=
userObj
.
createdTime
;
userObj
.
superUser
=
pCreate
->
superUser
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_USER
,
&
pReq
->
rpcMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_USER
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
terrstr
());
return
-
1
;
...
...
@@ -350,7 +350,7 @@ CREATE_USER_OVER:
}
static
int32_t
mndUpdateUser
(
SMnode
*
pMnode
,
SUserObj
*
pOld
,
SUserObj
*
pNew
,
SNodeMsg
*
pReq
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_ALTER_USER
,
&
pReq
->
rpcMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_ALTER_USER
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to update since %s"
,
pOld
->
user
,
terrstr
());
return
-
1
;
...
...
@@ -511,7 +511,7 @@ ALTER_USER_OVER:
}
static
int32_t
mndDropUser
(
SMnode
*
pMnode
,
SNodeMsg
*
pReq
,
SUserObj
*
pUser
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_DROP_USER
,
&
pReq
->
rpcMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_DROP_USER
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to drop since %s"
,
pUser
->
user
,
terrstr
());
return
-
1
;
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
64c2b51d
...
...
@@ -198,10 +198,13 @@ int tqCommit(STQ*);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
SRpcMsg
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
);
// sma
void
smaHandleRes
(
SVnode
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
64c2b51d
...
...
@@ -42,8 +42,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq
// TODO: error code of buffer pool
}
#endif
pTq
->
tqMeta
=
tqStoreOpen
(
pTq
,
path
,
(
FTqSerialize
)
tqSerializeConsumer
,
(
FTqDeserialize
)
tqDeserializeConsumer
,
(
FTqDelete
)
taosMemoryFree
,
0
);
pTq
->
tqMeta
=
tqStoreOpen
(
pTq
,
path
,
(
FTqSerialize
)
tqSerializeConsumer
,
(
FTqDeserialize
)
tqDeserializeConsumer
,
(
FTqDelete
)
taosMemoryFree
,
0
);
if
(
pTq
->
tqMeta
==
NULL
)
{
taosMemoryFree
(
pTq
);
#if 0
...
...
@@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if
(
tqExpandTask
(
pTq
,
pTask
,
4
)
<
0
)
{
ASSERT
(
0
);
}
pTask
->
ahandle
=
pTq
->
pVnode
;
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
...
...
@@ -497,13 +498,15 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
return
0
;
}
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
SRpcMsg
*
msg
)
{
SStreamTaskExecReq
*
pReq
=
msg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTaskExecReq
req
;
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
ASSERT
(
pTask
);
if
(
streamExecTask
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
->
data
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
0
)
<
0
)
{
if
(
streamExecTask
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
req
.
data
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
0
)
<
0
)
{
// TODO
}
return
0
;
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
64c2b51d
...
...
@@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in fetch queue is processing"
);
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
...
...
@@ -65,8 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return
vnodeGetTableMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_VND_TASK_EXEC
:
return
tqProcessTaskExec
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_VND_TASK_PIPE_EXEC
:
case
TDMT_VND_TASK_MERGE_EXEC
:
return
tqProcessTaskExec
(
pVnode
->
pTq
,
msgstr
,
msgLen
);
case
TDMT_VND_STREAM_TRIGGER
:
return
tqProcessStreamTrigger
(
pVnode
->
pTq
,
pMsg
->
pCont
,
pMsg
->
contLen
);
case
TDMT_VND_QUERY_HEARTBEAT
:
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
64c2b51d
...
...
@@ -178,6 +178,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_TASK_WRITE_EXEC
:
{
if
(
tqProcessTaskExec
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
#if 0
SSmaCfg vCreateSmaReq = {0};
...
...
source/libs/stream/src/tstream.c
浏览文件 @
64c2b51d
...
...
@@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
//
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaHandle
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
//
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
//
...
...
@@ -121,7 +122,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SStreamTaskExecReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
taskId
=
pTask
->
taskId
,
.
taskId
=
pTask
->
fixedEpDispatcher
.
taskId
,
.
data
=
pRes
,
};
...
...
@@ -205,14 +206,22 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NONE
)
{
// TODO: wrap
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tEncodeI64
(
pEncoder
,
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SHOW
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
showSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tEncodeI
8
(
pEncoder
,
pTask
->
inplaceDispatcher
.
reserve
d
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
pEncoder
,
pTask
->
inplaceDispatcher
.
taskI
d
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
...
...
@@ -243,13 +252,22 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
!=
TASK_SINK__NON
E
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__TABL
E
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
tbSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SHOW
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
showSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tDecodeI
8
(
pDecoder
,
&
pTask
->
inplaceDispatcher
.
reserve
d
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
pDecoder
,
&
pTask
->
inplaceDispatcher
.
taskI
d
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
64c2b51d
...
...
@@ -19,13 +19,13 @@
#include "tref.h"
#include "walInt.h"
int64_t
inline
walGetFirstVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
firstVer
;
}
int64_t
FORCE_INLINE
walGetFirstVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
firstVer
;
}
int64_t
inline
walGetSnaphostVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
snapshotVer
;
}
int64_t
FORCE_INLINE
walGetSnaphostVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
snapshotVer
;
}
int64_t
inline
walGetLastVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
lastVer
;
}
int64_t
FORCE_INLINE
walGetLastVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
lastVer
;
}
static
inline
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
static
FORCE_INLINE
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
...
...
@@ -46,7 +46,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
return
NULL
;
}
static
inline
int64_t
walScanLogGetLastVer
(
SWal
*
pWal
)
{
static
FORCE_INLINE
int64_t
walScanLogGetLastVer
(
SWal
*
pWal
)
{
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
int
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
ASSERT
(
sz
>
0
);
...
...
source/libs/wal/src/walSeek.c
浏览文件 @
64c2b51d
...
...
@@ -74,9 +74,9 @@ int walSetWrite(SWal* pWal) {
}
int
walChangeWrite
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
0
;
int
code
;
TdFilePtr
pIdxTFile
,
pLogTFile
;
char
fnameStr
[
WAL_FILE_LEN
];
char
fnameStr
[
WAL_FILE_LEN
];
if
(
pWal
->
pWriteLogTFile
!=
NULL
)
{
code
=
taosCloseFile
(
&
pWal
->
pWriteLogTFile
);
if
(
code
!=
0
)
{
...
...
@@ -133,7 +133,6 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) {
return
-
1
;
}
if
(
ver
<
pWal
->
vers
.
snapshotVer
)
{
}
if
(
ver
<
walGetCurFileFirstVer
(
pWal
)
||
(
ver
>
walGetCurFileLastVer
(
pWal
)))
{
code
=
walChangeWrite
(
pWal
,
ver
);
...
...
source/util/src/tprocess.c
浏览文件 @
64c2b51d
...
...
@@ -56,7 +56,6 @@ typedef struct SProcObj {
int32_t
pid
;
bool
isChild
;
bool
stopFlag
;
bool
testFlag
;
}
SProcObj
;
static
int32_t
taosProcInitMutex
(
TdThreadMutex
**
ppMutex
,
int32_t
*
pShmid
)
{
...
...
@@ -77,7 +76,7 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
goto
_OVER
;
}
shmid
=
shmget
(
IPC_PRIVATE
,
sizeof
(
TdThreadMutex
),
0600
);
shmid
=
shmget
(
IPC_PRIVATE
,
sizeof
(
TdThreadMutex
),
IPC_CREAT
|
0600
);
if
(
shmid
<=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while shmget since %s"
,
terrstr
());
...
...
@@ -101,8 +100,13 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
_OVER:
if
(
code
!=
0
)
{
taosThreadMutexDestroy
(
pMutex
);
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
if
(
pMutex
!=
NULL
)
{
taosThreadMutexDestroy
(
pMutex
);
shmdt
(
pMutex
);
}
if
(
shmid
>=
0
)
{
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
}
}
else
{
*
ppMutex
=
pMutex
;
*
pShmid
=
shmid
;
...
...
@@ -112,12 +116,12 @@ _OVER:
return
code
;
}
static
void
taosProcDestroyMutex
(
TdThreadMutex
*
pMutex
,
int32_t
*
pS
hmid
)
{
static
void
taosProcDestroyMutex
(
TdThreadMutex
*
pMutex
,
int32_t
s
hmid
)
{
if
(
pMutex
!=
NULL
)
{
taosThreadMutexDestroy
(
pMutex
);
}
if
(
*
pShmid
>
0
)
{
shmctl
(
*
pS
hmid
,
IPC_RMID
,
NULL
);
if
(
shmid
>=
0
)
{
shmctl
(
s
hmid
,
IPC_RMID
,
NULL
);
}
}
...
...
@@ -141,13 +145,14 @@ static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) {
return
shmid
;
}
static
void
taosProcDestroyBuffer
(
void
*
pBuffer
,
int32_t
*
pShmid
)
{
if
(
*
pShmid
>
0
)
{
shmctl
(
*
pShmid
,
IPC_RMID
,
NULL
);
static
void
taosProcDestroyBuffer
(
void
*
pBuffer
,
int32_t
shmid
)
{
if
(
shmid
>
0
)
{
shmdt
(
pBuffer
);
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
}
}
static
SProcQueue
*
taosProc
QueueInit
(
int32_t
size
)
{
static
SProcQueue
*
taosProc
InitQueue
(
int32_t
size
)
{
if
(
size
<=
0
)
size
=
SHM_DEFAULT_SIZE
;
int32_t
bufSize
=
CEIL8
(
size
);
...
...
@@ -155,29 +160,28 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
SProcQueue
*
pQueue
=
NULL
;
int32_t
shmId
=
taosProcInitBuffer
((
void
**
)
&
pQueue
,
bufSize
+
headSize
);
if
(
shmId
<
=
0
)
{
if
(
shmId
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pQueue
->
bufferShmid
=
shmId
;
if
(
taosProcInitMutex
(
&
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
)
!=
0
)
{
taos
MemoryFree
(
pQueue
);
taos
ProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
return
NULL
;
}
if
(
tsem_init
(
&
pQueue
->
sem
,
1
,
0
)
!=
0
)
{
taosProcDestroyMutex
(
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
);
taos
MemoryFree
(
pQueue
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taos
ProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
if
(
taosProcInitMutex
(
&
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
)
!=
0
)
{
taosProcDestroyMutex
(
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
);
tsem_destroy
(
&
pQueue
->
sem
);
taosMemoryFree
(
pQueue
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
return
NULL
;
}
...
...
@@ -190,12 +194,12 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
return
pQueue
;
}
static
void
taosProc
QueueCleanup
(
SProcQueue
*
pQueue
)
{
static
void
taosProc
CleanupQueue
(
SProcQueue
*
pQueue
)
{
if
(
pQueue
!=
NULL
)
{
uDebug
(
"proc:%s, queue:%p clean up"
,
pQueue
->
name
,
pQueue
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
);
tsem_destroy
(
&
pQueue
->
sem
);
taosMemoryFree
(
pQueue
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
}
}
...
...
@@ -204,6 +208,11 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
const
int32_t
bodyLen
=
CEIL8
(
rawBodyLen
);
const
int32_t
fullLen
=
headLen
+
bodyLen
+
8
;
if
(
headLen
<=
0
||
bodyLen
<=
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
-
1
;
}
taosThreadMutexLock
(
pQueue
->
mutex
);
if
(
fullLen
>
pQueue
->
avail
)
{
taosThreadMutexUnlock
(
pQueue
->
mutex
);
...
...
@@ -255,7 +264,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
taosThreadMutexUnlock
(
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
uTrace
(
"proc:%s, push msg:%p:%d cont:%p:%d to queue:%p"
,
pQueue
->
name
,
pHead
,
rawHeadLen
,
pBody
,
rawB
odyLen
,
pQueue
);
uTrace
(
"proc:%s, push msg:%p:%d cont:%p:%d to queue:%p"
,
pQueue
->
name
,
pHead
,
headLen
,
pBody
,
b
odyLen
,
pQueue
);
return
0
;
}
...
...
@@ -344,12 +353,10 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
}
pProc
->
name
=
pCfg
->
name
;
pProc
->
testFlag
=
pCfg
->
testFlag
;
pProc
->
pChildQueue
=
taosProcQueueInit
(
pCfg
->
childQueueSize
);
pProc
->
pParentQueue
=
taosProcQueueInit
(
pCfg
->
parentQueueSize
);
pProc
->
pChildQueue
=
taosProcInitQueue
(
pCfg
->
childQueueSize
);
pProc
->
pParentQueue
=
taosProcInitQueue
(
pCfg
->
parentQueueSize
);
if
(
pProc
->
pChildQueue
==
NULL
||
pProc
->
pParentQueue
==
NULL
)
{
taosProc
QueueCleanup
(
pProc
->
pChildQueue
);
taosProc
CleanupQueue
(
pProc
->
pChildQueue
);
taosMemoryFree
(
pProc
);
return
NULL
;
}
...
...
@@ -369,17 +376,15 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc
->
pParentQueue
->
freeBodyFp
=
pCfg
->
parentFreeBodyFp
;
pProc
->
pParentQueue
->
consumeFp
=
pCfg
->
parentConsumeFp
;
uDebug
(
"proc:%s, initialized, child queue:%p parent queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
,
pProc
->
pParentQueue
);
uDebug
(
"proc:%s, i
s i
nitialized, child queue:%p parent queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
,
pProc
->
pParentQueue
);
if
(
!
pProc
->
testFlag
)
{
pProc
->
pid
=
fork
();
if
(
pProc
->
pid
==
0
)
{
pProc
->
isChild
=
1
;
uInfo
(
"this is child process, pid:%d"
,
pProc
->
pid
);
}
else
{
pProc
->
isChild
=
0
;
uInfo
(
"this is parent process, pid:%d"
,
pProc
->
pid
);
}
pProc
->
pid
=
fork
();
if
(
pProc
->
pid
==
0
)
{
pProc
->
isChild
=
1
;
prctl
(
PR_SET_NAME
,
pProc
->
name
,
NULL
,
NULL
,
NULL
);
}
else
{
pProc
->
isChild
=
0
;
uInfo
(
"this is parent process, child pid:%d"
,
pProc
->
pid
);
}
return
pProc
;
...
...
@@ -398,7 +403,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
if
(
code
<
0
)
{
uDebug
(
"proc:%s, get no message from queue:%p and exiting"
,
pQueue
->
name
,
pQueue
);
break
;
}
else
if
(
code
<
0
)
{
}
else
if
(
code
==
0
)
{
uTrace
(
"proc:%s, get no message from queue:%p since %s"
,
pQueue
->
name
,
pQueue
,
terrstr
());
taosMsleep
(
1
);
continue
;
...
...
@@ -413,16 +418,14 @@ int32_t taosProcRun(SProcObj *pProc) {
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pProc
->
isChild
||
pProc
->
testFlag
)
{
if
(
pProc
->
isChild
)
{
if
(
taosThreadCreate
(
&
pProc
->
childThread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
->
pChildQueue
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
return
-
1
;
}
uDebug
(
"proc:%s, child start to consume queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
);
}
if
(
!
pProc
->
isChild
||
pProc
->
testFlag
)
{
}
else
{
if
(
taosThreadCreate
(
&
pProc
->
parentThread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
->
pParentQueue
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
...
...
@@ -441,12 +444,14 @@ void taosProcStop(SProcObj *pProc) {
bool
taosProcIsChild
(
SProcObj
*
pProc
)
{
return
pProc
->
isChild
;
}
int32_t
taosProcChildId
(
SProcObj
*
pProc
)
{
return
pProc
->
pid
;
}
void
taosProcCleanup
(
SProcObj
*
pProc
)
{
if
(
pProc
!=
NULL
)
{
uDebug
(
"proc:%s, clean up"
,
pProc
->
name
);
taosProcStop
(
pProc
);
taosProc
QueueCleanup
(
pProc
->
pChildQueue
);
taosProc
QueueCleanup
(
pProc
->
pParentQueue
);
taosProc
CleanupQueue
(
pProc
->
pChildQueue
);
taosProc
CleanupQueue
(
pProc
->
pParentQueue
);
taosMemoryFree
(
pProc
);
}
}
...
...
tests/script/tsim/tmq/basic.sim
浏览文件 @
64c2b51d
...
...
@@ -45,7 +45,7 @@ print cmd===> system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c
system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal
print cmd result----> $system_content
if $system_content != @{consume success: 100}@ then
print not match in pos000
return -1
endi
sql show databases
...
...
tests/test/c/tmqDemo.c
浏览文件 @
64c2b51d
...
...
@@ -314,7 +314,7 @@ int32_t init_env() {
}
//const char* sql = "select * from tu1";
sprintf
(
sqlStr
,
"create topic test_stb_topic_1 as select
* from %s0
"
,
g_stConfInfo
.
stbName
);
sprintf
(
sqlStr
,
"create topic test_stb_topic_1 as select
ts,c0 from %s
"
,
g_stConfInfo
.
stbName
);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -351,36 +351,6 @@ tmq_list_t* build_topic_list() {
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
/*clock_t startTime = clock();*/
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1000
;
...
...
@@ -438,7 +408,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if
(
batchCnt
!=
totalMsgs
)
{
printf
(
"%s inserted msgs: %d and consume msgs: %d mismatch %s"
,
GREEN
,
totalMsgs
,
batchCnt
,
NC
);
exit
(
-
1
);
/*exit(-1);*/
}
if
(
0
==
g_stConfInfo
.
simCase
)
{
...
...
@@ -691,12 +661,13 @@ int main(int32_t argc, char *argv[]) {
float
rowsSpeed
=
totalRows
/
seconds
;
float
msgsSpeed
=
totalMsgs
/
seconds
;
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
if
(
walLogSize
<=
0
)
{
printf
(
"vnode2/wal size incorrect!"
);
/*exit(-1);*/
}
else
{
if
(
0
==
g_stConfInfo
.
simCase
)
{
if
(
0
==
g_stConfInfo
.
simCase
)
{
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
if
(
walLogSize
<=
0
)
{
printf
(
"%s size incorrect!"
,
g_stConfInfo
.
vnodeWalPath
);
exit
(
-
1
);
}
else
{
pPrint
(
".log file size in vnode2/wal: %.3f MBytes
\n
"
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录