Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
65805509
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
65805509
编写于
10月 25, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/3.0_bugfix_wxy
上级
b7056f58
0a007131
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
194 addition
and
64 deletion
+194
-64
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+8
-2
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+14
-3
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+15
-6
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+12
-14
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+7
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+42
-20
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+18
-2
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+1
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+2
-1
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+69
-11
source/util/src/tfunctional.c
source/util/src/tfunctional.c
+4
-2
未找到文件。
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
65805509
...
...
@@ -1062,7 +1062,7 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
if
(
tdbTbcMoveTo
(
pCursor
->
pCur
,
&
ctimeKey
,
sizeof
(
ctimeKey
),
&
cmp
)
<
0
)
{
goto
END
;
}
bool
first
=
true
;
int32_t
valid
=
0
;
while
(
1
)
{
void
*
entryKey
=
NULL
;
...
...
@@ -1074,7 +1074,13 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
int32_t
cmp
=
(
*
param
->
filterFunc
)((
void
*
)
&
p
->
ctime
,
(
void
*
)
&
pCtimeKey
->
ctime
,
param
->
type
);
if
(
cmp
==
0
)
taosArrayPush
(
pUids
,
&
p
->
uid
);
if
(
cmp
==
-
1
)
break
;
if
(
param
->
reverse
==
false
)
{
if
(
cmp
==
-
1
)
break
;
}
else
if
(
param
->
reverse
)
{
if
(
cmp
==
1
)
break
;
}
valid
=
param
->
reverse
?
tdbTbcMoveToPrev
(
pCursor
->
pCur
)
:
tdbTbcMoveToNext
(
pCursor
->
pCur
);
if
(
valid
<
0
)
break
;
}
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
65805509
...
...
@@ -572,8 +572,12 @@ static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) {
}
static
int
metaBuildNColIdxKey
(
SNcolIdxKey
*
ncolKey
,
const
SMetaEntry
*
pME
)
{
ncolKey
->
ncol
=
pME
->
ntbEntry
.
schemaRow
.
nCols
;
ncolKey
->
uid
=
pME
->
uid
;
if
(
pME
->
type
==
TSDB_NORMAL_TABLE
)
{
ncolKey
->
ncol
=
pME
->
ntbEntry
.
schemaRow
.
nCols
;
ncolKey
->
uid
=
pME
->
uid
;
}
else
{
return
-
1
;
}
return
0
;
}
...
...
@@ -777,9 +781,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno
=
TSDB_CODE_VND_INVALID_TABLE_ACTION
;
goto
_err
;
}
// search the column to add/drop/update
pSchema
=
&
entry
.
ntbEntry
.
schemaRow
;
// save old entry
SMetaEntry
oldEntry
=
{.
type
=
TSDB_NORMAL_TABLE
,
.
uid
=
entry
.
uid
};
oldEntry
.
ntbEntry
.
schemaRow
.
nCols
=
pSchema
->
nCols
;
int32_t
iCol
=
0
;
for
(;;)
{
pColumn
=
NULL
;
...
...
@@ -872,6 +880,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
entry
.
version
=
version
;
metaDeleteNcolIdx
(
pMeta
,
&
oldEntry
);
metaUpdateNcolIdx
(
pMeta
,
&
entry
);
// do actual write
metaWLock
(
pMeta
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
65805509
...
...
@@ -111,7 +111,7 @@ typedef struct SDataBlockIter {
int32_t
index
;
SArray
*
blockList
;
// SArray<SFileDataBlockInfo>
int32_t
order
;
SDataBlk
block
;
// current SDataBlk data
SDataBlk
block
;
// current SDataBlk data
SHashObj
*
pTableMap
;
}
SDataBlockIter
;
...
...
@@ -1209,19 +1209,19 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(
pVerRange
->
maxVer
<
pBlock
->
maxVer
&&
pVerRange
->
maxVer
>=
pBlock
->
minVer
);
}
static
SDataBlk
*
getNeighborBlockOfSameTable
(
SFileDataBlockInfo
*
p
F
BlockInfo
,
STableBlockScanInfo
*
pTableBlockScanInfo
,
static
SDataBlk
*
getNeighborBlockOfSameTable
(
SFileDataBlockInfo
*
pBlockInfo
,
STableBlockScanInfo
*
pTableBlockScanInfo
,
int32_t
*
nextIndex
,
int32_t
order
)
{
bool
asc
=
ASCENDING_TRAVERSE
(
order
);
if
(
asc
&&
p
F
BlockInfo
->
tbBlockIdx
>=
taosArrayGetSize
(
pTableBlockScanInfo
->
pBlockList
)
-
1
)
{
if
(
asc
&&
pBlockInfo
->
tbBlockIdx
>=
taosArrayGetSize
(
pTableBlockScanInfo
->
pBlockList
)
-
1
)
{
return
NULL
;
}
if
(
!
asc
&&
p
F
BlockInfo
->
tbBlockIdx
==
0
)
{
if
(
!
asc
&&
pBlockInfo
->
tbBlockIdx
==
0
)
{
return
NULL
;
}
int32_t
step
=
asc
?
1
:
-
1
;
*
nextIndex
=
p
F
BlockInfo
->
tbBlockIdx
+
step
;
*
nextIndex
=
pBlockInfo
->
tbBlockIdx
+
step
;
SDataBlk
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataBlk
));
int32_t
*
indexInMapdata
=
taosArrayGet
(
pTableBlockScanInfo
->
pBlockList
,
*
nextIndex
);
...
...
@@ -1631,7 +1631,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
code
=
doMergeRowsInBuf
(
pIter
,
pBlockScanInfo
->
uid
,
k
.
ts
,
pBlockScanInfo
->
delSkyline
,
&
merge
,
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
||
merge
.
pTSchema
==
NULL
)
{
return
code
;
}
}
...
...
@@ -3768,6 +3768,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return
false
;
}
bool
tsdbTableNextDataBlock
(
STsdbReader
*
pReader
,
uint64_t
uid
)
{
STableBlockScanInfo
*
pBlockScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
uid
,
sizeof
(
uid
));
if
(
pBlockScanInfo
==
NULL
)
{
// no data block for the table of given uid
return
false
;
}
return
true
;
}
static
void
setBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
ASSERT
(
pDataBlockInfo
!=
NULL
&&
pReader
!=
NULL
);
pDataBlockInfo
->
rows
=
pReader
->
pResBlock
->
info
.
rows
;
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
65805509
...
...
@@ -279,7 +279,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
}
size_t
size
=
taosArrayGetSize
(
pColMatchInfo
->
pList
);
SArray
*
pMatchInfo
=
taosArrayInit
(
size
,
sizeof
(
SColMatchI
nfo
));
SArray
*
pMatchInfo
=
taosArrayInit
(
size
,
sizeof
(
SColMatchI
tem
));
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SColMatchItem
*
pColInfo
=
taosArrayGet
(
pColMatchInfo
->
pList
,
i
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
65805509
...
...
@@ -1073,7 +1073,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
pMatchInfo
->
matchType
=
type
;
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColMatchI
nfo
));
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColMatchI
tem
));
if
(
pList
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
65805509
...
...
@@ -2881,7 +2881,7 @@ int optSysDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
default:
return
-
1
;
}
return
1
;
return
cmp
;
}
static
int
optSysFilterFuncImpl__LowerThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
...
...
@@ -2987,10 +2987,6 @@ static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
.
val
=
pVal
->
datum
.
p
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
int32_t
ret
=
metaFilterCreateTime
(
pMeta
,
&
param
,
result
);
if
(
ret
==
0
)
return
0
;
return
-
1
;
}
...
...
@@ -3002,15 +2998,17 @@ static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
bool
reverse
=
false
;
__optSysFilter
func
=
optSysGetFilterFunc
(
pOper
->
opType
,
&
reverse
);
SMetaFltParam
param
=
{.
suid
=
0
,
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
val
=
&
pVal
->
datum
.
i
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
int32_t
ret
=
metaFilterCreateTime
(
pMeta
,
&
param
,
result
);
if
(
func
==
NULL
)
return
-
1
;
return
0
;
SMetaFltParam
param
=
{.
suid
=
0
,
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
val
=
&
pVal
->
datum
.
i
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
int32_t
ret
=
metaFilterCreateTime
(
pMeta
,
&
param
,
result
);
return
ret
;
}
static
int32_t
sysFilte__Ncolumn
(
void
*
arg
,
SNode
*
pNode
,
SArray
*
result
)
{
void
*
pMeta
=
((
SSTabFltArg
*
)
arg
)
->
pMeta
;
...
...
@@ -3073,7 +3071,7 @@ static int32_t sysChkFilter__Comm(SNode* pNode) {
SOperatorNode
*
pOper
=
(
SOperatorNode
*
)
pNode
;
EOperatorType
opType
=
pOper
->
opType
;
if
(
opType
!=
OP_TYPE_EQUAL
&&
opType
!=
OP_TYPE_LOWER_EQUAL
&&
opType
!=
OP_TYPE_LOWER_THAN
&&
OP_TYPE_GREATER_EQUAL
&&
opType
!=
OP_TYPE_GREATER_THAN
)
{
opType
!=
OP_TYPE_GREATER_EQUAL
&&
opType
!=
OP_TYPE_GREATER_THAN
)
{
return
-
1
;
}
return
0
;
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
65805509
...
...
@@ -45,6 +45,11 @@
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void
syncMaybeAdvanceCommitIndex
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
==
NULL
)
{
sError
(
"pSyncNode is NULL"
);
return
;
}
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
syncNodeErrorLog
(
pSyncNode
,
"not leader, can not advance commit index"
);
return
;
...
...
@@ -172,6 +177,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
int32_t
syncNodeDynamicQuorum
(
const
SSyncNode
*
pSyncNode
)
{
return
pSyncNode
->
quorum
;
#if 0
int32_t quorum = 1; // self
int64_t timeNow = taosGetTimestampMs();
...
...
@@ -228,6 +234,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
}
return quorum;
#endif
}
/*
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
65805509
...
...
@@ -835,7 +835,9 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
sInfo
(
"vgId:%d, sync get retry epset: index:%d %s:%d"
,
pSyncNode
->
vgId
,
i
,
pEpSet
->
eps
[
i
].
fqdn
,
pEpSet
->
eps
[
i
].
port
);
}
pEpSet
->
inUse
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
+
1
)
%
pEpSet
->
numOfEps
;
if
(
pEpSet
->
numOfEps
>
0
)
{
pEpSet
->
inUse
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
+
1
)
%
pEpSet
->
numOfEps
;
}
sInfo
(
"vgId:%d, sync get retry epset in-use:%d"
,
pSyncNode
->
vgId
,
pEpSet
->
inUse
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -1438,12 +1440,13 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
if
(
pSyncNode
==
NULL
)
{
return
;
}
int32_t
ret
;
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
ret
=
raftStoreClose
(
pSyncNode
->
pRaftStore
);
ASSERT
(
ret
==
0
);
...
...
@@ -1879,6 +1882,10 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
}
inline
void
syncNodeEventLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
if
(
pSyncNode
==
NULL
)
{
return
;
}
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
if
(
pSyncNode
->
pFsm
!=
NULL
&&
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
...
...
@@ -1954,6 +1961,10 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
}
inline
void
syncNodeErrorLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
if
(
pSyncNode
==
NULL
)
{
return
;
}
int32_t
userStrLen
=
strlen
(
str
);
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
...
...
@@ -2937,6 +2948,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
sTrace
(
"syncNodeEqNoop pSyncNode->FpEqMsg is NULL"
);
}
syncEntryDestory
(
pEntry
);
taosMemoryFree
(
serialized
);
syncClientRequestDestroy
(
pSyncMsg
);
...
...
@@ -3003,13 +3015,14 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
syncPingReply2RpcMsg
(
pMsgReply
,
&
rpcMsg
);
/*
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
syncNodeSendMsgById
(
&
pMsgReply
->
destId
,
ths
,
&
rpcMsg
);
syncPingReplyDestroy
(
pMsgReply
);
return
ret
;
}
...
...
@@ -3058,6 +3071,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
// reply
syncNodeSendMsgById
(
&
pMsgReply
->
destId
,
ths
,
&
rpcMsg
);
syncHeartbeatReplyDestroy
(
pMsgReply
);
return
0
;
}
...
...
@@ -3329,17 +3343,23 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
return
0
;
}
// advance commit index to sanpshot first
SSnapshot
snapshot
=
{
0
};
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>=
0
&&
snapshot
.
lastApplyIndex
>=
beginIndex
)
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
snapshot
.
lastApplyIndex
);
syncNodeEventLog
(
ths
,
eventLog
);
if
(
ths
==
NULL
)
{
return
-
1
;
}
if
(
ths
->
pFsm
!=
NULL
&&
ths
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
// advance commit index to sanpshot first
SSnapshot
snapshot
=
{
0
};
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>=
0
&&
snapshot
.
lastApplyIndex
>=
beginIndex
)
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
snapshot
.
lastApplyIndex
);
syncNodeEventLog
(
ths
,
eventLog
);
// update begin index
beginIndex
=
snapshot
.
lastApplyIndex
+
1
;
// update begin index
beginIndex
=
snapshot
.
lastApplyIndex
+
1
;
}
}
int32_t
code
=
0
;
...
...
@@ -3413,8 +3433,10 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// config change finish
if
(
pEntry
->
originalRpcType
==
TDMT_SYNC_CONFIG_CHANGE_FINISH
)
{
code
=
syncNodeConfigChangeFinish
(
ths
,
&
rpcMsg
,
pEntry
);
ASSERT
(
code
==
0
);
if
(
rpcMsg
.
pCont
!=
NULL
)
{
code
=
syncNodeConfigChangeFinish
(
ths
,
&
rpcMsg
,
pEntry
);
ASSERT
(
code
==
0
);
}
}
#if 0
...
...
@@ -3528,7 +3550,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pSyncNode
->
peersId
)[
i
]);
if
(
pSender
->
start
)
{
if
(
pSender
!=
NULL
&&
pSender
->
start
)
{
sError
(
"sync cannot change3"
);
return
false
;
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
65805509
...
...
@@ -411,32 +411,40 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
pMsg
->
bytes
=
bytes
;
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
msgType
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
srcId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
srcId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
destId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
destId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
dataLen
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
uint32_t
len
;
char
*
data
=
NULL
;
if
(
tDecodeBinary
(
&
decoder
,
(
uint8_t
**
)(
&
data
),
&
len
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
ASSERT
(
len
=
pMsg
->
dataLen
);
ASSERT
(
len
=
=
pMsg
->
dataLen
);
memcpy
(
pMsg
->
data
,
data
,
len
);
tEndDecode
(
&
decoder
);
...
...
@@ -673,32 +681,40 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
pMsg
->
bytes
=
bytes
;
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
msgType
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
srcId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
srcId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
destId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
destId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
dataLen
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
uint32_t
len
;
char
*
data
=
NULL
;
if
(
tDecodeBinary
(
&
decoder
,
(
uint8_t
**
)(
&
data
),
&
len
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
ASSERT
(
len
=
pMsg
->
dataLen
);
ASSERT
(
len
=
=
pMsg
->
dataLen
);
memcpy
(
pMsg
->
data
,
data
,
len
);
tEndDecode
(
&
decoder
);
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
65805509
...
...
@@ -532,7 +532,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index,
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
raftEntryCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
if
(
code
==
1
)
{
*
ppEntry
=
taosMemoryMalloc
(
pEntry
->
bytes
);
*
ppEntry
=
taosMemoryMalloc
(
(
int64_t
)(
pEntry
->
bytes
)
);
memcpy
(
*
ppEntry
,
pEntry
,
pEntry
->
bytes
);
(
*
ppEntry
)
->
rid
=
-
1
;
}
else
{
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
65805509
...
...
@@ -209,7 +209,8 @@ bool syncUtilCanPrint(char c) {
}
char
*
syncUtilprintBin
(
char
*
ptr
,
uint32_t
len
)
{
char
*
s
=
taosMemoryMalloc
(
len
+
1
);
int64_t
memLen
=
(
int64_t
)(
len
+
1
);
char
*
s
=
taosMemoryMalloc
(
memLen
);
ASSERT
(
s
!=
NULL
);
memset
(
s
,
0
,
len
+
1
);
memcpy
(
s
,
ptr
,
len
);
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
65805509
...
...
@@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
}
else
{
tError
(
"fail to dispatch conn to work thread"
);
}
uv_close
((
uv_handle_t
*
)
req
->
data
,
uvFreeCb
);
if
(
!
uv_is_closing
((
uv_handle_t
*
)
req
->
data
))
{
uv_close
((
uv_handle_t
*
)
req
->
data
,
uvFreeCb
);
}
else
{
taosMemoryFree
(
req
->
data
);
}
taosMemoryFree
(
req
);
}
...
...
@@ -651,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_tcp_init
(
pObj
->
loop
,
cli
);
if
(
uv_accept
(
stream
,
(
uv_stream_t
*
)
cli
)
==
0
)
{
#ifdef WINDOWS
if
(
pObj
->
numOfWorkerReady
<
pObj
->
numOfThreads
)
{
tError
(
"worker-threads are not ready for all, need %d instead of %d."
,
pObj
->
numOfThreads
,
pObj
->
numOfWorkerReady
);
uv_close
((
uv_handle_t
*
)
cli
,
NULL
);
return
;
}
#endif
uv_write_t
*
wr
=
(
uv_write_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_write_t
));
wr
->
data
=
cli
;
...
...
@@ -668,7 +674,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_write2
(
wr
,
(
uv_stream_t
*
)
&
(
pObj
->
pipe
[
pObj
->
workerIdx
][
0
]),
&
buf
,
1
,
(
uv_stream_t
*
)
cli
,
uvOnPipeWriteCb
);
}
else
{
uv_close
((
uv_handle_t
*
)
cli
,
NULL
);
if
(
!
uv_is_closing
((
uv_handle_t
*
)
cli
))
{
uv_close
((
uv_handle_t
*
)
cli
,
NULL
);
}
else
{
taosMemoryFree
(
cli
);
}
}
}
void
uvOnConnectionCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
...
...
@@ -681,7 +691,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tWarn
(
"failed to create connect:%p"
,
q
);
taosMemoryFree
(
buf
->
base
);
uv_close
((
uv_handle_t
*
)
q
,
NULL
);
// taosMemoryFree(q);
return
;
}
// free memory allocated by
...
...
@@ -770,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
return
false
;
}
#ifdef WINDOWS
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
#else
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
uv_pipe_open
(
pThrd
->
pipe
,
pThrd
->
fd
);
#endif
pThrd
->
pipe
->
data
=
pThrd
;
...
...
@@ -785,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
QUEUE_INIT
(
&
pThrd
->
conn
);
pThrd
->
asyncPool
=
transAsyncPoolCreate
(
pThrd
->
loop
,
5
,
pThrd
,
uvWorkerAsyncCb
);
#ifdef WINDOWS
uv_pipe_connect
(
&
pThrd
->
connect_req
,
pThrd
->
pipe
,
pipeName
,
uvOnPipeConnectionCb
);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
#else
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
#endif
return
true
;
}
...
...
@@ -958,20 +975,19 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
port
=
port
;
uv_loop_init
(
srv
->
loop
);
char
pipeName
[
PATH_MAX
];
#ifdef WINDOWS
int
ret
=
uv_pipe_init
(
srv
->
loop
,
&
srv
->
pipeListen
,
0
);
if
(
ret
!=
0
)
{
tError
(
"failed to init pipe, errmsg: %s"
,
uv_err_name
(
ret
));
goto
End
;
}
#ifdef WINDOWS
char
pipeName
[
64
];
snprintf
(
pipeName
,
sizeof
(
pipeName
),
"
\\\\
?
\\
pipe
\\
trans.rpc.%d-%"
PRIu64
,
taosSafeRand
(),
GetCurrentProcessId
());
#else
char
pipeName
[
PATH_MAX
]
=
{
0
};
snprintf
(
pipeName
,
sizeof
(
pipeName
),
"%s%spipe.trans.rpc.%08d-%"
PRIu64
,
tsTempDir
,
TD_DIRSEP
,
taosSafeRand
(),
taosGetSelfPthreadId
());
#endif
// char pipeName[PATH_MAX] = {0};
// snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
// taosGetSelfPthreadId());
ret
=
uv_pipe_bind
(
&
srv
->
pipeListen
,
pipeName
);
if
(
ret
!=
0
)
{
tError
(
"failed to bind pipe, errmsg: %s"
,
uv_err_name
(
ret
));
...
...
@@ -997,6 +1013,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
if
(
false
==
addHandleToWorkloop
(
thrd
,
pipeName
))
{
goto
End
;
}
int
err
=
taosThreadCreate
(
&
(
thrd
->
thread
),
NULL
,
transWorkerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"success to create worker-thread:%d"
,
i
);
...
...
@@ -1006,14 +1023,54 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto
End
;
}
}
#else
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
SWorkThrd
*
thrd
=
(
SWorkThrd
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SWorkThrd
));
thrd
->
pTransInst
=
shandle
;
thrd
->
quit
=
false
;
thrd
->
pTransInst
=
shandle
;
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
taosMemoryCalloc
(
2
,
sizeof
(
uv_pipe_t
));
srv
->
pThreadObj
[
i
]
=
thrd
;
uv_os_sock_t
fds
[
2
];
if
(
uv_socketpair
(
AF_UNIX
,
SOCK_STREAM
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
goto
End
;
}
uv_pipe_init
(
srv
->
loop
,
&
(
srv
->
pipe
[
i
][
0
]),
1
);
uv_pipe_open
(
&
(
srv
->
pipe
[
i
][
0
]),
fds
[
1
]);
thrd
->
pipe
=
&
(
srv
->
pipe
[
i
][
1
]);
// init read
thrd
->
fd
=
fds
[
0
];
if
(
false
==
addHandleToWorkloop
(
thrd
,
pipeName
))
{
goto
End
;
}
int
err
=
taosThreadCreate
(
&
(
thrd
->
thread
),
NULL
,
transWorkerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"success to create worker-thread:%d"
,
i
);
}
else
{
// TODO: clear all other resource later
tError
(
"failed to create worker-thread:%d"
,
i
);
goto
End
;
}
}
#endif
if
(
false
==
taosValidIpAndPort
(
srv
->
ip
,
srv
->
port
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"invalid ip/port, %d:%d, reason:%s"
,
srv
->
ip
,
srv
->
port
,
terrstr
());
goto
End
;
}
if
(
false
==
addHandleToAcceptloop
(
srv
))
{
goto
End
;
}
int
err
=
taosThreadCreate
(
&
srv
->
thread
,
NULL
,
transAcceptThread
,
(
void
*
)
srv
);
if
(
err
==
0
)
{
tDebug
(
"success to create accept-thread"
);
...
...
@@ -1022,6 +1079,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto
End
;
// clear all resource later
}
srv
->
inited
=
true
;
return
srv
;
End:
...
...
source/util/src/tfunctional.c
浏览文件 @
65805509
...
...
@@ -16,6 +16,9 @@
#define _DEFAULT_SOURCE
#include "tfunctional.h"
FORCE_INLINE
void
*
genericInvoke
(
tGenericSavedFunc
*
const
pSavedFunc
)
{
return
pSavedFunc
->
func
(
pSavedFunc
->
args
);
}
#if 0
tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int32_t numOfArgs) {
tGenericSavedFunc* pSavedFunc = taosMemoryMalloc(sizeof(tGenericSavedFunc) + numOfArgs * (sizeof(void*)));
if (pSavedFunc == NULL) return NULL;
...
...
@@ -37,10 +40,9 @@ tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int32_t numOfArgs) {
return pSavedFunc;
}
FORCE_INLINE
void
*
genericInvoke
(
tGenericSavedFunc
*
const
pSavedFunc
)
{
return
pSavedFunc
->
func
(
pSavedFunc
->
args
);
}
FORCE_INLINE int32_t i32Invoke(tI32SavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); }
FORCE_INLINE void voidInvoke(tVoidSavedFunc* const pSavedFunc) {
if (pSavedFunc) pSavedFunc->func(pSavedFunc->args);
}
#endif
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录