Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
82890ee5
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
82890ee5
编写于
7月 11, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-11274-3.0
上级
3572520d
a52c089d
变更
12
显示空白变更内容
内联
并排
Showing
12 changed file
with
177 addition
and
63 deletion
+177
-63
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+16
-7
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-0
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+36
-32
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+0
-1
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+1
-0
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+24
-11
source/libs/scheduler/src/schStatus.c
source/libs/scheduler/src/schStatus.c
+1
-0
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+78
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+8
-5
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+9
-5
未找到文件。
source/client/src/clientImpl.c
浏览文件 @
82890ee5
...
...
@@ -810,11 +810,16 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
void
schedulerExecCb
(
SExecResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
pRequest
->
code
=
code
;
if
(
pResult
)
{
memcpy
(
&
pRequest
->
body
.
resInfo
.
execRes
,
pResult
,
sizeof
(
*
pResult
));
}
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
||
TDMT_VND_DELETE
==
pRequest
->
type
||
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
if
(
pResult
)
{
pRequest
->
body
.
resInfo
.
numOfRows
=
pResult
->
numOfRows
;
}
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
}
...
...
@@ -1476,12 +1481,16 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
tsem_wait
(
&
pParam
->
sem
);
}
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
&&
pResultInfo
->
numOfRows
>
0
&&
setupOneRowPtr
)
{
if
(
pResultInfo
->
numOfRows
==
0
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
else
{
if
(
setupOneRowPtr
)
{
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
}
return
pResultInfo
->
row
;
}
}
static
int32_t
doPrepareResPtr
(
SReqResultInfo
*
pResInfo
)
{
...
...
source/common/src/tdatablock.c
浏览文件 @
82890ee5
...
...
@@ -1747,7 +1747,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
if
(
colDataIsNull
(
pColInfoData
,
rows
,
j
,
NULL
)
||
!
var
)
{
if
(
colDataIsNull
(
pColInfoData
,
rows
,
j
,
NULL
)
||
!
pColInfoData
->
pData
)
{
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15s |"
,
"NULL"
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
continue
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
82890ee5
...
...
@@ -779,6 +779,8 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
);
void
getAlignQueryTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
int64_t
key
,
STimeWindow
*
win
);
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
);
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
82890ee5
...
...
@@ -869,7 +869,7 @@ static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
return
w
;
}
static
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
)
{
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
)
{
int32_t
factor
=
(
order
==
TSDB_ORDER_ASC
)
?
-
1
:
1
;
STimeWindow
win
=
*
pWindow
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
82890ee5
...
...
@@ -545,9 +545,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
if
(
pCtx
[
k
].
fpSet
.
process
==
NULL
)
{
continue
;
}
#ifdef BUF_PAGE_DEBUG
qDebug
(
"page_process"
);
#endif
int32_t
code
=
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s aggregate function error happens, code: %s"
,
GET_TASKID
(
pOperator
->
pTaskInfo
),
tstrerror
(
code
));
...
...
@@ -585,7 +583,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
if
(
pExpr
[
k
].
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
)
{
// it is a project query
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pResult
->
pDataBlock
,
outputSlotId
);
if
(
pResult
->
info
.
rows
>
0
&&
!
createNewColModel
)
{
colDataMergeCol
(
pColInfoData
,
pResult
->
info
.
rows
,
&
pResult
->
info
.
capacity
,
pInputData
->
pData
[
0
],
pInputData
->
numOfRows
);
colDataMergeCol
(
pColInfoData
,
pResult
->
info
.
rows
,
&
pResult
->
info
.
capacity
,
pInputData
->
pData
[
0
],
pInputData
->
numOfRows
);
}
else
{
colDataAssign
(
pColInfoData
,
pInputData
->
pData
[
0
],
pInputData
->
numOfRows
,
&
pResult
->
info
);
}
...
...
@@ -647,7 +646,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
// todo handle the json tag
SColumnInfoData
*
pInput
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
slotId
);
for
(
int32_t
f
=
0
;
f
<
pSrcBlock
->
info
.
rows
;
++
f
)
{
for
(
int32_t
f
=
0
;
f
<
pSrcBlock
->
info
.
rows
;
++
f
)
{
bool
isNull
=
colDataIsNull_s
(
pInput
,
f
);
if
(
isNull
)
{
colDataAppendNULL
(
pOutput
,
pResult
->
info
.
rows
+
f
);
...
...
@@ -2443,7 +2442,6 @@ _error:
doDestroyExchangeOperatorInfo
(
pInfo
);
}
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -3393,6 +3391,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
assert
(
pBlock
!=
NULL
);
}
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsCol
);
if
(
*
newgroup
&&
pInfo
->
totalInputRows
>
0
)
{
// there are already processed current group data block
pInfo
->
existNewGroupBlock
=
pBlock
;
*
newgroup
=
false
;
...
...
@@ -3821,7 +3821,8 @@ _error:
return
NULL
;
}
static
void
doHandleDataBlock
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
downstream
,
SExecTaskInfo
*
pTaskInfo
)
{
static
void
doHandleDataBlock
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
downstream
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
order
=
0
;
int32_t
scanFlag
=
0
;
...
...
@@ -3876,7 +3877,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
while
(
1
)
{
// here we need to handle the existsed group results
if
(
pIndefInfo
->
pNextGroupRes
!=
NULL
)
{
// todo extract method
for
(
int32_t
k
=
0
;
k
<
pSup
->
numOfExprs
;
++
k
)
{
...
...
@@ -3978,7 +3979,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pCondition
=
pPhyNode
->
node
.
pConditions
;
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pSup
->
pCtx
,
numOfExpr
);
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pSup
->
pCtx
,
numOfExpr
);
pOperator
->
name
=
"IndefinitOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC
;
...
...
@@ -4010,6 +4011,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
STimeWindow
w
=
TSWINDOW_INITIALIZER
;
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
win
.
skey
,
&
w
);
w
=
getFirstQualifiedTimeWindow
(
win
.
skey
,
&
w
,
pInterval
,
TSDB_ORDER_ASC
);
int32_t
order
=
TSDB_ORDER_ASC
;
pInfo
->
pFillInfo
=
taosCreateFillInfo
(
order
,
w
.
skey
,
0
,
capacity
,
numOfCols
,
pInterval
,
fillType
,
pColInfo
,
id
);
...
...
@@ -4048,8 +4050,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo
->
primaryTsCol
=
((
SColumnNode
*
)
pPhyFillNode
->
pWStartTs
)
->
slotId
;
int32_t
numOfOutputCols
=
0
;
SArray
*
pColMatchColInfo
=
extractColMatchInfo
(
pPhyFillNode
->
pTargets
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
SArray
*
pColMatchColInfo
=
extractColMatchInfo
(
pPhyFillNode
->
pTargets
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
int32_t
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
num
,
(
SNodeListNode
*
)
pPhyFillNode
->
pValues
,
pPhyFillNode
->
timeRange
,
pResultInfo
->
capacity
,
pTaskInfo
->
id
.
str
,
pInterval
,
type
);
...
...
@@ -4118,6 +4120,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
pTaskInfo
->
schemaVer
.
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
stbEntry
.
schemaRow
);
pTaskInfo
->
schemaVer
.
tversion
=
mr
.
me
.
stbEntry
.
schemaTag
.
version
;
}
else
if
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
tDecoderClear
(
&
mr
.
coder
);
tb_uid_t
suid
=
mr
.
me
.
ctbEntry
.
suid
;
metaGetTableEntryByUid
(
&
mr
,
suid
);
pTaskInfo
->
schemaVer
.
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
stbEntry
.
schemaRow
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
82890ee5
...
...
@@ -4502,7 +4502,6 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
}
size_t
rows
=
pRes
->
info
.
rows
;
blockDataUpdateTsWindow
(
pRes
,
iaInfo
->
primaryTsIndex
);
pOperator
->
resultInfo
.
totalRows
+=
rows
;
return
(
rows
==
0
)
?
NULL
:
pRes
;
}
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
82890ee5
...
...
@@ -472,6 +472,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t
schGetTaskFromList
(
SHashObj
*
pTaskList
,
uint64_t
taskId
,
SSchTask
**
pTask
);
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
,
int32_t
levelNum
);
int32_t
schSwitchTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
void
schDirectPostJobRes
(
SSchedulerReq
*
pReq
,
int32_t
errCode
);
extern
SSchDebug
gSCHDebug
;
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
82890ee5
...
...
@@ -763,6 +763,17 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
return
TSDB_CODE_SUCCESS
;
}
void
schDirectPostJobRes
(
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
if
(
NULL
==
pReq
||
pReq
->
syncReq
)
{
return
;
}
if
(
pReq
->
execFp
)
{
(
*
pReq
->
execFp
)(
NULL
,
pReq
->
cbParam
,
errCode
);
}
else
if
(
pReq
->
fetchFp
)
{
(
*
pReq
->
fetchFp
)(
NULL
,
pReq
->
cbParam
,
errCode
);
}
}
void
schProcessOnOpEnd
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
int32_t
op
=
0
;
...
...
@@ -801,17 +812,13 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
int32_t
schProcessOnOpBegin
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_JOB_ELOG
(
"abort op %s cause of job need to stop, status:%s"
,
schGetOpStr
(
type
),
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
switch
(
type
)
{
case
SCH_OP_EXEC
:
if
(
SCH_OP_NULL
!=
atomic_val_compare_exchange_32
(
&
pJob
->
opStatus
.
op
,
SCH_OP_NULL
,
type
))
{
SCH_JOB_ELOG
(
"job already in %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
schDirectPostJobRes
(
pReq
,
TSDB_CODE_TSC_APP_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_TSC_APP_ERROR
);
}
...
...
@@ -822,11 +829,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
case
SCH_OP_FETCH
:
if
(
SCH_OP_NULL
!=
atomic_val_compare_exchange_32
(
&
pJob
->
opStatus
.
op
,
SCH_OP_NULL
,
type
))
{
SCH_JOB_ELOG
(
"job already in %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
schDirectPostJobRes
(
pReq
,
TSDB_CODE_TSC_APP_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_TSC_APP_ERROR
);
}
SCH_JOB_DLOG
(
"job start %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
pJob
->
userRes
.
fetchRes
=
pReq
->
pFetchRes
;
pJob
->
userRes
.
fetchFp
=
pReq
->
fetchFp
;
pJob
->
userRes
.
cbParam
=
pReq
->
cbParam
;
pJob
->
opStatus
.
syncReq
=
pReq
->
syncReq
;
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
...
...
@@ -839,10 +851,6 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
pJob
->
userRes
.
fetchRes
=
pReq
->
pFetchRes
;
pJob
->
userRes
.
fetchFp
=
pReq
->
fetchFp
;
pJob
->
userRes
.
cbParam
=
pReq
->
cbParam
;
break
;
case
SCH_OP_GET_STATUS
:
if
(
pJob
->
status
<
JOB_TASK_STATUS_INIT
||
pJob
->
levelNum
<=
0
||
NULL
==
pJob
->
levels
)
{
...
...
@@ -855,6 +863,11 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET
(
TSDB_CODE_TSC_APP_ERROR
);
}
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_JOB_ELOG
(
"abort op %s cause of job need to stop, status:%s"
,
schGetOpStr
(
type
),
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/scheduler/src/schStatus.c
浏览文件 @
82890ee5
...
...
@@ -77,6 +77,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
int32_t
code
=
errCode
;
if
(
NULL
==
pJob
)
{
schDirectPostJobRes
(
pReq
,
errCode
);
SCH_RET
(
code
);
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
82890ee5
...
...
@@ -201,6 +201,43 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
return
SYNC_TERM_INVALID
;
}
static
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
SyncIndex
index
=
0
;
SWalSyncInfo
syncMeta
;
syncMeta
.
isWeek
=
pEntry
->
isWeak
;
syncMeta
.
seqNum
=
pEntry
->
seqNum
;
syncMeta
.
term
=
pEntry
->
term
;
index
=
walAppendLog
(
pWal
,
pEntry
->
originalRpcType
,
syncMeta
,
pEntry
->
data
,
pEntry
->
dataLen
);
if
(
index
<
0
)
{
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
sysErr
=
errno
;
const
char
*
sysErrStr
=
strerror
(
errno
);
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal write error, index:%"
PRId64
", err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pEntry
->
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
syncNodeErrorLog
(
pData
->
pSyncNode
,
logBuf
);
ASSERT
(
0
);
return
-
1
;
}
pEntry
->
index
=
index
;
do
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"write index:%"
PRId64
", type:%s,%d, type2:%s,%d"
,
pEntry
->
index
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
}
while
(
0
);
return
0
;
}
#if 0
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
...
...
@@ -243,6 +280,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return code;
}
#endif
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
...
...
@@ -361,6 +399,8 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
//-------------------------------
// log[0 .. n]
#if 0
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
...
...
@@ -397,6 +437,44 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
return code;
}
#endif
int32_t
logStoreAppendEntry
(
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
SyncIndex
index
=
0
;
SWalSyncInfo
syncMeta
;
syncMeta
.
isWeek
=
pEntry
->
isWeak
;
syncMeta
.
seqNum
=
pEntry
->
seqNum
;
syncMeta
.
term
=
pEntry
->
term
;
index
=
walAppendLog
(
pWal
,
pEntry
->
originalRpcType
,
syncMeta
,
pEntry
->
data
,
pEntry
->
dataLen
);
if
(
index
<
0
)
{
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
sysErr
=
errno
;
const
char
*
sysErrStr
=
strerror
(
errno
);
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal write error, index:%"
PRId64
", err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pEntry
->
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
syncNodeErrorLog
(
pData
->
pSyncNode
,
logBuf
);
ASSERT
(
0
);
return
-
1
;
}
pEntry
->
index
=
index
;
do
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"write2 index:%"
PRId64
", type:%s,%d, type2:%s,%d"
,
pEntry
->
index
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
}
while
(
0
);
return
0
;
}
SSyncRaftEntry
*
logStoreGetEntry
(
SSyncLogStore
*
pLogStore
,
SyncIndex
index
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
82890ee5
...
...
@@ -573,8 +573,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return
;
}
if
(
nread
<
0
)
{
tWarn
(
"%s conn %p read error:%s, ref:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
uv_err_name
(
nread
),
T_REF_VAL_GET
(
conn
));
tWarn
(
"%s conn %p read error:%s, ref:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
uv_err_name
(
nread
),
T_REF_VAL_GET
(
conn
));
conn
->
broken
=
true
;
cliHandleExcept
(
conn
);
}
...
...
@@ -650,7 +649,11 @@ static bool cliHandleNoResp(SCliConn* conn) {
return
res
;
}
static
void
cliSendCb
(
uv_write_t
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
SCliConn
*
pConn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
taosMemoryFree
(
req
);
if
(
pConn
==
NULL
)
{
return
;
}
if
(
status
==
0
)
{
tTrace
(
"%s conn %p data already was written out"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
...
...
@@ -708,8 +711,8 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP
(
pConn
);
}
pConn
->
writeReq
.
data
=
pConn
;
uv_write
(
&
pConn
->
writeR
eq
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
))
;
uv_write
(
r
eq
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
return
;
_RETURN:
return
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
82890ee5
...
...
@@ -265,8 +265,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg
.
info
.
refId
=
pConn
->
refId
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
tGTrace
(
"%s handle %p conn:%p translated to app, refId:%"
PRIu64
,
transLabel
(
pTransInst
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
tGTrace
(
"%s handle %p conn:%p translated to app, refId:%"
PRIu64
,
transLabel
(
pTransInst
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
assert
(
transMsg
.
info
.
handle
!=
NULL
);
if
(
pHead
->
noResp
==
1
)
{
...
...
@@ -331,7 +331,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
}
void
uvOnSendCb
(
uv_write_t
*
req
,
int
status
)
{
SSvrConn
*
conn
=
req
->
data
;
SSvrConn
*
conn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
taosMemoryFree
(
req
);
if
(
conn
==
NULL
)
return
;
if
(
status
==
0
)
{
tTrace
(
"conn %p data already was written on stream"
,
conn
);
if
(
!
transQueueEmpty
(
&
conn
->
srvMsgs
))
{
...
...
@@ -390,7 +393,6 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
pHead
->
hasEpSet
=
pMsg
->
info
.
hasEpSet
;
if
(
pConn
->
status
==
ConnNormal
)
{
pHead
->
msgType
=
(
0
==
pMsg
->
msgType
?
pConn
->
inType
+
1
:
pMsg
->
msgType
);
}
else
{
...
...
@@ -433,7 +435,9 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
uvPrepareSendData
(
smsg
,
&
wb
);
transRefSrvHandle
(
pConn
);
uv_write
(
&
pConn
->
pWriter
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
}
static
void
uvStartSendResp
(
SSvrMsg
*
smsg
)
{
// impl
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录