Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jobily
TDengine
提交
3ed78dda
T
TDengine
项目概览
jobily
/
TDengine
9 个月 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
3ed78dda
编写于
8月 31, 2023
作者:
H
Haojun Liao
提交者:
GitHub
8月 31, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22662 from taosdata/fix/3_liaohj
refactor: do some internal refactor.
上级
8645f763
ecff0acb
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
159 addition
and
255 deletion
+159
-255
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+2
-3
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+5
-5
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+1
-1
source/dnode/vnode/src/tq/tqRestore.c
source/dnode/vnode/src/tq/tqRestore.c
+8
-8
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+1
-1
source/libs/stream/inc/streamInt.h
source/libs/stream/inc/streamInt.h
+1
-1
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+3
-139
source/libs/stream/src/streamCheckpoint.c
source/libs/stream/src/streamCheckpoint.c
+1
-1
source/libs/stream/src/streamData.c
source/libs/stream/src/streamData.c
+2
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+2
-2
source/libs/stream/src/streamQueue.c
source/libs/stream/src/streamQueue.c
+121
-86
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+7
-2
tests/system-test/6-cluster/clusterCommonCheck.py
tests/system-test/6-cluster/clusterCommonCheck.py
+1
-1
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
...anually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
+1
-1
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
3ed78dda
...
...
@@ -189,7 +189,6 @@ int32_t streamInit();
void
streamCleanUp
();
SStreamQueue
*
streamQueueOpen
(
int64_t
cap
);
void
streamQueueCleanup
(
SStreamQueue
*
pQueue
);
void
streamQueueClose
(
SStreamQueue
*
pQueue
,
int32_t
taskId
);
static
FORCE_INLINE
void
streamQueueProcessSuccess
(
SStreamQueue
*
queue
)
{
...
...
@@ -425,8 +424,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg
int32_t
tDecodeStreamTaskChkInfo
(
SDecoder
*
pDecoder
,
SCheckpointInfo
*
pChkpInfo
);
int32_t
tAppendDataToInputQueue
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
);
bool
tInputQueueIsFull
(
const
SStreamTask
*
pTask
);
int32_t
streamTaskPutDataIntoInputQ
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
);
bool
streamQueueIsFull
(
const
STaosQueue
*
pQueue
);
typedef
struct
{
SMsgHead
head
;
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
3ed78dda
...
...
@@ -164,7 +164,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int64_t
ver
);
int32_t
tqScanWalForStreamTasks
(
STQ
*
pTq
);
int32_t
tq
SetStreamTasksReady
(
STQ
*
pTq
);
int32_t
tq
CheckAndRunStreamTask
(
STQ
*
pTq
);
int32_t
tqStopStreamTasks
(
STQ
*
pTq
);
// tq util
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
3ed78dda
...
...
@@ -223,11 +223,11 @@ void tqClose(STQ*);
int
tqPushMsg
(
STQ
*
,
tmsg_t
msgType
);
int
tqRegisterPushHandle
(
STQ
*
pTq
,
void
*
handle
,
SRpcMsg
*
pMsg
);
int
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
pHandle
);
int
tqS
tartStreamTasksAsync
(
STQ
*
pTq
,
bool
ckPause
);
// restore all stream tasks after vnode launching completed.
int
tqS
canWalAsync
(
STQ
*
pTq
,
bool
ckPause
);
int32_t
tqProcessStreamCheckPointSourceReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessStreamTaskCheckpointReadyMsg
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskUpdateReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tq
SetStreamTasksReady
Async
(
STQ
*
pTq
);
int32_t
tq
CheckAndRunStreamTask
Async
(
STQ
*
pTq
);
int
tqCommit
(
STQ
*
);
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
3ed78dda
...
...
@@ -1199,7 +1199,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamSetStatusNormal
(
pTask
);
}
tqS
tartStreamTasks
Async
(
pTq
,
false
);
tqS
canWal
Async
(
pTq
,
false
);
}
streamMetaReleaseTask
(
pMeta
,
pTask
);
...
...
@@ -1341,7 +1341,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
if
(
taskId
==
STREAM_EXEC_TASK_STATUS_CHECK_ID
)
{
tq
SetStreamTasksReady
(
pTq
);
tq
CheckAndRunStreamTask
(
pTq
);
return
0
;
}
...
...
@@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
tqS
tartStreamTasks
Async
(
pTq
,
false
);
tqS
canWal
Async
(
pTq
,
false
);
return
0
;
}
else
{
// NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this
...
...
@@ -1505,7 +1505,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
pTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
streamStartScanHistoryAsync
(
pTask
,
igUntreated
);
}
else
if
(
level
==
TASK_LEVEL__SOURCE
&&
(
taosQueueItemSize
(
pTask
->
inputQueue
->
queue
)
==
0
))
{
tqS
tartStreamTasks
Async
(
pTq
,
false
);
tqS
canWal
Async
(
pTq
,
false
);
}
else
{
streamSchedExec
(
pTask
);
}
...
...
@@ -1824,7 +1824,7 @@ _end:
taosWUnLockLatch
(
&
pMeta
->
lock
);
if
(
vnodeIsRoleLeader
(
pTq
->
pVnode
)
&&
!
tsDisableStream
)
{
vInfo
(
"vgId:%d, restart all stream tasks"
,
vgId
);
tq
SetStreamTasksReady
Async
(
pTq
);
tq
CheckAndRunStreamTask
Async
(
pTq
);
}
}
}
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
3ed78dda
...
...
@@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if
((
!
tsDisableStream
)
&&
(
numOfTasks
>
0
)
&&
(
msgType
==
TDMT_VND_SUBMIT
||
msgType
==
TDMT_VND_DELETE
))
{
tqS
tartStreamTasks
Async
(
pTq
,
true
);
tqS
canWal
Async
(
pTq
,
true
);
}
return
0
;
...
...
source/dnode/vnode/src/tq/tqRestore.c
浏览文件 @
3ed78dda
...
...
@@ -56,7 +56,7 @@ int32_t tqScanWalForStreamTasks(STQ* pTq) {
return
0
;
}
int32_t
tq
SetStreamTasksReady
(
STQ
*
pTq
)
{
int32_t
tq
CheckAndRunStreamTask
(
STQ
*
pTq
)
{
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
SStreamMeta
*
pMeta
=
pTq
->
pStreamMeta
;
...
...
@@ -73,18 +73,18 @@ int32_t tqSetStreamTasksReady(STQ* pTq) {
// broadcast the check downstream tasks msg
for
(
int32_t
i
=
0
;
i
<
numOfTasks
;
++
i
)
{
SStreamTaskId
*
pTaskId
=
taosArrayGet
(
pTaskList
,
i
);
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pMeta
,
pTaskId
->
streamId
,
pTaskId
->
taskId
);
SStreamTaskId
*
pTaskId
=
taosArrayGet
(
pTaskList
,
i
);
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pMeta
,
pTaskId
->
streamId
,
pTaskId
->
taskId
);
if
(
pTask
==
NULL
)
{
continue
;
}
// fill-history task can only be launched by related stream tasks.
if
(
pTask
->
info
.
fillHistory
==
1
)
{
streamMetaReleaseTask
(
pMeta
,
pTask
);
continue
;
}
// todo: how about the fill-history task?
if
(
pTask
->
status
.
downstreamReady
==
1
)
{
tqDebug
(
"s-task:%s downstream ready, no need to check downstream, check only related fill-history task"
,
pTask
->
id
.
idStr
);
...
...
@@ -103,7 +103,7 @@ int32_t tqSetStreamTasksReady(STQ* pTq) {
return
0
;
}
int32_t
tq
SetStreamTasksReady
Async
(
STQ
*
pTq
)
{
int32_t
tq
CheckAndRunStreamTask
Async
(
STQ
*
pTq
)
{
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
SStreamMeta
*
pMeta
=
pTq
->
pStreamMeta
;
...
...
@@ -136,7 +136,7 @@ int32_t tqSetStreamTasksReadyAsync(STQ* pTq) {
return
0
;
}
int32_t
tqS
tartStreamTasks
Async
(
STQ
*
pTq
,
bool
ckPause
)
{
int32_t
tqS
canWal
Async
(
STQ
*
pTq
,
bool
ckPause
)
{
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
SStreamMeta
*
pMeta
=
pTq
->
pStreamMeta
;
...
...
@@ -340,7 +340,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue
;
}
if
(
tInputQueueIsFull
(
pTask
))
{
if
(
streamQueueIsFull
(
pTask
->
inputQueue
->
queue
))
{
tqTrace
(
"s-task:%s input queue is full, do nothing"
,
pTask
->
id
.
idStr
);
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
continue
;
...
...
@@ -386,7 +386,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if
(
pItem
!=
NULL
)
{
noDataInWal
=
false
;
code
=
tAppendDataToInputQueue
(
pTask
,
pItem
);
code
=
streamTaskPutDataIntoInputQ
(
pTask
,
pItem
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
int64_t
ver
=
walReaderGetCurrentVer
(
pTask
->
exec
.
pWalReader
);
pTask
->
chkInfo
.
currentVer
=
ver
;
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
3ed78dda
...
...
@@ -560,7 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo
(
"vgId:%d, not launch stream tasks, since stream tasks are disabled"
,
vgId
);
}
else
{
vInfo
(
"vgId:%d start to launch stream tasks"
,
pVnode
->
config
.
vgId
);
tq
SetStreamTasksReady
Async
(
pVnode
->
pTq
);
tq
CheckAndRunStreamTask
Async
(
pVnode
->
pTq
);
}
}
else
{
vInfo
(
"vgId:%d, sync restore finished, not launch stream tasks since not leader"
,
vgId
);
...
...
source/libs/stream/inc/streamInt.h
浏览文件 @
3ed78dda
...
...
@@ -68,7 +68,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t
streamTaskSendCheckpointSourceRsp
(
SStreamTask
*
pTask
);
int32_t
streamTaskGetNumOfDownstream
(
const
SStreamTask
*
pTask
);
int32_t
extractBlocks
FromInputQ
(
SStreamTask
*
pTask
,
SStreamQueueItem
**
pInput
,
int32_t
*
numOfBlocks
);
int32_t
streamTaskGetData
FromInputQ
(
SStreamTask
*
pTask
,
SStreamQueueItem
**
pInput
,
int32_t
*
numOfBlocks
);
SStreamQueueItem
*
streamMergeQueueItem
(
SStreamQueueItem
*
dst
,
SStreamQueueItem
*
pElem
);
int32_t
streamTaskBuildScanhistoryRspMsg
(
SStreamTask
*
pTask
,
SStreamScanHistoryFinishReq
*
pReq
,
void
**
pBuffer
,
int32_t
*
pLen
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
3ed78dda
...
...
@@ -16,9 +16,6 @@
#include "streamInt.h"
#include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
SStreamGlobalEnv
streamEnv
;
int32_t
streamInit
()
{
...
...
@@ -85,7 +82,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
atomic_store_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__INACTIVE
);
pTrigger
->
pBlock
->
info
.
type
=
STREAM_GET_ALL
;
if
(
tAppendDataToInputQueue
(
pTask
,
(
SStreamQueueItem
*
)
pTrigger
)
<
0
)
{
if
(
streamTaskPutDataIntoInputQ
(
pTask
,
(
SStreamQueueItem
*
)
pTrigger
)
<
0
)
{
taosFreeQitem
(
pTrigger
);
taosTmrReset
(
streamSchedByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
,
&
pTask
->
schedTimer
);
return
;
...
...
@@ -172,7 +169,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
pTask
->
status
.
appendTranstateBlock
=
true
;
}
int32_t
code
=
tAppendDataToInputQueue
(
pTask
,
(
SStreamQueueItem
*
)
pBlock
);
int32_t
code
=
streamTaskPutDataIntoInputQ
(
pTask
,
(
SStreamQueueItem
*
)
pBlock
);
// input queue is full, upstream is blocked now
status
=
(
code
==
TSDB_CODE_SUCCESS
)
?
TASK_INPUT_STATUS__NORMAL
:
TASK_INPUT_STATUS__BLOCKED
;
}
...
...
@@ -192,7 +189,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
pData
->
type
=
STREAM_INPUT__DATA_RETRIEVE
;
pData
->
srcVgId
=
0
;
streamRetrieveReqToData
(
pReq
,
pData
);
if
(
tAppendDataToInputQueue
(
pTask
,
(
SStreamQueueItem
*
)
pData
)
==
0
)
{
if
(
streamTaskPutDataIntoInputQ
(
pTask
,
(
SStreamQueueItem
*
)
pData
)
==
0
)
{
status
=
TASK_INPUT_STATUS__NORMAL
;
}
else
{
status
=
TASK_INPUT_STATUS__FAILED
;
...
...
@@ -239,47 +236,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return
0
;
}
// static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
// int8_t status = 0;
//
// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
// if (pBlock == NULL) {
// streamTaskInputFail(pTask);
// status = TASK_INPUT_STATUS__FAILED;
// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
// pTask->id.idStr);
// } else {
// if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
// pTask->status.appendTranstateBlock = true;
// }
//
// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// // input queue is full, upstream is blocked now
// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
// }
//
// return status;
// }
// static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void**
// pBuf) {
// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
// if (*pBuf == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
//
// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
//
// pDispatchRsp->inputStatus = status;
// pDispatchRsp->streamId = htobe64(pReq->streamId);
// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
//
// return TSDB_CODE_SUCCESS;
// }
int32_t
streamProcessDispatchMsg
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
,
bool
exec
)
{
qDebug
(
"s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%"
PRId64
,
pTask
->
id
.
idStr
,
pReq
->
upstreamTaskId
,
pReq
->
upstreamNodeId
,
pReq
->
totalLen
);
...
...
@@ -343,98 +299,6 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return
0
;
}
bool
tInputQueueIsFull
(
const
SStreamTask
*
pTask
)
{
bool
isFull
=
taosQueueItemSize
((
pTask
->
inputQueue
->
queue
))
>=
STREAM_TASK_INPUT_QUEUE_CAPACITY
;
double
size
=
QUEUE_MEM_SIZE_IN_MB
(
pTask
->
inputQueue
->
queue
);
return
(
isFull
||
size
>=
STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE
);
}
int32_t
tAppendDataToInputQueue
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
)
{
int8_t
type
=
pItem
->
type
;
int32_t
total
=
taosQueueItemSize
(
pTask
->
inputQueue
->
queue
)
+
1
;
double
size
=
QUEUE_MEM_SIZE_IN_MB
(
pTask
->
inputQueue
->
queue
);
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
px
=
(
SStreamDataSubmit
*
)
pItem
;
if
((
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
)
&&
tInputQueueIsFull
(
pTask
))
{
qError
(
"s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push "
"data"
,
pTask
->
id
.
idStr
,
STREAM_TASK_INPUT_QUEUE_CAPACITY
,
STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE
,
total
,
size
);
streamDataSubmitDestroy
(
px
);
taosFreeQitem
(
pItem
);
return
-
1
;
}
int32_t
msgLen
=
px
->
submit
.
msgLen
;
int64_t
ver
=
px
->
submit
.
ver
;
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
streamDataSubmitDestroy
(
px
);
taosFreeQitem
(
pItem
);
return
code
;
}
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
qDebug
(
"s-task:%s submit enqueue msgLen:%d ver:%"
PRId64
", total in queue:%d, size:%.2fMiB"
,
pTask
->
id
.
idStr
,
msgLen
,
ver
,
total
,
size
+
SIZE_IN_MB
(
msgLen
));
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
||
type
==
STREAM_INPUT__DATA_RETRIEVE
||
type
==
STREAM_INPUT__REF_DATA_BLOCK
)
{
if
(
/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */
(
tInputQueueIsFull
(
pTask
)))
{
qError
(
"s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort"
,
pTask
->
id
.
idStr
,
STREAM_TASK_INPUT_QUEUE_CAPACITY
,
STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE
,
total
,
size
);
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
return
-
1
;
}
qDebug
(
"s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB"
,
pTask
->
id
.
idStr
,
total
,
size
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
return
code
;
}
}
else
if
(
type
==
STREAM_INPUT__CHECKPOINT
||
type
==
STREAM_INPUT__CHECKPOINT_TRIGGER
||
type
==
STREAM_INPUT__TRANS_STATE
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
qDebug
(
"s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB"
,
pTask
->
id
.
idStr
,
pTask
->
info
.
taskLevel
,
streamGetBlockTypeStr
(
type
),
total
,
size
);
}
else
if
(
type
==
STREAM_INPUT__GET_RES
)
{
// use the default memory limit, refactor later.
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
qDebug
(
"s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)"
,
pTask
->
id
.
idStr
,
total
,
size
);
}
else
{
ASSERT
(
0
);
}
if
(
type
!=
STREAM_INPUT__GET_RES
&&
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
atomic_val_compare_exchange_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__INACTIVE
,
TASK_TRIGGER_STATUS__ACTIVE
);
qDebug
(
"s-task:%s new data arrived, active the trigger, triggerStatus:%d"
,
pTask
->
id
.
idStr
,
pTask
->
triggerStatus
);
}
return
0
;
}
static
void
*
streamQueueCurItem
(
SStreamQueue
*
queue
)
{
return
queue
->
qItem
;
}
void
*
streamQueueNextItem
(
SStreamQueue
*
pQueue
)
{
int8_t
flag
=
atomic_exchange_8
(
&
pQueue
->
status
,
STREAM_QUEUE__PROCESSING
);
if
(
flag
==
STREAM_QUEUE__FAILED
)
{
ASSERT
(
pQueue
->
qItem
!=
NULL
);
return
streamQueueCurItem
(
pQueue
);
}
else
{
pQueue
->
qItem
=
NULL
;
taosGetQitem
(
pQueue
->
qall
,
&
pQueue
->
qItem
);
if
(
pQueue
->
qItem
==
NULL
)
{
taosReadAllQitems
(
pQueue
->
queue
,
pQueue
->
qall
);
taosGetQitem
(
pQueue
->
qall
,
&
pQueue
->
qItem
);
}
return
streamQueueCurItem
(
pQueue
);
}
}
void
streamTaskInputFail
(
SStreamTask
*
pTask
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
}
void
streamTaskOpenAllUpstreamInput
(
SStreamTask
*
pTask
)
{
...
...
source/libs/stream/src/streamCheckpoint.c
浏览文件 @
3ed78dda
...
...
@@ -124,7 +124,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
taosArrayPush
(
pChkpoint
->
blocks
,
pBlock
);
taosMemoryFree
(
pBlock
);
if
(
tAppendDataToInputQueue
(
pTask
,
(
SStreamQueueItem
*
)
pChkpoint
)
<
0
)
{
if
(
streamTaskPutDataIntoInputQ
(
pTask
,
(
SStreamQueueItem
*
)
pChkpoint
)
<
0
)
{
taosFreeQitem
(
pChkpoint
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
source/libs/stream/src/streamData.c
浏览文件 @
3ed78dda
...
...
@@ -166,6 +166,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
return
0
;
}
// todo handle memory error
SStreamQueueItem
*
streamMergeQueueItem
(
SStreamQueueItem
*
dst
,
SStreamQueueItem
*
pElem
)
{
terrno
=
0
;
...
...
@@ -195,7 +196,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosFreeQitem
(
pElem
);
return
(
SStreamQueueItem
*
)
pMerged
;
}
else
{
qDebug
(
"block type:%
d not merged with existed blocks list, type:%d"
,
pElem
->
type
,
dst
->
type
);
qDebug
(
"block type:%
s not merged with existed blocks list, type:%d"
,
streamGetBlockTypeStr
(
pElem
->
type
)
,
dst
->
type
);
return
NULL
;
}
}
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
3ed78dda
...
...
@@ -368,7 +368,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pDelBlock
->
info
.
version
=
0
;
pItem
->
type
=
STREAM_INPUT__REF_DATA_BLOCK
;
pItem
->
pBlock
=
pDelBlock
;
int32_t
code
=
tAppendDataToInputQueue
(
pStreamTask
,
(
SStreamQueueItem
*
)
pItem
);
int32_t
code
=
streamTaskPutDataIntoInputQ
(
pStreamTask
,
(
SStreamQueueItem
*
)
pItem
);
qDebug
(
"s-task:%s append dummy delete block,res:%d"
,
pStreamTask
->
id
.
idStr
,
code
);
}
...
...
@@ -517,7 +517,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// merge multiple input data if possible in the input queue.
qDebug
(
"s-task:%s start to extract data block from inputQ"
,
id
);
/*int32_t code = */
extractBlocks
FromInputQ
(
pTask
,
&
pInput
,
&
numOfBlocks
);
/*int32_t code = */
streamTaskGetData
FromInputQ
(
pTask
,
&
pInput
,
&
numOfBlocks
);
if
(
pInput
==
NULL
)
{
ASSERT
(
numOfBlocks
==
0
);
return
0
;
...
...
source/libs/stream/src/streamQueue.c
浏览文件 @
3ed78dda
...
...
@@ -15,8 +15,30 @@
#include "streamInt.h"
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
// todo refactor:
// read data from input queue
typedef
struct
SQueueReader
{
SStreamQueue
*
pQueue
;
int32_t
taskLevel
;
int32_t
maxBlocks
;
// maximum block in one batch
int32_t
waitDuration
;
// maximum wait time to format several block into a batch to process, unit: ms
}
SQueueReader
;
static
void
streamQueueCleanup
(
SStreamQueue
*
pQueue
)
{
void
*
qItem
=
NULL
;
while
((
qItem
=
streamQueueNextItem
(
pQueue
))
!=
NULL
)
{
streamFreeQitem
(
qItem
);
}
pQueue
->
status
=
STREAM_QUEUE__SUCESS
;
}
static
void
*
streamQueueCurItem
(
SStreamQueue
*
queue
)
{
return
queue
->
qItem
;
}
SStreamQueue
*
streamQueueOpen
(
int64_t
cap
)
{
SStreamQueue
*
pQueue
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamQueue
));
...
...
@@ -40,21 +62,6 @@ SStreamQueue* streamQueueOpen(int64_t cap) {
return
pQueue
;
}
void
streamQueueCleanup
(
SStreamQueue
*
pQueue
)
{
void
*
qItem
=
NULL
;
while
((
qItem
=
streamQueueNextItem
(
pQueue
))
!=
NULL
)
{
streamFreeQitem
(
qItem
);
}
pQueue
->
status
=
STREAM_QUEUE__SUCESS
;
}
// void streamQueueClose(SStreamQueue* pQueue) {
// streamQueueCleanup(pQueue);
// taosFreeQall(pQueue->qall);
// taosCloseQueue(pQueue->queue);
// taosMemoryFree(pQueue);
// }
void
streamQueueClose
(
SStreamQueue
*
pQueue
,
int32_t
taskId
)
{
qDebug
(
"s-task:0x%x free the queue:%p, items in queue:%d"
,
taskId
,
pQueue
->
queue
,
taosQueueItemSize
(
pQueue
->
queue
));
streamQueueCleanup
(
pQueue
);
...
...
@@ -64,6 +71,24 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
taosMemoryFree
(
pQueue
);
}
void
*
streamQueueNextItem
(
SStreamQueue
*
pQueue
)
{
int8_t
flag
=
atomic_exchange_8
(
&
pQueue
->
status
,
STREAM_QUEUE__PROCESSING
);
if
(
flag
==
STREAM_QUEUE__FAILED
)
{
ASSERT
(
pQueue
->
qItem
!=
NULL
);
return
streamQueueCurItem
(
pQueue
);
}
else
{
pQueue
->
qItem
=
NULL
;
taosGetQitem
(
pQueue
->
qall
,
&
pQueue
->
qItem
);
if
(
pQueue
->
qItem
==
NULL
)
{
taosReadAllQitems
(
pQueue
->
queue
,
pQueue
->
qall
);
taosGetQitem
(
pQueue
->
qall
,
&
pQueue
->
qItem
);
}
return
streamQueueCurItem
(
pQueue
);
}
}
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
//
...
...
@@ -122,64 +147,13 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
}
#endif
// todo refactor:
// read data from input queue
typedef
struct
SQueueReader
{
SStreamQueue
*
pQueue
;
int32_t
taskLevel
;
int32_t
maxBlocks
;
// maximum block in one batch
int32_t
waitDuration
;
// maximum wait time to format several block into a batch to process, unit: ms
}
SQueueReader
;
#if 0
SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) {
int32_t numOfBlocks = 0;
int32_t tryCount = 0;
SStreamQueueItem* pRet = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue);
if (qItem == NULL) {
if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) {
tryCount++;
taosMsleep(1);
qDebug("try again batchSize:%d", numOfBlocks);
continue;
}
qDebug("break batchSize:%d", numOfBlocks);
break;
}
if (pRet == NULL) {
pRet = qItem;
streamQueueProcessSuccess(pReader->pQueue);
if (pReader->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) {
streamQueueProcessFail(pReader->pQueue);
break;
} else {
numOfBlocks++;
pRet = newRet;
streamQueueProcessSuccess(pReader->pQueue);
if (numOfBlocks > pReader->maxBlocks) {
qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr);
break;
}
}
}
}
return pRet;
bool
streamQueueIsFull
(
const
STaosQueue
*
pQueue
)
{
bool
isFull
=
taosQueueItemSize
((
STaosQueue
*
)
pQueue
)
>=
STREAM_TASK_INPUT_QUEUE_CAPACITY
;
double
size
=
QUEUE_MEM_SIZE_IN_MB
((
STaosQueue
*
)
pQueue
);
return
(
isFull
||
size
>=
STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE
);
}
#endif
int32_t
extractBlocks
FromInputQ
(
SStreamTask
*
pTask
,
SStreamQueueItem
**
pInput
,
int32_t
*
numOfBlocks
)
{
int32_t
streamTaskGetData
FromInputQ
(
SStreamTask
*
pTask
,
SStreamQueueItem
**
pInput
,
int32_t
*
numOfBlocks
)
{
int32_t
retryTimes
=
0
;
int32_t
MAX_RETRY_TIMES
=
5
;
const
char
*
id
=
pTask
->
id
.
idStr
;
...
...
@@ -205,7 +179,6 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
}
}
// non sink task
while
(
1
)
{
if
(
streamTaskShouldPause
(
&
pTask
->
status
)
||
streamTaskShouldStop
(
&
pTask
->
status
))
{
qDebug
(
"s-task:%s task should pause, extract input blocks:%d"
,
pTask
->
id
.
idStr
,
*
numOfBlocks
);
...
...
@@ -227,24 +200,17 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
// do not merge blocks for sink node and check point data block
if
(
qItem
->
type
==
STREAM_INPUT__CHECKPOINT
||
qItem
->
type
==
STREAM_INPUT__CHECKPOINT_TRIGGER
||
qItem
->
type
==
STREAM_INPUT__TRANS_STATE
)
{
if
(
*
pInput
==
NULL
)
{
char
*
p
=
NULL
;
if
(
qItem
->
type
==
STREAM_INPUT__CHECKPOINT
)
{
p
=
"checkpoint"
;
}
else
if
(
qItem
->
type
==
STREAM_INPUT__CHECKPOINT_TRIGGER
)
{
p
=
"checkpoint-trigger"
;
}
else
{
p
=
"transtate"
;
}
const
char
*
p
=
streamGetBlockTypeStr
(
qItem
->
type
);
if
(
*
pInput
==
NULL
)
{
qDebug
(
"s-task:%s %s msg extracted, start to process immediately"
,
id
,
p
);
*
numOfBlocks
=
1
;
*
pInput
=
qItem
;
return
TSDB_CODE_SUCCESS
;
}
else
{
// previous existed blocks needs to be handle, before handle the checkpoint msg block
qDebug
(
"s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d"
,
id
,
*
numOfBlocks
);
qDebug
(
"s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d"
,
id
,
p
,
*
numOfBlocks
);
streamQueueProcessFail
(
pTask
->
inputQueue
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -256,7 +222,11 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
// todo we need to sort the data block, instead of just appending into the array list.
void
*
newRet
=
streamMergeQueueItem
(
*
pInput
,
qItem
);
if
(
newRet
==
NULL
)
{
qError
(
"s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d"
,
id
,
*
numOfBlocks
);
if
(
terrno
!=
0
)
{
qError
(
"s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s"
,
id
,
*
numOfBlocks
,
tstrerror
(
terrno
));
}
streamQueueProcessFail
(
pTask
->
inputQueue
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -274,3 +244,68 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
}
}
}
int32_t
streamTaskPutDataIntoInputQ
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
)
{
int8_t
type
=
pItem
->
type
;
int32_t
total
=
taosQueueItemSize
(
pTask
->
inputQueue
->
queue
)
+
1
;
double
size
=
QUEUE_MEM_SIZE_IN_MB
(
pTask
->
inputQueue
->
queue
);
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
px
=
(
SStreamDataSubmit
*
)
pItem
;
if
((
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
)
&&
streamQueueIsFull
(
pTask
->
inputQueue
->
queue
))
{
qError
(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data"
,
pTask
->
id
.
idStr
,
STREAM_TASK_INPUT_QUEUE_CAPACITY
,
STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE
,
total
,
size
);
streamDataSubmitDestroy
(
px
);
taosFreeQitem
(
pItem
);
return
-
1
;
}
int32_t
msgLen
=
px
->
submit
.
msgLen
;
int64_t
ver
=
px
->
submit
.
ver
;
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
streamDataSubmitDestroy
(
px
);
taosFreeQitem
(
pItem
);
return
code
;
}
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
qDebug
(
"s-task:%s submit enqueue msgLen:%d ver:%"
PRId64
", total in queue:%d, size:%.2fMiB"
,
pTask
->
id
.
idStr
,
msgLen
,
ver
,
total
,
size
+
SIZE_IN_MB
(
msgLen
));
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
||
type
==
STREAM_INPUT__DATA_RETRIEVE
||
type
==
STREAM_INPUT__REF_DATA_BLOCK
)
{
if
(
streamQueueIsFull
(
pTask
->
inputQueue
->
queue
))
{
qError
(
"s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort"
,
pTask
->
id
.
idStr
,
STREAM_TASK_INPUT_QUEUE_CAPACITY
,
STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE
,
total
,
size
);
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
return
-
1
;
}
qDebug
(
"s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB"
,
pTask
->
id
.
idStr
,
total
,
size
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
return
code
;
}
}
else
if
(
type
==
STREAM_INPUT__CHECKPOINT
||
type
==
STREAM_INPUT__CHECKPOINT_TRIGGER
||
type
==
STREAM_INPUT__TRANS_STATE
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
qDebug
(
"s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB"
,
pTask
->
id
.
idStr
,
pTask
->
info
.
taskLevel
,
streamGetBlockTypeStr
(
type
),
total
,
size
);
}
else
if
(
type
==
STREAM_INPUT__GET_RES
)
{
// use the default memory limit, refactor later.
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
qDebug
(
"s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)"
,
pTask
->
id
.
idStr
,
total
,
size
);
}
else
{
ASSERT
(
0
);
}
if
(
type
!=
STREAM_INPUT__GET_RES
&&
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
atomic_val_compare_exchange_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__INACTIVE
,
TASK_TRIGGER_STATUS__ACTIVE
);
qDebug
(
"s-task:%s new data arrived, active the trigger, triggerStatus:%d"
,
pTask
->
id
.
idStr
,
pTask
->
triggerStatus
);
}
return
0
;
}
source/libs/stream/src/streamRecover.c
浏览文件 @
3ed78dda
...
...
@@ -388,7 +388,7 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
taosArrayPush
(
pTranstate
->
blocks
,
pBlock
);
taosMemoryFree
(
pBlock
);
if
(
tAppendDataToInputQueue
(
pTask
,
(
SStreamQueueItem
*
)
pTranstate
)
<
0
)
{
if
(
streamTaskPutDataIntoInputQ
(
pTask
,
(
SStreamQueueItem
*
)
pTranstate
)
<
0
)
{
taosFreeQitem
(
pTranstate
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -624,7 +624,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
return
TSDB_CODE_SUCCESS
;
}
checkFillhistoryTaskStatus
(
pTask
,
*
pHTask
);
if
((
*
pHTask
)
->
status
.
downstreamReady
==
1
)
{
qDebug
(
"s-task:%s fill-history task is ready, no need to check downstream"
,
(
*
pHTask
)
->
id
.
idStr
);
}
else
{
checkFillhistoryTaskStatus
(
pTask
,
*
pHTask
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
tests/system-test/6-cluster/clusterCommonCheck.py
浏览文件 @
3ed78dda
...
...
@@ -261,7 +261,7 @@ class ClusterComCheck:
count
+=
1
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
notice
(
f
"elections of
{
db_name
}
all vgroups are failed in
{
count
}
s "
)
tdLog
.
notice
(
f
"elections of
{
db_name
}
all vgroups are failed in
{
count
}
s "
)
caller
=
inspect
.
getframeinfo
(
inspect
.
stack
()[
1
][
0
])
args
=
(
caller
.
filename
,
caller
.
lineno
)
tdLog
.
exit
(
"%s(%d) failed "
%
args
)
...
...
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
浏览文件 @
3ed78dda
...
...
@@ -27,7 +27,7 @@ class TDTestCase:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
tdSql
.
init
(
conn
.
cursor
()
,
True
)
self
.
host
=
socket
.
gethostname
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录