Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7788bcb1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7788bcb1
编写于
7月 28, 2022
作者:
G
gccgdb1234
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
上级
6a20c2a5
2c52b316
变更
11
展开全部
隐藏空白更改
内联
并排
Showing
11 changed file
with
286 addition
and
703 deletion
+286
-703
include/libs/wal/wal.h
include/libs/wal/wal.h
+3
-3
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+23
-584
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+10
-7
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+24
-9
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+117
-54
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+3
-2
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+54
-3
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+7
-7
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+33
-29
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+11
-4
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
未找到文件。
include/libs/wal/wal.h
浏览文件 @
7788bcb1
...
...
@@ -77,11 +77,11 @@ typedef struct {
}
SWalSyncInfo
;
typedef
struct
{
int8_t
protoVer
;
int64_t
version
;
int
16_t
msgType
;
int
64_t
ingestTs
;
int32_t
bodyLen
;
int64_t
ingestTs
;
// not implemented
int16_t
msgType
;
int8_t
protoVer
;
// sync meta
SWalSyncInfo
syncMeta
;
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
7788bcb1
此差异已折叠。
点击以展开。
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
7788bcb1
...
...
@@ -281,8 +281,8 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
if
(
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
)
==
0
)
continue
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vG
Trace
(
"vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p"
,
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
)
,
pMsg
->
info
.
handle
);
vG
Info
(
"vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%ld"
,
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
info
.
handle
,
pMsg
->
info
.
conn
.
applyIndex
);
SRpcMsg
rsp
=
{.
code
=
pMsg
->
code
,
.
info
=
pMsg
->
info
};
if
(
rsp
.
code
==
0
)
{
...
...
@@ -503,9 +503,6 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
static
void
vnodeSyncCommitMsg
(
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
if
(
cbMeta
.
isWeak
==
0
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
vTrace
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s"
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
cbMeta
.
code
==
0
)
{
SRpcMsg
rpcMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
contLen
=
pMsg
->
contLen
};
...
...
@@ -514,11 +511,17 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
syncGetAndDelRespRpc
(
pVnode
->
sync
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
rpcMsg
.
info
.
conn
.
applyIndex
=
cbMeta
.
index
;
rpcMsg
.
info
.
conn
.
applyTerm
=
cbMeta
.
term
;
vInfo
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s"
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
));
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
}
else
{
SRpcMsg
rsp
=
{.
code
=
cbMeta
.
code
,
.
info
=
pMsg
->
info
};
vError
(
"vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s"
,
syncGetVgId
(
pVnode
->
sync
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
),
cbMeta
.
code
,
tstrerror
(
cbMeta
.
code
));
vError
(
"vgId:%d, sync commit error, msgtype:%d,%s,
index:%ld,
error:0x%X, errmsg:%s"
,
syncGetVgId
(
pVnode
->
sync
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
),
cbMeta
.
index
,
cbMeta
.
code
,
tstrerror
(
cbMeta
.
code
));
if
(
rsp
.
info
.
handle
!=
NULL
)
{
tmsgSendRsp
(
&
rsp
);
}
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
7788bcb1
...
...
@@ -579,11 +579,13 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
if
(
ctx
->
noExec
==
false
)
{
for
(
int32_t
m
=
0
;
m
<
node
->
pParameterList
->
length
;
m
++
)
{
// add impl later
if
(
node
->
condType
==
LOGIC_COND_TYPE_AND
)
{
taosArrayAddAll
(
output
->
result
,
params
[
m
].
result
);
// taosArrayDestroy(params[m].result);
// params[m].result = NULL;
}
else
if
(
node
->
condType
==
LOGIC_COND_TYPE_OR
)
{
taosArrayAddAll
(
output
->
result
,
params
[
m
].
result
);
// params[m].result = NULL;
}
else
if
(
node
->
condType
==
LOGIC_COND_TYPE_NOT
)
{
// taosArrayAddAll(output->result, params[m].result);
}
...
...
@@ -593,6 +595,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
}
else
{
for
(
int32_t
m
=
0
;
m
<
node
->
pParameterList
->
length
;
m
++
)
{
output
->
status
=
sifMergeCond
(
node
->
condType
,
output
->
status
,
params
[
m
].
status
);
taosArrayDestroy
(
params
[
m
].
result
);
params
[
m
].
result
=
NULL
;
}
}
_return:
...
...
@@ -607,6 +611,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
SIFCtx
*
ctx
=
context
;
ctx
->
code
=
sifExecFunction
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
!=
TSDB_CODE_SUCCESS
)
{
sifFreeParam
(
&
output
);
return
DEAL_RES_ERROR
;
}
...
...
@@ -624,6 +629,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SIFCtx
*
ctx
=
context
;
ctx
->
code
=
sifExecLogic
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
sifFreeParam
(
&
output
);
return
DEAL_RES_ERROR
;
}
...
...
@@ -640,6 +646,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) {
SIFCtx
*
ctx
=
context
;
ctx
->
code
=
sifExecOper
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
sifFreeParam
(
&
output
);
return
DEAL_RES_ERROR
;
}
if
(
taosHashPut
(
ctx
->
pRes
,
&
pNode
,
POINTER_BYTES
,
&
output
,
sizeof
(
output
)))
{
...
...
@@ -698,7 +705,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
}
nodesWalkExprPostOrder
(
pNode
,
sifCalcWalker
,
&
ctx
);
SIF_ERR_RET
(
ctx
.
code
);
if
(
ctx
.
code
!=
0
)
{
sifFreeRes
(
ctx
.
pRes
);
return
ctx
.
code
;
}
if
(
pDst
)
{
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
...
...
@@ -714,8 +725,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
taosHashRemove
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
}
sifFreeRes
(
ctx
.
pRes
);
SIF_RET
(
code
);
return
code
;
}
static
int32_t
sifGetFltHint
(
SNode
*
pNode
,
SIdxFltStatus
*
status
)
{
...
...
@@ -732,8 +742,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
}
nodesWalkExprPostOrder
(
pNode
,
sifCalcWalker
,
&
ctx
);
SIF_ERR_RET
(
ctx
.
code
);
if
(
ctx
.
code
!=
0
)
{
sifFreeRes
(
ctx
.
pRes
);
return
ctx
.
code
;
}
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
if
(
res
==
NULL
)
{
...
...
@@ -745,8 +757,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
sifFreeParam
(
res
);
taosHashRemove
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
taosHashCleanup
(
ctx
.
pRes
);
SIF_RET
(
code
);
return
code
;
}
int32_t
doFilterTag
(
SNode
*
pFilterNode
,
SIndexMetaArg
*
metaArg
,
SArray
*
result
,
SIdxFltStatus
*
status
)
{
...
...
@@ -760,7 +771,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
SArray
*
output
=
taosArrayInit
(
8
,
sizeof
(
uint64_t
));
SIFParam
param
=
{.
arg
=
*
metaArg
,
.
result
=
output
};
SIF_ERR_RET
(
sifCalculate
((
SNode
*
)
pFilterNode
,
&
param
));
int32_t
code
=
sifCalculate
((
SNode
*
)
pFilterNode
,
&
param
);
if
(
code
!=
0
)
{
sifFreeParam
(
&
param
);
return
code
;
}
taosArrayAddAll
(
result
,
param
.
result
);
sifFreeParam
(
&
param
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
7788bcb1
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
...
...
@@ -16,6 +15,10 @@
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SConnList
{
queue
conn
;
}
SConnList
;
typedef
struct
SCliConn
{
T_REF_DECLARE
()
uv_connect_t
connReq
;
...
...
@@ -26,7 +29,9 @@ typedef struct SCliConn {
SConnBuffer
readBuf
;
STransQueue
cliMsgs
;
queue
q
;
queue
q
;
SConnList
*
list
;
STransCtx
ctx
;
bool
broken
;
// link broken or not
...
...
@@ -56,13 +61,14 @@ typedef struct SCliMsg {
}
SCliMsg
;
typedef
struct
SCliThrd
{
TdThread
thread
;
// tid
int64_t
pid
;
// pid
uv_loop_t
*
loop
;
SAsyncPool
*
asyncPool
;
uv_idle_t
*
idle
;
uv_timer_t
timer
;
void
*
pool
;
// conn pool
TdThread
thread
;
// tid
int64_t
pid
;
// pid
uv_loop_t
*
loop
;
SAsyncPool
*
asyncPool
;
uv_idle_t
*
idle
;
uv_prepare_t
*
prepare
;
uv_timer_t
timer
;
void
*
pool
;
// conn pool
// msg queue
queue
msg
;
...
...
@@ -86,10 +92,6 @@ typedef struct SCliObj {
SCliThrd
**
pThreadObj
;
}
SCliObj
;
typedef
struct
SConnList
{
queue
conn
;
}
SConnList
;
// conn pool
// add expire timeout and capacity limit
static
void
*
createConnPool
(
int
size
);
...
...
@@ -101,7 +103,7 @@ static void doCloseIdleConn(void* param);
static
int
sockDebugInfo
(
struct
sockaddr
*
sockname
,
char
*
dst
)
{
struct
sockaddr_in
addr
=
*
(
struct
sockaddr_in
*
)
sockname
;
char
buf
[
20
]
=
{
0
};
char
buf
[
16
]
=
{
0
};
int
r
=
uv_ip4_name
(
&
addr
,
(
char
*
)
buf
,
sizeof
(
buf
));
sprintf
(
dst
,
"%s:%d"
,
buf
,
ntohs
(
addr
.
sin_port
));
return
r
;
...
...
@@ -118,6 +120,9 @@ static void cliSendCb(uv_write_t* req, int status);
static
void
cliConnCb
(
uv_connect_t
*
req
,
int
status
);
static
void
cliAsyncCb
(
uv_async_t
*
handle
);
static
void
cliIdleCb
(
uv_idle_t
*
handle
);
static
void
cliPrepareCb
(
uv_prepare_t
*
handle
);
static
int32_t
allocConnRef
(
SCliConn
*
conn
,
bool
update
);
static
int
cliAppCb
(
SCliConn
*
pConn
,
STransMsg
*
pResp
,
SCliMsg
*
pMsg
);
...
...
@@ -198,7 +203,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) ((para)
== 0 ? 3 * 1
000 : (para))
#define CONN_PERSIST_TIME(para) ((para)
<= 90000 ? 90
000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
...
...
@@ -499,9 +504,8 @@ void* destroyConnPool(void* pool) {
}
static
SCliConn
*
getConnFromPool
(
void
*
pool
,
char
*
ip
,
uint32_t
port
)
{
char
key
[
128
]
=
{
0
};
char
key
[
32
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
SHashObj
*
pPool
=
pool
;
SConnList
*
plist
=
taosHashGet
(
pPool
,
key
,
strlen
(
key
));
if
(
plist
==
NULL
)
{
...
...
@@ -519,13 +523,44 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
conn
->
status
=
ConnNormal
;
QUEUE_REMOVE
(
&
conn
->
q
);
QUEUE_INIT
(
&
conn
->
q
);
assert
(
h
==
&
conn
->
q
);
transDQCancel
(((
SCliThrd
*
)
conn
->
hostThrd
)
->
timeoutQueue
,
conn
->
task
);
conn
->
task
=
NULL
;
return
conn
;
}
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
)
{
if
(
conn
->
status
==
ConnInPool
)
{
return
;
}
SCliThrd
*
thrd
=
conn
->
hostThrd
;
CONN_HANDLE_THREAD_QUIT
(
thrd
);
allocConnRef
(
conn
,
true
);
STrans
*
pTransInst
=
thrd
->
pTransInst
;
cliReleaseUnfinishedMsg
(
conn
);
transQueueClear
(
&
conn
->
cliMsgs
);
transCtxCleanup
(
&
conn
->
ctx
);
conn
->
status
=
ConnInPool
;
if
(
conn
->
list
==
NULL
)
{
char
key
[
32
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
conn
->
ip
,
conn
->
port
);
tTrace
(
"%s conn %p added to conn pool, read buf cap:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
conn
->
readBuf
.
cap
);
conn
->
list
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
}
assert
(
conn
->
list
!=
NULL
);
QUEUE_INIT
(
&
conn
->
q
);
QUEUE_PUSH
(
&
conn
->
list
->
conn
,
&
conn
->
q
);
assert
(
!
QUEUE_IS_EMPTY
(
&
conn
->
list
->
conn
));
STaskArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
STaskArg
));
arg
->
param1
=
conn
;
arg
->
param2
=
thrd
;
conn
->
task
=
transDQSched
(
thrd
->
timeoutQueue
,
doCloseIdleConn
,
arg
,
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
));
}
static
int32_t
allocConnRef
(
SCliConn
*
conn
,
bool
update
)
{
if
(
update
)
{
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
...
...
@@ -556,38 +591,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
return
0
;
}
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
)
{
if
(
conn
->
status
==
ConnInPool
)
{
return
;
}
SCliThrd
*
thrd
=
conn
->
hostThrd
;
CONN_HANDLE_THREAD_QUIT
(
thrd
);
allocConnRef
(
conn
,
true
);
STrans
*
pTransInst
=
thrd
->
pTransInst
;
cliReleaseUnfinishedMsg
(
conn
);
transQueueClear
(
&
conn
->
cliMsgs
);
transCtxCleanup
(
&
conn
->
ctx
);
conn
->
status
=
ConnInPool
;
char
key
[
128
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
conn
->
ip
,
conn
->
port
);
tTrace
(
"%s conn %p added to conn pool, read buf cap:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
conn
->
readBuf
.
cap
);
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
// list already create before
assert
(
plist
!=
NULL
);
QUEUE_INIT
(
&
conn
->
q
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
q
);
assert
(
!
QUEUE_IS_EMPTY
(
&
plist
->
conn
));
STaskArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
STaskArg
));
arg
->
param1
=
conn
;
arg
->
param2
=
thrd
;
conn
->
task
=
transDQSched
(
thrd
->
timeoutQueue
,
doCloseIdleConn
,
arg
,
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
));
}
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SCliConn
*
conn
=
handle
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
...
...
@@ -602,11 +605,9 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
if
(
transReadComplete
(
pBuf
))
{
while
(
transReadComplete
(
pBuf
))
{
tTrace
(
"%s conn %p read complete"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
cliHandleResp
(
conn
);
}
else
{
tTrace
(
"%s conn %p read partial packet, continue to read"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
}
return
;
}
...
...
@@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) {
static
void
cliIdleCb
(
uv_idle_t
*
handle
)
{
SCliThrd
*
thrd
=
handle
->
data
;
tTrace
(
"do idle work"
);
SAsyncPool
*
pool
=
thrd
->
asyncPool
;
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
SAsyncItem
*
item
=
async
->
data
;
queue
wq
;
taosThreadMutexLock
(
&
item
->
mtx
);
QUEUE_MOVE
(
&
item
->
qmsg
,
&
wq
);
taosThreadMutexUnlock
(
&
item
->
mtx
);
int
count
=
0
;
while
(
!
QUEUE_IS_EMPTY
(
&
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
h
);
SCliMsg
*
pMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
if
(
pMsg
==
NULL
)
{
continue
;
}
(
*
cliAsyncHandle
[
pMsg
->
type
])(
pMsg
,
thrd
);
count
++
;
}
}
tTrace
(
"prepare work end"
);
if
(
thrd
->
stopMsg
!=
NULL
)
cliHandleQuit
(
thrd
->
stopMsg
,
thrd
);
}
static
void
cliPrepareCb
(
uv_prepare_t
*
handle
)
{
SCliThrd
*
thrd
=
handle
->
data
;
tTrace
(
"prepare work start"
);
SAsyncPool
*
pool
=
thrd
->
asyncPool
;
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
SAsyncItem
*
item
=
async
->
data
;
queue
wq
;
taosThreadMutexLock
(
&
item
->
mtx
);
QUEUE_MOVE
(
&
item
->
qmsg
,
&
wq
);
taosThreadMutexUnlock
(
&
item
->
mtx
);
int
count
=
0
;
while
(
!
QUEUE_IS_EMPTY
(
&
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
h
);
SCliMsg
*
pMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
if
(
pMsg
==
NULL
)
{
continue
;
}
(
*
cliAsyncHandle
[
pMsg
->
type
])(
pMsg
,
thrd
);
count
++
;
}
}
tTrace
(
"prepare work end"
);
if
(
thrd
->
stopMsg
!=
NULL
)
cliHandleQuit
(
thrd
->
stopMsg
,
thrd
);
}
static
void
*
cliWorkThread
(
void
*
arg
)
{
...
...
@@ -1033,7 +1090,12 @@ static SCliThrd* createThrdObj() {
// pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t));
// uv_idle_init(pThrd->loop, pThrd->idle);
// pThrd->idle->data = pThrd;
// uv_idle_start(pThrd->idle, cliIdleCb);
// uv_idle_start(pThrd->idle, cliIdleCb);
pThrd
->
prepare
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_prepare_t
));
uv_prepare_init
(
pThrd
->
loop
,
pThrd
->
prepare
);
pThrd
->
prepare
->
data
=
pThrd
;
uv_prepare_start
(
pThrd
->
prepare
,
cliPrepareCb
);
pThrd
->
pool
=
createConnPool
(
4
);
transDQCreate
(
pThrd
->
loop
,
&
pThrd
->
delayQueue
);
...
...
@@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy
(
pThrd
->
timeoutQueue
,
NULL
);
taosMemoryFree
(
pThrd
->
idle
);
taosMemoryFree
(
pThrd
->
prepare
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
);
}
...
...
source/libs/transport/src/transComm.c
浏览文件 @
7788bcb1
...
...
@@ -120,8 +120,9 @@ int transInitBuffer(SConnBuffer* buf) {
buf
->
total
=
0
;
return
0
;
}
int
transDestroyBuffer
(
SConnBuffer
*
buf
)
{
taosMemoryFree
(
buf
->
buf
);
int
transDestroyBuffer
(
SConnBuffer
*
p
)
{
taosMemoryFree
(
p
->
buf
);
p
->
buf
=
NULL
;
return
0
;
}
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
7788bcb1
...
...
@@ -73,6 +73,7 @@ typedef struct SWorkThrd {
uv_os_fd_t
fd
;
uv_loop_t
*
loop
;
SAsyncPool
*
asyncPool
;
uv_prepare_t
*
prepare
;
queue
msg
;
TdThreadMutex
msgMtx
;
...
...
@@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
void
uvAcceptAsyncCb
(
uv_async_t
*
handle
);
static
void
uvShutDownCb
(
uv_shutdown_t
*
req
,
int
status
);
static
void
uvPrepareCb
(
uv_prepare_t
*
handle
);
/*
* time-consuming task throwed into BG work thread
...
...
@@ -238,8 +240,6 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg
.
msgType
=
pHead
->
msgType
;
transMsg
.
code
=
pHead
->
code
;
// transClearBuffer(&pConn->readBuf);
pConn
->
inType
=
pHead
->
msgType
;
if
(
pConn
->
status
==
ConnNormal
)
{
if
(
pHead
->
persist
==
1
)
{
...
...
@@ -546,6 +546,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
uv_close
((
uv_handle_t
*
)
req
->
handle
,
uvDestroyConn
);
taosMemoryFree
(
req
);
}
static
void
uvPrepareCb
(
uv_prepare_t
*
handle
)
{
// prepare callback
SWorkThrd
*
pThrd
=
handle
->
data
;
SAsyncPool
*
pool
=
pThrd
->
asyncPool
;
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
SAsyncItem
*
item
=
async
->
data
;
queue
wq
;
taosThreadMutexLock
(
&
item
->
mtx
);
QUEUE_MOVE
(
&
item
->
qmsg
,
&
wq
);
taosThreadMutexUnlock
(
&
item
->
mtx
);
while
(
!
QUEUE_IS_EMPTY
(
&
wq
))
{
queue
*
head
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
head
);
SSvrMsg
*
msg
=
QUEUE_DATA
(
head
,
SSvrMsg
,
q
);
if
(
msg
==
NULL
)
{
tError
(
"unexcept occurred, continue"
);
continue
;
}
// release handle to rpc init
if
(
msg
->
type
==
Quit
)
{
(
*
transAsyncHandle
[
msg
->
type
])(
msg
,
pThrd
);
continue
;
}
else
{
STransMsg
transMsg
=
msg
->
msg
;
SExHandle
*
exh1
=
transMsg
.
info
.
handle
;
int64_t
refId
=
transMsg
.
info
.
refId
;
SExHandle
*
exh2
=
transAcquireExHandle
(
transGetRefMgt
(),
refId
);
if
(
exh2
==
NULL
||
exh1
!=
exh2
)
{
tTrace
(
"handle except msg %p, ignore it"
,
exh1
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
destroySmsg
(
msg
);
continue
;
}
msg
->
pConn
=
exh1
->
handle
;
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
(
*
transAsyncHandle
[
msg
->
type
])(
msg
,
pThrd
);
}
}
}
}
static
void
uvWorkDoTask
(
uv_work_t
*
req
)
{
// doing time-consumeing task
...
...
@@ -695,13 +741,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
}
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
// int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd
->
pipe
->
data
=
pThrd
;
QUEUE_INIT
(
&
pThrd
->
msg
);
taosThreadMutexInit
(
&
pThrd
->
msgMtx
,
NULL
);
pThrd
->
prepare
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_prepare_t
));
uv_prepare_init
(
pThrd
->
loop
,
pThrd
->
prepare
);
uv_prepare_start
(
pThrd
->
prepare
,
uvPrepareCb
);
pThrd
->
prepare
->
data
=
pThrd
;
// conn set
QUEUE_INIT
(
&
pThrd
->
conn
);
...
...
@@ -986,6 +1036,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
SRV_RELEASE_UV
(
pThrd
->
loop
);
TRANS_DESTROY_ASYNC_POOL_MSG
(
pThrd
->
asyncPool
,
SSvrMsg
,
destroySmsg
);
transAsyncPoolDestroy
(
pThrd
->
asyncPool
);
taosMemoryFree
(
pThrd
->
prepare
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
);
}
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
7788bcb1
...
...
@@ -139,7 +139,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
const
char
*
idxPattern
=
"^[0-9]+.idx$"
;
regex_t
logRegPattern
;
regex_t
idxRegPattern
;
SArray
*
pLogInfoArray
=
taosArrayInit
(
8
,
sizeof
(
SWalFileInfo
));
SArray
*
actualLog
=
taosArrayInit
(
8
,
sizeof
(
SWalFileInfo
));
regcomp
(
&
logRegPattern
,
logPattern
,
REG_EXTENDED
);
regcomp
(
&
idxRegPattern
,
idxPattern
,
REG_EXTENDED
);
...
...
@@ -159,7 +159,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo
fileInfo
;
memset
(
&
fileInfo
,
-
1
,
sizeof
(
SWalFileInfo
));
sscanf
(
name
,
"%"
PRId64
".log"
,
&
fileInfo
.
firstVer
);
taosArrayPush
(
pLogInfoArray
,
&
fileInfo
);
taosArrayPush
(
actualLog
,
&
fileInfo
);
}
}
...
...
@@ -167,10 +167,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
regfree
(
&
logRegPattern
);
regfree
(
&
idxRegPattern
);
taosArraySort
(
pLogInfoArray
,
compareWalFileInfo
);
taosArraySort
(
actualLog
,
compareWalFileInfo
);
int
metaFileNum
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
int
actualFileNum
=
taosArrayGetSize
(
pLogInfoArray
);
int
actualFileNum
=
taosArrayGetSize
(
actualLog
);
#if 0
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
...
...
@@ -196,11 +196,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
metaFileNum
-
actualFileNum
);
}
else
if
(
metaFileNum
<
actualFileNum
)
{
for
(
int
i
=
metaFileNum
;
i
<
actualFileNum
;
i
++
)
{
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pLogInfoArray
,
i
);
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
actualLog
,
i
);
taosArrayPush
(
pWal
->
fileInfoSet
,
pFileInfo
);
}
}
taosArrayDestroy
(
pLogInfoArray
);
taosArrayDestroy
(
actualLog
);
pWal
->
writeCur
=
actualFileNum
-
1
;
if
(
actualFileNum
>
0
)
{
...
...
@@ -221,7 +221,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
int
code
=
walSaveMeta
(
pWal
);
if
(
code
<
0
)
{
taosArrayDestroy
(
pLogInfoArray
);
taosArrayDestroy
(
actualLog
);
return
-
1
;
}
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
7788bcb1
...
...
@@ -423,37 +423,38 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
return
0
;
}
int32_t
walReadVer
(
SWalReader
*
pRead
,
int64_t
ver
)
{
wDebug
(
"vgId:%d wal start to read ver %ld"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
int32_t
walReadVer
(
SWalReader
*
pRead
er
,
int64_t
ver
)
{
wDebug
(
"vgId:%d wal start to read ver %ld"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
);
int64_t
contLen
;
int32_t
code
;
bool
seeked
=
false
;
if
(
pRead
->
pWal
->
vers
.
firstVer
==
-
1
)
{
if
(
pRead
er
->
pWal
->
vers
.
firstVer
==
-
1
)
{
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
if
(
ver
>
pRead
->
pWal
->
vers
.
lastVer
||
ver
<
pRead
->
pWal
->
vers
.
firstVer
)
{
wDebug
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
lastVer
);
if
(
ver
>
pRead
er
->
pWal
->
vers
.
lastVer
||
ver
<
pReader
->
pWal
->
vers
.
firstVer
)
{
wDebug
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
er
->
pWal
->
vers
.
firstVer
,
pReader
->
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
if
(
pRead
->
curInvalid
||
pRead
->
curVersion
!=
ver
)
{
if
(
walReadSeekVer
(
pRead
,
ver
)
<
0
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
terrstr
());
if
(
pRead
er
->
curInvalid
||
pReader
->
curVersion
!=
ver
)
{
if
(
walReadSeekVer
(
pRead
er
,
ver
)
<
0
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", since %s"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
,
terrstr
());
return
-
1
;
}
seeked
=
true
;
}
while
(
1
)
{
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pRead
->
pHead
,
sizeof
(
SWalCkHead
));
contLen
=
taosReadFile
(
pRead
er
->
pLogFile
,
pReader
->
pHead
,
sizeof
(
SWalCkHead
));
if
(
contLen
==
sizeof
(
SWalCkHead
))
{
break
;
}
else
if
(
contLen
==
0
&&
!
seeked
)
{
walReadSeekVerImpl
(
pRead
,
ver
);
walReadSeekVerImpl
(
pRead
er
,
ver
);
seeked
=
true
;
continue
;
}
else
{
...
...
@@ -467,26 +468,26 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
}
co
ntLen
=
walValidHeadCksum
(
pRead
->
pHead
);
if
(
co
ntLen
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
co
de
=
walValidHeadCksum
(
pReader
->
pHead
);
if
(
co
de
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", since head checksum not passed"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
pRead
->
capacity
<
pRead
->
pHead
->
head
.
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalCkHead
)
+
pRead
->
pHead
->
head
.
bodyLen
);
if
(
pRead
er
->
capacity
<
pReader
->
pHead
->
head
.
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
pRead
er
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReader
->
pHead
->
head
.
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
pRead
->
pHead
=
ptr
;
pRead
->
capacity
=
pRead
->
pHead
->
head
.
bodyLen
;
pRead
er
->
pHead
=
ptr
;
pRead
er
->
capacity
=
pReader
->
pHead
->
head
.
bodyLen
;
}
if
((
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
body
,
pRead
->
pHead
->
head
.
bodyLen
))
!=
pRead
->
pHead
->
head
.
bodyLen
)
{
if
((
contLen
=
taosReadFile
(
pRead
er
->
pLogFile
,
pReader
->
pHead
->
head
.
body
,
pReader
->
pHead
->
head
.
bodyLen
))
!=
pRead
er
->
pHead
->
head
.
bodyLen
)
{
if
(
contLen
<
0
)
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
else
{
...
...
@@ -496,25 +497,28 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return
-
1
;
}
if
(
pRead
->
pHead
->
head
.
version
!=
ver
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", read request index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
curInvalid
=
1
;
if
(
pRead
er
->
pHead
->
head
.
version
!=
ver
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", read request index:%"
PRId64
,
pRead
er
->
pWal
->
cfg
.
vgId
,
pRead
er
->
pHead
->
head
.
version
,
ver
);
pRead
er
->
curInvalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
}
co
ntLen
=
walValidBodyCksum
(
pRead
->
pHead
);
if
(
co
ntLen
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
co
de
=
walValidBodyCksum
(
pReader
->
pHead
);
if
(
co
de
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log, index:%"
PRId64
", since body checksum not passed"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
curInvalid
=
1
;
uint32_t
readCkSum
=
walCalcBodyCksum
(
pReader
->
pHead
->
head
.
body
,
pReader
->
pHead
->
head
.
bodyLen
);
uint32_t
logCkSum
=
pReader
->
pHead
->
cksumBody
;
wError
(
"checksum written into log: %u, checksum calculated: %u"
,
logCkSum
,
readCkSum
);
pReader
->
curInvalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
}
pRead
->
curVersion
++
;
pRead
er
->
curVersion
++
;
return
0
;
}
source/libs/wal/src/walWrite.c
浏览文件 @
7788bcb1
...
...
@@ -289,18 +289,25 @@ int32_t walEndSnapshot(SWal *pWal) {
newTotSize
-=
iter
->
fileSize
;
}
}
char
fnameStr
[
WAL_FILE_LEN
];
int32_t
actualDelete
=
0
;
char
fnameStr
[
WAL_FILE_LEN
];
// remove file
for
(
int
i
=
0
;
i
<
deleteCnt
;
i
++
)
{
pInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
i
);
walBuildLogName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
taosRemoveFile
(
fnameStr
);
if
(
taosRemoveFile
(
fnameStr
)
<
0
)
{
goto
UPDATE_META
;
}
walBuildIdxName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
taosRemoveFile
(
fnameStr
);
if
(
taosRemoveFile
(
fnameStr
)
<
0
)
{
ASSERT
(
0
);
}
actualDelete
++
;
}
UPDATE_META:
// make new array, remove files
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
deleteCnt
);
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
actualDelete
);
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
pWal
->
writeCur
=
-
1
;
pWal
->
vers
.
firstVer
=
-
1
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
7788bcb1
...
...
@@ -166,7 +166,7 @@
# ---- query ----
./test.sh -f tsim/query/charScalarFunction.sim
#
./test.sh -f tsim/query/explain.sim
./test.sh -f tsim/query/explain.sim
./test.sh -f tsim/query/interval-offset.sim
./test.sh -f tsim/query/interval.sim
./test.sh -f tsim/query/scalarFunction.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录