Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9fad9318
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9fad9318
编写于
12月 01, 2020
作者:
L
liuyq-617
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into test/testcase
上级
d4c6aaf2
3f8118c9
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
394 addition
and
369 deletion
+394
-369
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+3
-3
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+5
-2
src/client/src/tscServer.c
src/client/src/tscServer.c
+18
-31
src/client/src/tscSql.c
src/client/src/tscSql.c
+6
-12
src/client/src/tscSub.c
src/client/src/tscSub.c
+2
-2
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+3
-0
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+6
-6
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+8
-27
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+3
-5
src/cq/test/cqtest.c
src/cq/test/cqtest.c
+1
-1
src/dnode/src/dnodeMPeer.c
src/dnode/src/dnodeMPeer.c
+8
-3
src/dnode/src/dnodeMRead.c
src/dnode/src/dnodeMRead.c
+8
-3
src/dnode/src/dnodeMWrite.c
src/dnode/src/dnodeMWrite.c
+11
-7
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+1
-1
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+1
-1
src/inc/mnode.h
src/inc/mnode.h
+3
-0
src/inc/taosdef.h
src/inc/taosdef.h
+1
-1
src/inc/taoserror.h
src/inc/taoserror.h
+6
-3
src/inc/tcq.h
src/inc/tcq.h
+1
-1
src/inc/tsync.h
src/inc/tsync.h
+2
-2
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+5
-7
src/mnode/src/mnodeMain.c
src/mnode/src/mnodeMain.c
+60
-20
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+15
-2
src/mnode/src/mnodeUser.c
src/mnode/src/mnodeUser.c
+13
-2
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+12
-5
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+3
-5
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+1
-2
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+177
-174
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+5
-5
src/util/src/ttimer.c
src/util/src/ttimer.c
+0
-31
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+6
-5
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
9fad9318
...
...
@@ -333,7 +333,7 @@ typedef struct STscObj {
char
superAuth
:
1
;
uint32_t
connId
;
uint64_t
rid
;
// ref ID returned by taosAddRef
struct
SSqlObj
*
pHb
;
int64_t
hbrid
;
struct
SSqlObj
*
sqlList
;
struct
SSqlStream
*
streamList
;
void
*
pDnodeConn
;
...
...
@@ -373,7 +373,7 @@ typedef struct SSqlObj {
struct
SSqlObj
**
pSubs
;
struct
SSqlObj
*
prev
,
*
next
;
struct
SSqlObj
**
self
;
int64_t
self
;
}
SSqlObj
;
typedef
struct
SSqlStream
{
...
...
@@ -507,7 +507,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
}
extern
SCacheObj
*
tscMetaCache
;
extern
SCacheObj
*
tscObjCache
;
extern
int
tscObjRef
;
extern
void
*
tscTmr
;
extern
void
*
tscQhandle
;
extern
int
tscKeepConn
[];
...
...
src/client/src/tscLocal.c
浏览文件 @
9fad9318
...
...
@@ -825,8 +825,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) {
static
int32_t
tscProcessServStatus
(
SSqlObj
*
pSql
)
{
STscObj
*
pObj
=
pSql
->
pTscObj
;
if
(
pObj
->
pHb
!=
NULL
)
{
if
(
pObj
->
pHb
->
res
.
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
SSqlObj
*
pHb
=
(
SSqlObj
*
)
taosAcquireRef
(
tscObjRef
,
pObj
->
hbrid
);
if
(
pHb
!=
NULL
)
{
int32_t
code
=
pHb
->
res
.
code
;
taosReleaseRef
(
tscObjRef
,
pObj
->
hbrid
);
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
pSql
->
res
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
return
pSql
->
res
.
code
;
}
...
...
src/client/src/tscServer.c
浏览文件 @
9fad9318
...
...
@@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if
(
pRsp
->
streamId
)
tscKillStream
(
pObj
,
htonl
(
pRsp
->
streamId
));
}
}
else
{
tscDebug
(
"%
p heartbeat failed, code:%s"
,
pObj
->
pHb
,
tstrerror
(
code
));
tscDebug
(
"%
"
PRId64
" heartbeat failed, code:%s"
,
pObj
->
hbrid
,
tstrerror
(
code
));
}
if
(
pObj
->
pHb
!=
NULL
)
{
if
(
pObj
->
hbrid
!=
0
)
{
int32_t
waitingDuring
=
tsShellActivityTimer
*
500
;
tscDebug
(
"%p send heartbeat in %dms"
,
pSql
,
waitingDuring
);
...
...
@@ -193,20 +193,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj
*
pObj
=
taosAcquireRef
(
tscRefId
,
rid
);
if
(
pObj
==
NULL
)
return
;
SSqlObj
*
pHB
=
pObj
->
pHb
;
void
**
p
=
taosCacheAcquireByKey
(
tscObjCache
,
&
pHB
,
sizeof
(
TSDB_CACHE_PTR_TYPE
));
if
(
p
==
NULL
)
{
tscWarn
(
"%p HB object has been released already"
,
pHB
);
taosReleaseRef
(
tscRefId
,
pObj
->
rid
);
return
;
}
assert
(
*
pHB
->
self
==
pHB
);
SSqlObj
*
pHB
=
taosAcquireRef
(
tscObjRef
,
pObj
->
hbrid
);
assert
(
pHB
->
self
==
pObj
->
hbrid
);
pHB
->
retry
=
0
;
int32_t
code
=
tscProcessSql
(
pHB
);
taos
CacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
false
);
taos
ReleaseRef
(
tscObjRef
,
pObj
->
hbrid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p failed to sent HB to server, reason:%s"
,
pHB
,
tstrerror
(
code
));
...
...
@@ -236,7 +228,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.
msgType
=
pSql
->
cmd
.
msgType
,
.
pCont
=
pMsg
,
.
contLen
=
pSql
->
cmd
.
payloadLen
,
.
ahandle
=
pSql
,
.
ahandle
=
(
void
*
)
pSql
->
self
,
.
handle
=
NULL
,
.
code
=
0
};
...
...
@@ -247,26 +239,24 @@ int tscSendMsgToServer(SSqlObj *pSql) {
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcEpSet
*
pEpSet
)
{
TSDB_CACHE_PTR_TYPE
handle
=
(
TSDB_CACHE_PTR_TYPE
)
rpcMsg
->
ahandle
;
void
**
p
=
taosCacheAcquireByKey
(
tscObjCache
,
&
handle
,
sizeof
(
TSDB_CACHE_PTR_TYPE
)
);
if
(
p
==
NULL
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
taosAcquireRef
(
tscObjRef
,
handle
);
if
(
p
Sql
==
NULL
)
{
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
SSqlObj
*
pSql
=
*
p
;
assert
(
pSql
!=
NULL
);
assert
(
pSql
->
self
==
handle
);
STscObj
*
pObj
=
pSql
->
pTscObj
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
assert
(
*
pSql
->
self
==
pSql
);
pSql
->
rpcRid
=
-
1
;
if
(
pObj
->
signature
!=
pObj
)
{
tscDebug
(
"%p DB connection is closed, cmd:%d pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pObj
,
pObj
->
signature
);
taosCacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
true
);
taosRemoveRef
(
tscObjRef
,
pSql
->
self
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
...
...
@@ -276,10 +266,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
tscDebug
(
"%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pQueryInfo
->
type
,
pObj
,
pObj
->
signature
);
void
**
p1
=
p
;
taosCacheRelease
(
tscObjCache
,
(
void
**
)
&
p1
,
false
);
taosCacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
true
);
taosRemoveRef
(
tscObjRef
,
pSql
->
self
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
...
...
@@ -322,7 +310,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// if there is an error occurring, proceed to the following error handling procedure.
if
(
rpcMsg
->
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
taos
CacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
false
);
taos
ReleaseRef
(
tscObjRef
,
pSql
->
self
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
...
...
@@ -390,11 +378,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
rpcMsg
->
code
);
}
void
**
p1
=
p
;
taosCacheRelease
(
tscObjCache
,
(
void
**
)
&
p1
,
false
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
if
(
shouldFree
)
{
// in case of table-meta/vgrouplist query, automatically free it
taos
CacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
true
);
taos
RemoveRef
(
tscObjRef
,
pSql
->
self
);
tscDebug
(
"%p sqlObj is automatically freed"
,
pSql
);
}
...
...
@@ -2020,7 +2007,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
// TODO multithread problem
static
void
createHBObj
(
STscObj
*
pObj
)
{
if
(
pObj
->
pHb
!=
NULL
)
{
if
(
pObj
->
hbrid
!=
0
)
{
return
;
}
...
...
@@ -2052,7 +2039,7 @@ static void createHBObj(STscObj* pObj) {
registerSqlObj
(
pSql
);
tscDebug
(
"%p HB is allocated, pObj:%p"
,
pSql
,
pObj
);
pObj
->
pHb
=
pSql
;
pObj
->
hbrid
=
pSql
->
self
;
}
int
tscProcessConnectRsp
(
SSqlObj
*
pSql
)
{
...
...
src/client/src/tscSql.c
浏览文件 @
9fad9318
...
...
@@ -276,8 +276,8 @@ void taos_close(TAOS *taos) {
pObj
->
signature
=
NULL
;
taosTmrStopA
(
&
(
pObj
->
pTimer
));
SSqlObj
*
pHb
=
pObj
->
pHb
;
if
(
pHb
!=
NULL
&&
atomic_val_compare_exchange_ptr
(
&
pObj
->
pHb
,
pHb
,
0
)
==
pHb
)
{
SSqlObj
*
pHb
=
(
SSqlObj
*
)
taosAcquireRef
(
tscObjRef
,
pObj
->
hbrid
)
;
if
(
pHb
!=
NULL
)
{
if
(
pHb
->
rpcRid
>
0
)
{
// wait for rsp from dnode
rpcCancelRequest
(
pHb
->
rpcRid
);
pHb
->
rpcRid
=
-
1
;
...
...
@@ -285,6 +285,7 @@ void taos_close(TAOS *taos) {
tscDebug
(
"%p HB is freed"
,
pHb
);
taos_free_result
(
pHb
);
taosReleaseRef
(
tscObjRef
,
pHb
->
self
);
}
int32_t
ref
=
T_REF_DEC
(
pObj
);
...
...
@@ -606,8 +607,7 @@ void taos_free_result(TAOS_RES *res) {
bool
freeNow
=
tscKillQueryInDnode
(
pSql
);
if
(
freeNow
)
{
tscDebug
(
"%p free sqlObj in cache"
,
pSql
);
SSqlObj
**
p
=
pSql
->
self
;
taosCacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
true
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
}
}
...
...
@@ -700,13 +700,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
continue
;
}
void
**
p
=
taosCacheAcquireByKey
(
tscObjCache
,
&
pSub
,
sizeof
(
TSDB_CACHE_PTR_TYPE
));
if
(
p
==
NULL
)
{
continue
;
}
SSqlObj
*
pSubObj
=
(
SSqlObj
*
)
(
*
p
);
assert
(
pSubObj
->
self
==
(
SSqlObj
**
)
p
);
SSqlObj
*
pSubObj
=
pSub
;
pSubObj
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
if
(
pSubObj
->
rpcRid
>
0
)
{
...
...
@@ -715,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
}
tscQueueAsyncRes
(
pSubObj
);
taos
CacheRelease
(
tscObjCache
,
(
void
**
)
&
p
,
false
);
taos
ReleaseRef
(
tscObjRef
,
pSubObj
->
self
);
}
tscDebug
(
"%p super table query cancelled"
,
pSql
);
...
...
src/client/src/tscSub.c
浏览文件 @
9fad9318
...
...
@@ -179,8 +179,8 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
fail:
tscError
(
"tscCreateSubscription failed at line %d, reason: %s"
,
line
,
tstrerror
(
code
));
if
(
pSql
!=
NULL
)
{
if
(
pSql
->
self
!=
NULL
)
{
taos
_free_result
(
pSql
);
if
(
pSql
->
self
!=
0
)
{
taos
ReleaseRef
(
tscObjRef
,
pSql
->
self
);
}
else
{
tscFreeSqlObj
(
pSql
);
}
...
...
src/client/src/tscSubquery.c
浏览文件 @
9fad9318
...
...
@@ -2198,6 +2198,9 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
STableDataBlocks
*
pTableDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
pSupporter
->
index
);
int32_t
code
=
tscCopyDataBlockToPayload
(
pSql
,
pTableDataBlock
);
// free the data block created from insert sql string
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
if
((
pRes
->
code
=
code
)
!=
TSDB_CODE_SUCCESS
)
{
tscQueueAsyncRes
(
pSql
);
return
code
;
// here the pSql may have been released already.
...
...
src/client/src/tscSystem.c
浏览文件 @
9fad9318
...
...
@@ -15,7 +15,7 @@
#include "os.h"
#include "taosmsg.h"
#include "t
cache
.h"
#include "t
ref
.h"
#include "trpc.h"
#include "tsystem.h"
#include "ttimer.h"
...
...
@@ -31,7 +31,7 @@
// global, not configurable
SCacheObj
*
tscMetaCache
;
SCacheObj
*
tscObjCache
;
int
tscObjRef
=
-
1
;
void
*
tscTmr
;
void
*
tscQhandle
;
void
*
tscCheckDiskUsageTmr
;
...
...
@@ -144,7 +144,7 @@ void taos_init_imp(void) {
int64_t
refreshTime
=
10
;
// 10 seconds by default
if
(
tscMetaCache
==
NULL
)
{
tscMetaCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
false
,
tscFreeTableMetaHelper
,
"tableMeta"
);
tscObj
Cache
=
taosCacheInit
(
TSDB_CACHE_PTR_KEY
,
refreshTime
/
2
,
false
,
tscFreeRegisteredSqlObj
,
"sqlObj"
);
tscObj
Ref
=
taosOpenRef
(
4096
,
tscFreeRegisteredSqlObj
);
}
tscRefId
=
taosOpenRef
(
200
,
tscCloseTscObj
);
...
...
@@ -167,9 +167,9 @@ void taos_cleanup(void) {
taosCacheCleanup
(
m
);
}
m
=
tscObjCache
;
if
(
m
!=
NULL
&&
atomic_val_compare_exchange_ptr
(
&
tscObjCache
,
m
,
0
)
==
m
)
{
taosC
acheCleanup
(
m
);
int
refId
=
atomic_exchange_32
(
&
tscObjRef
,
-
1
)
;
if
(
refId
!=
-
1
)
{
taosC
loseRef
(
refId
);
}
m
=
tscQhandle
;
...
...
src/client/src/tscUtil.c
浏览文件 @
9fad9318
...
...
@@ -447,20 +447,18 @@ static void tscFreeSubobj(SSqlObj* pSql) {
void
tscFreeRegisteredSqlObj
(
void
*
pSql
)
{
assert
(
pSql
!=
NULL
);
SSqlObj
*
*
p
=
(
SSqlObj
**
)
pSql
;
STscObj
*
pTscObj
=
(
*
p
)
->
pTscObj
;
SSqlObj
*
p
=
*
(
SSqlObj
**
)
pSql
;
STscObj
*
pTscObj
=
p
->
pTscObj
;
assert
((
*
p
)
->
self
!=
0
&&
(
*
p
)
->
self
==
(
p
));
SSqlObj
*
ptr
=
*
p
;
tscFreeSqlObj
(
*
p
);
assert
(
p
->
self
!=
0
);
tscFreeSqlObj
(
p
);
int32_t
ref
=
T_REF_DEC
(
pTscObj
);
assert
(
ref
>=
0
);
tscDebug
(
"%p free sqlObj completed, tscObj:%p ref:%d"
,
p
tr
,
pTscObj
,
ref
);
tscDebug
(
"%p free sqlObj completed, tscObj:%p ref:%d"
,
p
,
pTscObj
,
ref
);
if
(
ref
==
0
)
{
tscDebug
(
"%p all sqlObj freed, free tscObj:%p"
,
p
tr
,
pTscObj
);
tscDebug
(
"%p all sqlObj freed, free tscObj:%p"
,
p
,
pTscObj
);
taosRemoveRef
(
tscRefId
,
pTscObj
->
rid
);
}
}
...
...
@@ -840,7 +838,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
// the length does not include the SSubmitBlk structure
pBlocks
->
dataLen
=
htonl
(
finalLen
);
dataBuf
->
numOfTables
+=
1
;
}
...
...
@@ -1565,19 +1562,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) {
}
}
void
tscSetFreeHeatBeat
(
STscObj
*
pObj
)
{
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
||
pObj
->
pHb
==
NULL
)
{
return
;
}
SSqlObj
*
pHeatBeat
=
pObj
->
pHb
;
assert
(
pHeatBeat
==
pHeatBeat
->
signature
);
// to denote the heart-beat timer close connection and free all allocated resources
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pHeatBeat
->
cmd
,
0
);
pQueryInfo
->
type
=
TSDB_QUERY_TYPE_FREE_RESOURCE
;
}
/*
* the following four kinds of SqlObj should not be freed
* 1. SqlObj for stream computing
...
...
@@ -1596,7 +1580,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
}
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
pHb
==
pSql
||
pSql
->
pSubscription
!=
NULL
)
{
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
hbrid
==
pSql
->
self
||
pSql
->
pSubscription
!=
NULL
)
{
return
false
;
}
...
...
@@ -1888,13 +1872,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
}
void
registerSqlObj
(
SSqlObj
*
pSql
)
{
int32_t
DEFAULT_LIFE_TIME
=
2
*
600
*
1000
;
// 1200 sec
int32_t
ref
=
T_REF_INC
(
pSql
->
pTscObj
);
tscDebug
(
"%p add to tscObj:%p, ref:%d"
,
pSql
,
pSql
->
pTscObj
,
ref
);
TSDB_CACHE_PTR_TYPE
p
=
(
TSDB_CACHE_PTR_TYPE
)
pSql
;
pSql
->
self
=
taosCachePut
(
tscObjCache
,
&
p
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
&
p
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
DEFAULT_LIFE_TIME
);
pSql
->
self
=
taosAddRef
(
tscObjRef
,
pSql
);
}
SSqlObj
*
createSimpleSubObj
(
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
int32_t
cmd
)
{
...
...
src/cq/src/cqMain.c
浏览文件 @
9fad9318
...
...
@@ -40,15 +40,14 @@
typedef
struct
{
int32_t
vgId
;
int32_t
master
;
int32_t
num
;
// number of continuous streams
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
char
db
[
TSDB_DB_NAME_LEN
];
FCqWrite
cqWrite
;
void
*
ahandle
;
int32_t
num
;
// number of continuous streams
struct
SCqObj
*
pHead
;
void
*
dbConn
;
int32_t
master
;
void
*
tmrCtrl
;
pthread_mutex_t
mutex
;
}
SCqContext
;
...
...
@@ -90,7 +89,6 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
tstrncpy
(
pContext
->
db
,
db
,
sizeof
(
pContext
->
db
));
pContext
->
vgId
=
pCfg
->
vgId
;
pContext
->
cqWrite
=
pCfg
->
cqWrite
;
pContext
->
ahandle
=
ahandle
;
tscEmbedded
=
1
;
pthread_mutex_init
(
&
pContext
->
mutex
,
NULL
);
...
...
@@ -342,7 +340,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead
->
version
=
0
;
// write into vnode write queue
pContext
->
cqWrite
(
pContext
->
ahandle
,
pHead
,
TAOS_QTYPE_CQ
,
NULL
);
pContext
->
cqWrite
(
pContext
->
vgId
,
pHead
,
TAOS_QTYPE_CQ
,
NULL
);
free
(
buffer
);
}
src/cq/test/cqtest.c
浏览文件 @
9fad9318
...
...
@@ -24,7 +24,7 @@
int64_t
ver
=
0
;
void
*
pCq
=
NULL
;
int
writeToQueue
(
void
*
pVnode
,
void
*
data
,
int
type
,
void
*
pMsg
)
{
int
writeToQueue
(
int32_t
vgId
,
void
*
data
,
int
type
,
void
*
pMsg
)
{
return
0
;
}
...
...
src/dnode/src/dnodeMPeer.c
浏览文件 @
9fad9318
...
...
@@ -122,11 +122,16 @@ void dnodeFreeMPeerQueue() {
}
void
dnodeDispatchToMPeerQueue
(
SRpcMsg
*
pMsg
)
{
if
(
!
mnodeIsRunning
()
||
tsMPeerQueue
==
NULL
)
{
if
(
!
mnodeIsRunning
())
{
dnodeSendRedirectMsg
(
pMsg
,
false
);
}
else
{
SMnodeMsg
*
pPeer
=
mnodeCreateMsg
(
pMsg
);
taosWriteQitem
(
tsMPeerQueue
,
TAOS_QTYPE_RPC
,
pPeer
);
if
(
!
mnodeIsReady
())
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
mnodeInitCode
(),
.
pCont
=
NULL
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
SMnodeMsg
*
pPeer
=
mnodeCreateMsg
(
pMsg
);
taosWriteQitem
(
tsMPeerQueue
,
TAOS_QTYPE_RPC
,
pPeer
);
}
}
rpcFreeCont
(
pMsg
->
pCont
);
...
...
src/dnode/src/dnodeMRead.c
浏览文件 @
9fad9318
...
...
@@ -123,11 +123,16 @@ void dnodeFreeMReadQueue() {
}
void
dnodeDispatchToMReadQueue
(
SRpcMsg
*
pMsg
)
{
if
(
!
mnodeIsRunning
()
||
tsMReadQueue
==
NULL
)
{
if
(
!
mnodeIsRunning
())
{
dnodeSendRedirectMsg
(
pMsg
,
true
);
}
else
{
SMnodeMsg
*
pRead
=
mnodeCreateMsg
(
pMsg
);
taosWriteQitem
(
tsMReadQueue
,
TAOS_QTYPE_RPC
,
pRead
);
if
(
!
mnodeIsReady
())
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
mnodeInitCode
(),
.
pCont
=
NULL
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
SMnodeMsg
*
pRead
=
mnodeCreateMsg
(
pMsg
);
taosWriteQitem
(
tsMReadQueue
,
TAOS_QTYPE_RPC
,
pRead
);
}
}
rpcFreeCont
(
pMsg
->
pCont
);
...
...
src/dnode/src/dnodeMWrite.c
浏览文件 @
9fad9318
...
...
@@ -123,13 +123,18 @@ void dnodeFreeMWritequeue() {
}
void
dnodeDispatchToMWriteQueue
(
SRpcMsg
*
pMsg
)
{
if
(
!
mnodeIsRunning
()
||
tsMWriteQueue
==
NULL
)
{
if
(
!
mnodeIsRunning
())
{
dnodeSendRedirectMsg
(
pMsg
,
true
);
}
else
{
SMnodeMsg
*
pWrite
=
mnodeCreateMsg
(
pMsg
);
dDebug
(
"msg:%p, app:%p type:%s is put into mwrite queue:%p"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
);
taosWriteQitem
(
tsMWriteQueue
,
TAOS_QTYPE_RPC
,
pWrite
);
if
(
!
mnodeIsReady
())
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
mnodeInitCode
(),
.
pCont
=
NULL
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
SMnodeMsg
*
pWrite
=
mnodeCreateMsg
(
pMsg
);
dDebug
(
"msg:%p, app:%p type:%s is put into mwrite queue:%p"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
);
taosWriteQitem
(
tsMWriteQueue
,
TAOS_QTYPE_RPC
,
pWrite
);
}
}
rpcFreeCont
(
pMsg
->
pCont
);
...
...
@@ -187,7 +192,7 @@ static void *dnodeProcessMWriteQueue(void *param) {
void
dnodeReprocessMWriteMsg
(
void
*
pMsg
)
{
SMnodeMsg
*
pWrite
=
pMsg
;
if
(
!
mnodeIsRunning
()
||
tsMWriteQueue
==
NULL
)
{
if
(
!
mnodeIsRunning
())
{
dDebug
(
"msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
pWrite
->
retry
);
...
...
@@ -196,7 +201,6 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
}
else
{
dDebug
(
"msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
,
pWrite
->
retry
);
taosWriteQitem
(
tsMWriteQueue
,
TAOS_QTYPE_RPC
,
pWrite
);
}
}
...
...
src/dnode/src/dnodeMain.c
浏览文件 @
9fad9318
...
...
@@ -63,6 +63,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{
"dnodeeps"
,
dnodeInitEps
,
dnodeCleanupEps
},
{
"globalcfg"
,
taosCheckGlobalCfg
,
NULL
},
{
"mnodeinfos"
,
dnodeInitMInfos
,
dnodeCleanupMInfos
},
{
"shell"
,
dnodeInitShell
,
dnodeCleanupShell
},
{
"wal"
,
walInit
,
walCleanUp
},
{
"check"
,
dnodeInitCheck
,
dnodeCleanupCheck
},
// NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{
"vread"
,
dnodeInitVRead
,
dnodeCleanupVRead
},
...
...
@@ -75,7 +76,6 @@ static const SDnodeComponent tsDnodeComponents[] = {
{
"mgmt"
,
dnodeInitMgmt
,
dnodeCleanupMgmt
},
{
"modules"
,
dnodeInitModules
,
dnodeCleanupModules
},
{
"mgmt-tmr"
,
dnodeInitMgmtTimer
,
dnodeCleanupMgmtTimer
},
{
"shell"
,
dnodeInitShell
,
dnodeCleanupShell
},
{
"telemetry"
,
dnodeInitTelemetry
,
dnodeCleanupTelemetry
},
};
...
...
src/dnode/src/dnodeShell.c
浏览文件 @
9fad9318
...
...
@@ -144,7 +144,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
static
int
dnodeRetrieveUserAuthInfo
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
int
code
=
mnodeRetriveAuth
(
user
,
spi
,
encrypt
,
secret
,
ckey
);
if
(
code
!=
TSDB_CODE_
APP_NOT_READY
)
return
code
;
if
(
code
!=
TSDB_CODE_
RPC_REDIRECT
)
return
code
;
SAuthMsg
*
pMsg
=
rpcMallocCont
(
sizeof
(
SAuthMsg
));
tstrncpy
(
pMsg
->
user
,
user
,
sizeof
(
pMsg
->
user
));
...
...
src/inc/mnode.h
浏览文件 @
9fad9318
...
...
@@ -65,12 +65,15 @@ void mnodeStopSystem();
void
sdbUpdateAsync
();
void
sdbUpdateSync
(
void
*
pMnodes
);
bool
mnodeIsRunning
();
bool
mnodeIsReady
();
int32_t
mnodeInitCode
();
int32_t
mnodeProcessRead
(
SMnodeMsg
*
pMsg
);
int32_t
mnodeProcessWrite
(
SMnodeMsg
*
pMsg
);
int32_t
mnodeProcessPeerReq
(
SMnodeMsg
*
pMsg
);
void
mnodeProcessPeerRsp
(
SRpcMsg
*
pMsg
);
int32_t
mnodeRetriveAuth
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
#ifdef __cplusplus
}
#endif
...
...
src/inc/taosdef.h
浏览文件 @
9fad9318
...
...
@@ -257,7 +257,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 512
#define TSDB_MAX_ALLOWED_SQL_LEN (
8*1024*1024U) // sql length should be less than 8
mb
#define TSDB_MAX_ALLOWED_SQL_LEN (
1*1024*1024U) // sql length should be less than 1
mb
#define TSDB_APPNAME_LEN TSDB_UNI_LEN
...
...
src/inc/taoserror.h
浏览文件 @
9fad9318
...
...
@@ -125,6 +125,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, 0, 0x030B, "Data expir
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_QUERY_ID
,
0
,
0x030C
,
"Invalid query id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_STREAM_ID
,
0
,
0x030D
,
"Invalid stream id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_CONN_ID
,
0
,
0x030E
,
"Invalid connection id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INIT
,
0
,
0x030F
,
"Mnode is initializing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INIT_SDB
,
0
,
0x0310
,
"Mnode is initializing meta data, it takes a while if many tables exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INIT_OTHER
,
0
,
0x0311
,
"Mnode is initializing other data"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE
,
0
,
0x0320
,
"Object already there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SDB_ERROR
,
0
,
0x0321
,
"Unexpected generic error in sdb"
)
...
...
@@ -184,6 +187,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, 0, 0x0385, "Too many d
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_DB_IN_DROPPING
,
0
,
0x0386
,
"Database not available"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_VGROUP_NOT_READY
,
0
,
0x0387
,
"Database unsynced"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_DB_OPTION_DAYS
,
0
,
0x0390
,
"Invalid database option: days out of range"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_DB_OPTION_KEEP
,
0
,
0x0391
,
"Invalid database option: keep >= keep2 >= keep1 >= days"
)
// dnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MSG_NOT_PROCESSED
,
0
,
0x0400
,
"Message not processed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_OUT_OF_MEMORY
,
0
,
0x0401
,
"Dnode out of memory"
)
...
...
@@ -261,9 +267,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_NOT_ENABLED
,
0
,
0x0901
,
"Sync module not enabled"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_VERSION
,
0
,
0x0902
,
"Invalid Sync version"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_CONFIRM_EXPIRED
,
0
,
0x0903
,
"Sync confirm expired"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_VND_COMMITING
,
0
,
0x0904
,
"Vnode is commiting"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_FILE_CHNAGED
,
0
,
0x0905
,
"Vnode file is changed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_APP_ERROR
,
0
,
0x1000
,
"Unexpected generic error in sync"
)
// wal
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_APP_ERROR
,
0
,
0x1000
,
"Unexpected generic error in wal"
)
...
...
src/inc/tcq.h
浏览文件 @
9fad9318
...
...
@@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h"
typedef
int32_t
(
*
FCqWrite
)(
void
*
ahandle
,
void
*
pHead
,
int32_t
qtype
,
void
*
pMsg
);
typedef
int32_t
(
*
FCqWrite
)(
int32_t
vgId
,
void
*
pHead
,
int32_t
qtype
,
void
*
pMsg
);
typedef
struct
{
int32_t
vgId
;
...
...
src/inc/tsync.h
浏览文件 @
9fad9318
...
...
@@ -86,7 +86,7 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
typedef
int32_t
(
*
FNotifyFileSynced
)(
int32_t
vgId
,
uint64_t
fversion
);
// get file version
typedef
int32_t
(
*
FGet
FileVersion
)(
int32_t
vgId
,
uint64_t
*
f
ver
);
typedef
int32_t
(
*
FGet
Version
)(
int32_t
vgId
,
uint64_t
*
fver
,
uint64_t
*
v
ver
);
typedef
struct
{
int32_t
vgId
;
// vgroup ID
...
...
@@ -100,7 +100,7 @@ typedef struct {
FNotifyRole
notifyRole
;
FNotifyFlowCtrl
notifyFlowCtrl
;
FNotifyFileSynced
notifyFileSynced
;
FGet
FileVersion
getFile
Version
;
FGet
Version
get
Version
;
}
SSyncInfo
;
typedef
void
*
tsync_h
;
...
...
src/mnode/src/mnodeDb.c
浏览文件 @
9fad9318
...
...
@@ -236,30 +236,28 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
if
(
pCfg
->
daysPerFile
<
TSDB_MIN_DAYS_PER_FILE
||
pCfg
->
daysPerFile
>
TSDB_MAX_DAYS_PER_FILE
)
{
mError
(
"invalid db option daysPerFile:%d valid range: [%d, %d]"
,
pCfg
->
daysPerFile
,
TSDB_MIN_DAYS_PER_FILE
,
TSDB_MAX_DAYS_PER_FILE
);
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
return
TSDB_CODE_MND_INVALID_DB_OPTION
_DAYS
;
}
if
(
pCfg
->
daysToKeep
<
TSDB_MIN_KEEP
||
pCfg
->
daysToKeep
>
TSDB_MAX_KEEP
)
{
mError
(
"invalid db option daysToKeep:%d valid range: [%d, %d]"
,
pCfg
->
daysToKeep
,
TSDB_MIN_KEEP
,
TSDB_MAX_KEEP
);
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
return
TSDB_CODE_MND_INVALID_DB_OPTION
_KEEP
;
}
if
(
pCfg
->
daysToKeep
<
pCfg
->
daysPerFile
)
{
mError
(
"invalid db option daysToKeep:%d should larger than daysPerFile:%d"
,
pCfg
->
daysToKeep
,
pCfg
->
daysPerFile
);
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
return
TSDB_CODE_MND_INVALID_DB_OPTION
_KEEP
;
}
#if 0
if
(
pCfg
->
daysToKeep2
<
TSDB_MIN_KEEP
||
pCfg
->
daysToKeep2
>
pCfg
->
daysToKeep
)
{
mError
(
"invalid db option daysToKeep2:%d valid range: [%d, %d]"
,
pCfg
->
daysToKeep
,
TSDB_MIN_KEEP
,
pCfg
->
daysToKeep
);
return TSDB_CODE_MND_INVALID_DB_OPTION;
return
TSDB_CODE_MND_INVALID_DB_OPTION
_KEEP
;
}
if
(
pCfg
->
daysToKeep1
<
TSDB_MIN_KEEP
||
pCfg
->
daysToKeep1
>
pCfg
->
daysToKeep2
)
{
mError
(
"invalid db option daysToKeep1:%d valid range: [%d, %d]"
,
pCfg
->
daysToKeep1
,
TSDB_MIN_KEEP
,
pCfg
->
daysToKeep2
);
return TSDB_CODE_MND_INVALID_DB_OPTION;
return
TSDB_CODE_MND_INVALID_DB_OPTION
_KEEP
;
}
#endif
if
(
pCfg
->
maxRowsPerFileBlock
<
TSDB_MIN_MAX_ROW_FBLOCK
||
pCfg
->
maxRowsPerFileBlock
>
TSDB_MAX_MAX_ROW_FBLOCK
)
{
mError
(
"invalid db option maxRowsPerFileBlock:%d valid range: [%d, %d]"
,
pCfg
->
maxRowsPerFileBlock
,
...
...
src/mnode/src/mnodeMain.c
浏览文件 @
9fad9318
...
...
@@ -17,6 +17,7 @@
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "taoserror.h"
#include "tbalance.h"
#include "tgrant.h"
#include "ttimer.h"
...
...
@@ -37,30 +38,41 @@
#include "mnodeShow.h"
#include "mnodeProfile.h"
typedef
enum
{
TSDB_MND_STATUS_NOT_RUNNING
,
TSDB_MND_STATUS_INIT
,
TSDB_MND_STATUS_INIT_SDB
,
TSDB_MND_STATUS_INIT_OTHER
,
TSDB_MND_STATUS_READY
,
TSDB_MND_STATUS_CLEANING
,
}
EMndStatus
;
typedef
struct
{
const
char
*
const
name
;
int
(
*
init
)();
void
(
*
cleanup
)();
EMndStatus
status
;
}
SMnodeComponent
;
void
*
tsMnodeTmr
=
NULL
;
static
bool
tsMgmtIsRunning
=
false
;
void
*
tsMnodeTmr
=
NULL
;
static
bool
tsMgmtIsRunning
=
false
;
static
EMndStatus
tsMgmtStatus
=
TSDB_MND_STATUS_NOT_RUNNING
;
static
const
SMnodeComponent
tsMnodeComponents
[]
=
{
{
"sdbref"
,
sdbInitRef
,
sdbCleanUpRef
},
{
"profile"
,
mnodeInitProfile
,
mnodeCleanupProfile
},
{
"cluster"
,
mnodeInitCluster
,
mnodeCleanupCluster
},
{
"accts"
,
mnodeInitAccts
,
mnodeCleanupAccts
},
{
"users"
,
mnodeInitUsers
,
mnodeCleanupUsers
},
{
"dnodes"
,
mnodeInitDnodes
,
mnodeCleanupDnodes
},
{
"dbs"
,
mnodeInitDbs
,
mnodeCleanupDbs
},
{
"vgroups"
,
mnodeInitVgroups
,
mnodeCleanupVgroups
},
{
"tables"
,
mnodeInitTables
,
mnodeCleanupTables
},
{
"mnodes"
,
mnodeInitMnodes
,
mnodeCleanupMnodes
},
{
"sdb"
,
sdbInit
,
sdbCleanUp
},
{
"balance"
,
balanceInit
,
balanceCleanUp
},
{
"grant"
,
grantInit
,
grantCleanUp
},
{
"show"
,
mnodeInitShow
,
mnodeCleanUpShow
}
{
"sdbref"
,
sdbInitRef
,
sdbCleanUpRef
,
TSDB_MND_STATUS_INIT
},
{
"profile"
,
mnodeInitProfile
,
mnodeCleanupProfile
,
TSDB_MND_STATUS_INIT
},
{
"cluster"
,
mnodeInitCluster
,
mnodeCleanupCluster
,
TSDB_MND_STATUS_INIT
},
{
"accts"
,
mnodeInitAccts
,
mnodeCleanupAccts
,
TSDB_MND_STATUS_INIT
},
{
"users"
,
mnodeInitUsers
,
mnodeCleanupUsers
,
TSDB_MND_STATUS_INIT
},
{
"dnodes"
,
mnodeInitDnodes
,
mnodeCleanupDnodes
,
TSDB_MND_STATUS_INIT
},
{
"dbs"
,
mnodeInitDbs
,
mnodeCleanupDbs
,
TSDB_MND_STATUS_INIT
},
{
"vgroups"
,
mnodeInitVgroups
,
mnodeCleanupVgroups
,
TSDB_MND_STATUS_INIT
},
{
"tables"
,
mnodeInitTables
,
mnodeCleanupTables
,
TSDB_MND_STATUS_INIT
},
{
"mnodes"
,
mnodeInitMnodes
,
mnodeCleanupMnodes
,
TSDB_MND_STATUS_INIT
},
{
"sdb"
,
sdbInit
,
sdbCleanUp
,
TSDB_MND_STATUS_INIT_SDB
},
{
"balance"
,
balanceInit
,
balanceCleanUp
,
TSDB_MND_STATUS_INIT_OTHER
},
{
"grant"
,
grantInit
,
grantCleanUp
,
TSDB_MND_STATUS_INIT_OTHER
},
{
"show"
,
mnodeInitShow
,
mnodeCleanUpShow
,
TSDB_MND_STATUS_INIT_OTHER
},
};
static
void
mnodeInitTimer
();
...
...
@@ -76,21 +88,24 @@ static void mnodeCleanupComponents(int32_t stepId) {
static
int32_t
mnodeInitComponents
()
{
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
tsMnodeComponents
)
/
sizeof
(
tsMnodeComponents
[
0
]);
i
++
)
{
tsMgmtStatus
=
tsMnodeComponents
[
i
].
status
;
if
(
tsMnodeComponents
[
i
].
init
()
!=
0
)
{
mnodeCleanupComponents
(
i
);
code
=
-
1
;
break
;
}
// sleep(3);
}
return
code
;
}
int32_t
mnodeStartSystem
()
{
if
(
tsMgmt
IsRunning
)
{
if
(
tsMgmt
Status
!=
TSDB_MND_STATUS_NOT_RUNNING
)
{
mInfo
(
"mnode module already started..."
);
return
0
;
}
tsMgmtStatus
=
TSDB_MND_STATUS_INIT
;
mInfo
(
"starting to initialize mnode ..."
);
if
(
mkdir
(
tsMnodeDir
,
0755
)
!=
0
&&
errno
!=
EEXIST
)
{
mError
(
"failed to init mnode dir:%s, reason:%s"
,
tsMnodeDir
,
strerror
(
errno
));
...
...
@@ -106,7 +121,7 @@ int32_t mnodeStartSystem() {
}
grantReset
(
TSDB_GRANT_ALL
,
0
);
tsMgmt
IsRunning
=
true
;
tsMgmt
Status
=
TSDB_MND_STATUS_READY
;
mInfo
(
"mnode is initialized successfully"
);
...
...
@@ -126,7 +141,7 @@ int32_t mnodeInitSystem() {
void
mnodeCleanupSystem
()
{
if
(
tsMgmtIsRunning
)
{
mInfo
(
"starting to clean up mnode"
);
tsMgmt
IsRunning
=
false
;
tsMgmt
Status
=
TSDB_MND_STATUS_CLEANING
;
dnodeFreeMWritequeue
();
dnodeFreeMReadQueue
();
...
...
@@ -134,6 +149,7 @@ void mnodeCleanupSystem() {
mnodeCleanupTimer
();
mnodeCleanupComponents
(
sizeof
(
tsMnodeComponents
)
/
sizeof
(
tsMnodeComponents
[
0
])
-
1
);
tsMgmtStatus
=
TSDB_MND_STATUS_NOT_RUNNING
;
mInfo
(
"mnode is cleaned up"
);
}
}
...
...
@@ -184,5 +200,29 @@ static bool mnodeNeedStart() {
}
bool
mnodeIsRunning
()
{
return
tsMgmtIsRunning
;
return
(
tsMgmtStatus
!=
TSDB_MND_STATUS_NOT_RUNNING
&&
tsMgmtStatus
!=
TSDB_MND_STATUS_CLEANING
);
}
bool
mnodeIsReady
()
{
return
(
tsMgmtStatus
==
TSDB_MND_STATUS_READY
);
}
int32_t
mnodeInitCode
()
{
int32_t
code
=
-
1
;
switch
(
tsMgmtStatus
)
{
case
TSDB_MND_STATUS_INIT
:
code
=
TSDB_CODE_MND_INIT
;
break
;
case
TSDB_MND_STATUS_INIT_SDB
:
code
=
TSDB_CODE_MND_INIT_SDB
;
break
;
case
TSDB_MND_STATUS_INIT_OTHER
:
code
=
TSDB_CODE_MND_INIT_OTHER
;
break
;
default:
code
=
TSDB_CODE_MND_INIT
;
}
return
code
;
}
src/mnode/src/mnodeSdb.c
浏览文件 @
9fad9318
...
...
@@ -251,6 +251,16 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) {
sdbUpdateMnodeRoles
();
}
static
int32_t
sdbNotifyFileSynced
(
int32_t
vgId
,
uint64_t
fversion
)
{
return
0
;
}
static
void
sdbNotifyFlowCtrl
(
int32_t
vgId
,
int32_t
level
)
{}
static
int32_t
sdbGetSyncVersion
(
int32_t
vgId
,
uint64_t
*
fver
,
uint64_t
*
vver
)
{
*
fver
=
0
;
*
vver
=
0
;
return
0
;
}
// failed to forward, need revert insert
static
void
sdbHandleFailedConfirm
(
SSdbRow
*
pRow
)
{
SWalHead
*
pHead
=
pRow
->
pHead
;
...
...
@@ -299,7 +309,7 @@ void sdbUpdateAsync() {
void
sdbUpdateSync
(
void
*
pMnodes
)
{
SMnodeInfos
*
mnodes
=
pMnodes
;
if
(
!
mnodeIsR
unning
())
{
if
(
!
mnodeIsR
eady
())
{
mDebug
(
"vgId:1, mnode not start yet, update sync config later"
);
return
;
}
...
...
@@ -372,11 +382,14 @@ void sdbUpdateSync(void *pMnodes) {
syncInfo
.
version
=
sdbGetVersion
();
syncInfo
.
syncCfg
=
syncCfg
;
sprintf
(
syncInfo
.
path
,
"%s"
,
tsMnodeDir
);
syncInfo
.
getWalInfo
=
sdbGetWalInfo
;
syncInfo
.
getFileInfo
=
sdbGetFileInfo
;
syncInfo
.
getWalInfo
=
sdbGetWalInfo
;
syncInfo
.
writeToCache
=
sdbWriteFwdToQueue
;
syncInfo
.
confirmForward
=
sdbConfirmForward
;
syncInfo
.
notifyRole
=
sdbNotifyRole
;
syncInfo
.
notifyFileSynced
=
sdbNotifyFileSynced
;
syncInfo
.
notifyFlowCtrl
=
sdbNotifyFlowCtrl
;
syncInfo
.
getVersion
=
sdbGetSyncVersion
;
tsSdbMgmt
.
cfg
=
syncCfg
;
if
(
tsSdbMgmt
.
sync
)
{
...
...
src/mnode/src/mnodeUser.c
浏览文件 @
9fad9318
...
...
@@ -585,10 +585,21 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
}
int32_t
mnodeRetriveAuth
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
*
secret
=
0
;
if
(
!
mnodeIsRunning
())
{
mDebug
(
"user:%s, mnode is not running, fail to auth"
,
user
);
return
TSDB_CODE_RPC_REDIRECT
;
}
if
(
!
mnodeIsReady
())
{
mDebug
(
"user:%s, failed to auth user, mnode is not ready"
,
user
);
return
mnodeInitCode
();
}
if
(
!
sdbIsMaster
())
{
*
secret
=
0
;
mDebug
(
"user:%s, failed to auth user, mnode is not master"
,
user
);
return
TSDB_CODE_
APP_NOT_READY
;
return
TSDB_CODE_
RPC_REDIRECT
;
}
SUserObj
*
pUser
=
mnodeGetUser
(
user
);
...
...
src/rpc/src/rpcMain.c
浏览文件 @
9fad9318
...
...
@@ -630,8 +630,16 @@ static void rpcReleaseConn(SRpcConn *pConn) {
}
else
{
// if there is an outgoing message, free it
if
(
pConn
->
outType
&&
pConn
->
pReqMsg
)
{
if
(
pConn
->
pContext
)
pConn
->
pContext
->
pConn
=
NULL
;
taosRemoveRef
(
tsRpcRefId
,
pConn
->
pContext
->
rid
);
SRpcReqContext
*
pContext
=
pConn
->
pContext
;
if
(
pContext
->
pRsp
)
{
// for synchronous API, post semaphore to unblock app
pContext
->
pRsp
->
code
=
TSDB_CODE_RPC_APP_ERROR
;
pContext
->
pRsp
->
pCont
=
NULL
;
pContext
->
pRsp
->
contLen
=
0
;
tsem_post
(
pContext
->
pSem
);
}
pContext
->
pConn
=
NULL
;
taosRemoveRef
(
tsRpcRefId
,
pContext
->
rid
);
}
}
...
...
@@ -1551,10 +1559,9 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
if
(
!
rpcIsReq
(
pHead
->
msgType
)
)
{
// for response, if code is auth failure, it shall bypass the auth process
code
=
htonl
(
pHead
->
code
);
if
(
code
==
TSDB_CODE_RPC_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_RPC_AUTH_FAILURE
||
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
code
==
TSDB_CODE_MND_INVALID_USER
||
code
==
TSDB_CODE_RPC_NOT_READY
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
if
(
code
!=
0
)
{
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
return
0
;
}
}
...
...
src/sync/inc/syncInt.h
浏览文件 @
9fad9318
...
...
@@ -139,16 +139,14 @@ typedef struct SsyncPeer {
char
id
[
TSDB_EP_LEN
+
32
];
// peer vgId + end point
uint64_t
version
;
uint64_t
sversion
;
// track the peer version in retrieve process
uint64_t
lastVer
;
// track the file version while retrieve
uint64_t
lastFileVer
;
// track the file version while retrieve
uint64_t
lastWalVer
;
// track the wal version while retrieve
int32_t
syncFd
;
int32_t
peerFd
;
// forward FD
int32_t
numOfRetrieves
;
// number of retrieves tried
int32_t
fileChanged
;
// a flag to indicate file is changed during retrieving process
void
*
timer
;
void
*
pConn
;
int32_t
notifyFd
;
int32_t
watchNum
;
int32_t
*
watchFd
;
int32_t
refCount
;
// reference count
struct
SSyncNode
*
pSyncNode
;
}
SSyncPeer
;
...
...
@@ -173,7 +171,7 @@ typedef struct SSyncNode {
FNotifyRole
notifyRole
;
FNotifyFlowCtrl
notifyFlowCtrl
;
FNotifyFileSynced
notifyFileSynced
;
FGet
FileVersion
getFile
Version
;
FGet
Version
get
Version
;
pthread_mutex_t
mutex
;
}
SSyncNode
;
...
...
src/sync/src/syncMain.c
浏览文件 @
9fad9318
...
...
@@ -196,7 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
pNode
->
confirmForward
=
pInfo
->
confirmForward
;
pNode
->
notifyFlowCtrl
=
pInfo
->
notifyFlowCtrl
;
pNode
->
notifyFileSynced
=
pInfo
->
notifyFileSynced
;
pNode
->
get
FileVersion
=
pInfo
->
getFile
Version
;
pNode
->
get
Version
=
pInfo
->
get
Version
;
pNode
->
selfIndex
=
-
1
;
pNode
->
vgId
=
pInfo
->
vgId
;
...
...
@@ -498,7 +498,6 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) {
taosReleaseRef
(
tsSyncRefId
,
pPeer
->
pSyncNode
->
rid
);
sDebug
(
"%s, resource is freed"
,
pPeer
->
id
);
tfree
(
pPeer
->
watchFd
);
tfree
(
pPeer
);
return
0
;
}
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
9fad9318
...
...
@@ -26,39 +26,78 @@
#include "tsync.h"
#include "syncInt.h"
static
int32_t
syncAreFilesModified
(
SSyncNode
*
pNode
,
SSyncPeer
*
pPeer
)
{
if
(
pNode
->
getFileVersion
==
NULL
)
return
TSDB_CODE_SUCCESS
;
static
int32_t
syncGetWalVersion
(
SSyncNode
*
pNode
,
SSyncPeer
*
pPeer
)
{
uint64_t
fver
,
wver
;
int32_t
code
=
(
*
pNode
->
getVersion
)(
pNode
->
vgId
,
&
fver
,
&
wver
);
if
(
code
!=
0
)
{
sDebug
(
"%s, vnode is commiting while retrieve, last wver:%"
PRIu64
,
pPeer
->
id
,
pPeer
->
lastWalVer
);
return
-
1
;
}
pPeer
->
lastWalVer
=
wver
;
return
code
;
}
static
bool
syncIsWalModified
(
SSyncNode
*
pNode
,
SSyncPeer
*
pPeer
)
{
uint64_t
fver
,
wver
;
int32_t
code
=
(
*
pNode
->
getVersion
)(
pNode
->
vgId
,
&
fver
,
&
wver
);
if
(
code
!=
0
)
{
sDebug
(
"%s, vnode is commiting while retrieve, last wver:%"
PRIu64
,
pPeer
->
id
,
pPeer
->
lastWalVer
);
return
true
;
}
if
(
wver
!=
pPeer
->
lastWalVer
)
{
sDebug
(
"%s, wal is modified while retrieve, wver:%"
PRIu64
", last:%"
PRIu64
,
pPeer
->
id
,
wver
,
pPeer
->
lastWalVer
);
return
true
;
}
return
false
;
}
uint64_t
fver
=
0
;
int32_t
code
=
(
*
pNode
->
getFileVersion
)(
pNode
->
vgId
,
&
fver
);
static
int32_t
syncGetFileVersion
(
SSyncNode
*
pNode
,
SSyncPeer
*
pPeer
)
{
uint64_t
fver
,
wver
;
int32_t
code
=
(
*
pNode
->
getVersion
)(
pNode
->
vgId
,
&
fver
,
&
wver
);
if
(
code
!=
0
)
{
sInfo
(
"%s, vnode is commiting while retrieve, last fver:%"
PRIu64
,
pPeer
->
id
,
pPeer
->
lastVer
);
sDebug
(
"%s, vnode is commiting while retrieve, last fver:%"
PRIu64
,
pPeer
->
id
,
pPeer
->
lastFileVer
);
return
-
1
;
}
pPeer
->
lastFileVer
=
fver
;
return
code
;
}
static
bool
syncAreFilesModified
(
SSyncNode
*
pNode
,
SSyncPeer
*
pPeer
)
{
uint64_t
fver
,
wver
;
int32_t
code
=
(
*
pNode
->
getVersion
)(
pNode
->
vgId
,
&
fver
,
&
wver
);
if
(
code
!=
0
)
{
sDebug
(
"%s, vnode is commiting while retrieve, last fver:%"
PRIu64
,
pPeer
->
id
,
pPeer
->
lastFileVer
);
pPeer
->
fileChanged
=
1
;
return
TSDB_CODE_SYN_VND_COMMITING
;
return
true
;
}
if
(
fver
!=
pPeer
->
lastVer
)
{
s
Info
(
"%s, files are modified while retrieve, fver:%"
PRIu64
", last fver:%"
PRIu64
,
pPeer
->
id
,
fver
,
pPeer
->
last
Ver
);
if
(
fver
!=
pPeer
->
last
File
Ver
)
{
s
Debug
(
"%s, files are modified while retrieve, fver:%"
PRIu64
", last:%"
PRIu64
,
pPeer
->
id
,
fver
,
pPeer
->
lastFile
Ver
);
pPeer
->
fileChanged
=
1
;
return
TSDB_CODE_SYN_FILE_CHNAGED
;
return
true
;
}
pPeer
->
fileChanged
=
0
;
return
TSDB_CODE_SUCCESS
;
return
false
;
}
static
int32_t
syncRetrieveFile
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFileInfo
fileInfo
;
memset
(
&
fileInfo
,
0
,
sizeof
(
SFileInfo
));
SFileAck
fileAck
=
{
0
};
int32_t
code
=
TSDB_CODE_SYN_APP_ERROR
;
int32_t
code
=
-
1
;
char
name
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
if
(
pNode
->
getFileVersion
)
(
*
pNode
->
getFileVersion
)(
pNode
->
vgId
,
&
pPeer
->
lastVer
)
;
if
(
syncGetFileVersion
(
pNode
,
pPeer
)
<
0
)
return
-
1
;
while
(
1
)
{
// retrieve file info
fileInfo
.
name
[
0
]
=
0
;
fileInfo
.
size
=
0
;
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
fileInfo
.
size
,
&
fileInfo
.
fversion
);
// fileInfo.size = htonl(size);
...
...
@@ -67,14 +106,14 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file info
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
(
fileInfo
),
sizeof
(
fileInfo
));
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
)
;
code
=
-
1
;
sError
(
"%s, failed to write file:%s info while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
// if no file anymore, break
if
(
fileInfo
.
magic
==
0
||
fileInfo
.
name
[
0
]
==
0
)
{
code
=
TSDB_CODE_SUCCESS
;
code
=
0
;
sDebug
(
"%s, no more files to sync"
,
pPeer
->
id
);
break
;
}
...
...
@@ -82,7 +121,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// wait for the ack from peer
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
fileAck
));
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
)
;
code
=
-
1
;
sError
(
"%s, failed to read file:%s ack while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
...
...
@@ -103,7 +142,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file to peer
int32_t
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
)
;
code
=
-
1
;
sError
(
"%s, failed to open file:%s while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
...
...
@@ -111,7 +150,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
ret
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
close
(
sfd
);
if
(
ret
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
)
;
code
=
-
1
;
sError
(
"%s, failed to send file:%s while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
...
...
@@ -120,128 +159,103 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo
.
index
++
;
// check if processed files are modified
code
=
syncAreFilesModified
(
pNode
,
pPeer
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
break
;
if
(
syncAreFilesModified
(
pNode
,
pPeer
))
{
code
=
-
1
;
break
;
}
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sError
(
"%s, failed to retrieve file
since %s"
,
pPeer
->
id
,
tstrerror
(
code
)
);
sError
(
"%s, failed to retrieve file
, code:0x%x"
,
pPeer
->
id
,
code
);
}
return
code
;
}
/* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */
static
int32_t
syncReadOneWalRecord
(
int32_t
sfd
,
SWalHead
*
pHead
,
uint32_t
*
pEvent
)
{
int32_t
ret
;
// if only a partial record is read out, upper layer will reload the file to get a complete record
static
int32_t
syncReadOneWalRecord
(
int32_t
sfd
,
SWalHead
*
pHead
)
{
int32_t
ret
=
read
(
sfd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
<
0
)
{
sError
(
"sfd:%d, failed to read wal head since %s, ret:%d"
,
sfd
,
strerror
(
errno
),
ret
);
return
-
1
;
}
ret
=
read
(
sfd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
<
0
)
return
-
1
;
if
(
ret
==
0
)
return
0
;
if
(
ret
==
0
)
{
sTrace
(
"sfd:%d, read to the end of file, ret:%d"
,
sfd
,
ret
);
return
0
;
}
if
(
ret
!=
sizeof
(
SWalHead
))
{
// file is not at end yet, it shall be reloaded
*
pEvent
=
*
pEvent
|
IN_MODIFY
;
sDebug
(
"sfd:%d, a partial wal head is read out, ret:%d"
,
sfd
,
ret
)
;
return
0
;
}
assert
(
pHead
->
len
<=
TSDB_MAX_WAL_SIZE
);
ret
=
read
(
sfd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
<
0
)
return
-
1
;
if
(
ret
<
0
)
{
sError
(
"sfd:%d, failed to read wal content since %s, ret:%d"
,
sfd
,
strerror
(
errno
),
ret
);
return
-
1
;
}
if
(
ret
!=
pHead
->
len
)
{
// file is not at end yet, it shall be reloaded
*
pEvent
=
*
pEvent
|
IN_MODIFY
;
sDebug
(
"sfd:%d, a partial wal conetnt is read out, ret:%d"
,
sfd
,
ret
)
;
return
0
;
}
return
sizeof
(
SWalHead
)
+
pHead
->
len
;
}
static
int32_t
syncMonitorLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
)
{
pPeer
->
watchNum
=
0
;
taosClose
(
pPeer
->
notifyFd
);
pPeer
->
notifyFd
=
inotify_init1
(
IN_NONBLOCK
);
if
(
pPeer
->
notifyFd
<
0
)
{
sError
(
"%s, failed to init inotify since %s"
,
pPeer
->
id
,
strerror
(
errno
));
return
-
1
;
}
if
(
pPeer
->
watchFd
==
NULL
)
pPeer
->
watchFd
=
malloc
(
sizeof
(
int32_t
)
*
tsMaxWatchFiles
);
if
(
pPeer
->
watchFd
==
NULL
)
{
sError
(
"%s, failed to allocate watchFd"
,
pPeer
->
id
);
return
-
1
;
}
memset
(
pPeer
->
watchFd
,
-
1
,
sizeof
(
int32_t
)
*
tsMaxWatchFiles
);
int32_t
*
wd
=
pPeer
->
watchFd
;
*
wd
=
inotify_add_watch
(
pPeer
->
notifyFd
,
name
,
IN_MODIFY
|
IN_CLOSE_WRITE
);
if
(
*
wd
==
-
1
)
{
sError
(
"%s, failed to watch last wal since %s"
,
pPeer
->
id
,
strerror
(
errno
));
static
int32_t
syncRetrieveLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
,
uint64_t
fversion
,
int64_t
offset
)
{
int32_t
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
{
sError
(
"%s, failed to open wal:%s for retrieve since:%s"
,
pPeer
->
id
,
name
,
tstrerror
(
errno
));
return
-
1
;
}
return
0
;
}
static
int32_t
syncCheckLastWalChanges
(
SSyncPeer
*
pPeer
,
uint32_t
*
pEvent
)
{
char
buf
[
2048
];
int32_t
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
if
(
len
<
0
&&
errno
!=
EAGAIN
)
{
sError
(
"%s, failed to read notify FD since %s"
,
pPeer
->
id
,
strerror
(
errno
));
int32_t
code
=
taosLSeek
(
sfd
,
offset
,
SEEK_SET
);
if
(
code
<
0
)
{
sError
(
"%s, failed to seek %"
PRId64
" in wal:%s for retrieve since:%s"
,
pPeer
->
id
,
offset
,
name
,
tstrerror
(
errno
));
close
(
sfd
);
return
-
1
;
}
if
(
len
==
0
)
return
0
;
struct
inotify_event
*
event
;
for
(
char
*
ptr
=
buf
;
ptr
<
buf
+
len
;
ptr
+=
sizeof
(
struct
inotify_event
)
+
event
->
len
)
{
event
=
(
struct
inotify_event
*
)
ptr
;
if
(
event
->
mask
&
IN_MODIFY
)
*
pEvent
=
*
pEvent
|
IN_MODIFY
;
if
(
event
->
mask
&
IN_CLOSE_WRITE
)
*
pEvent
=
*
pEvent
|
IN_CLOSE_WRITE
;
}
if
(
pEvent
!=
0
)
sDebug
(
"%s, last wal event:0x%x"
,
pPeer
->
id
,
*
pEvent
);
return
0
;
}
sDebug
(
"%s, retrieve last wal:%s, offset:%"
PRId64
" fver:%"
PRIu64
,
pPeer
->
id
,
name
,
offset
,
fversion
);
static
int32_t
syncRetrieveLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
,
uint64_t
fversion
,
int64_t
offset
,
uint32_t
*
pEvent
)
{
SWalHead
*
pHead
=
malloc
(
SYNC_MAX_SIZE
);
int32_t
code
=
-
1
;
int32_t
bytes
=
0
;
int32_t
sfd
;
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
{
free
(
pHead
);
return
-
1
;
}
(
void
)
lseek
(
sfd
,
offset
,
SEEK_SET
);
sDebug
(
"%s, retrieve last wal, offset:%"
PRId64
" fver:%"
PRIu64
,
pPeer
->
id
,
offset
,
fversion
);
while
(
1
)
{
int32_t
wsize
=
syncReadOneWalRecord
(
sfd
,
pHead
,
pEvent
);
if
(
wsize
<
0
)
break
;
if
(
wsize
==
0
)
{
code
=
0
;
code
=
syncReadOneWalRecord
(
sfd
,
pHead
);
if
(
code
<
0
)
{
sError
(
"%s, failed to read one record from wal:%s"
,
pPeer
->
id
,
name
);
break
;
}
if
(
code
==
0
)
{
code
=
bytes
;
sDebug
(
"%s, read to the end of wal, bytes:%d"
,
pPeer
->
id
,
bytes
);
break
;
}
sTrace
(
"%s, last wal is forwarded, hver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
int32_t
wsize
=
code
;
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
pHead
,
wsize
);
if
(
ret
!=
wsize
)
break
;
pPeer
->
sversion
=
pHead
->
version
;
if
(
ret
!=
wsize
)
{
code
=
-
1
;
sError
(
"%s, failed to forward wal since %s, hver:%"
PRIu64
,
pPeer
->
id
,
strerror
(
errno
),
pHead
->
version
);
break
;
}
pPeer
->
sversion
=
pHead
->
version
;
bytes
+=
wsize
;
if
(
pHead
->
version
>=
fversion
&&
fversion
>
0
)
{
code
=
0
;
bytes
=
0
;
sDebug
(
"%s, retrieve wal finished, hver:%"
PRIu64
" fver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
,
fversion
)
;
break
;
}
}
...
...
@@ -249,92 +263,62 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
free
(
pHead
);
close
(
sfd
);
if
(
code
==
0
)
return
bytes
;
return
-
1
;
return
code
;
}
static
int32_t
syncProcessLastWal
(
SSyncPeer
*
pPeer
,
char
*
wname
,
int64_t
index
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
int32_t
code
=
-
1
;
int32_t
once
=
0
;
// last WAL has once ever been processed
int64_t
offset
=
0
;
uint64_t
fversion
=
0
;
char
fname
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
// full path to wal file
if
(
syncAreFilesModified
(
pNode
,
pPeer
)
!=
0
)
return
-
1
;
// get full path to wal file
snprintf
(
fname
,
sizeof
(
fname
),
"%s/%s"
,
pNode
->
path
,
wname
);
sDebug
(
"%s, start to retrieve last wal:%s"
,
pPeer
->
id
,
fname
);
while
(
1
)
{
int32_t
once
=
0
;
// last WAL has once ever been processed
int64_t
offset
=
0
;
uint64_t
fversion
=
0
;
uint32_t
event
=
0
;
// get full path to wal file
snprintf
(
fname
,
sizeof
(
fname
),
"%s/%s"
,
pNode
->
path
,
wname
);
sDebug
(
"%s, start to retrieve last wal:%s"
,
pPeer
->
id
,
fname
);
// monitor last wal
if
(
syncMonitorLastWal
(
pPeer
,
fname
)
<
0
)
break
;
while
(
1
)
{
int32_t
bytes
=
syncRetrieveLastWal
(
pPeer
,
fname
,
fversion
,
offset
,
&
event
);
if
(
bytes
<
0
)
break
;
if
(
syncAreFilesModified
(
pNode
,
pPeer
))
return
-
1
;
if
(
syncGetWalVersion
(
pNode
,
pPeer
)
<
0
)
return
-
1
;
// check file changes
if
(
syncCheckLastWalChanges
(
pPeer
,
&
event
)
<
0
)
break
;
// if file is not updated or updated once, set the fversion and sstatus
if
(((
event
&
IN_MODIFY
)
==
0
)
||
once
)
{
if
(
fversion
==
0
)
{
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_CACHE
;
// start to forward pkt
sDebug
(
"%s, fversion is 0 then set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
fversion
=
nodeVersion
;
// must read data to fversion
}
}
int32_t
bytes
=
syncRetrieveLastWal
(
pPeer
,
fname
,
fversion
,
offset
);
if
(
bytes
<
0
)
{
sDebug
(
"%s, failed to retrieve last wal"
,
pPeer
->
id
);
return
bytes
;
}
// if all data up to fversion is read out, it is over
if
(
pPeer
->
sversion
>=
fversion
&&
fversion
>
0
)
{
code
=
0
;
sDebug
(
"%s, data up to fver:%"
PRIu64
" has been read out, bytes:%d"
,
pPeer
->
id
,
fversion
,
bytes
);
break
;
}
// check file changes
bool
walModified
=
syncIsWalModified
(
pNode
,
pPeer
);
// if all data are read out, and no update
if
((
bytes
==
0
)
&&
((
event
&
IN_MODIFY
)
==
0
))
{
// wal file is closed, break
if
(
event
&
IN_CLOSE_WRITE
)
{
code
=
0
;
sDebug
(
"%s, current wal is closed"
,
pPeer
->
id
);
break
;
}
// wal not closed, it means some data not flushed to disk, wait for a while
usleep
(
10000
);
// if file is not updated or updated once, set the fversion and sstatus
if
(
!
walModified
||
once
)
{
if
(
fversion
==
0
)
{
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_CACHE
;
// start to forward pkt
fversion
=
nodeVersion
;
// must read data to fversion
sDebug
(
"%s, set sstatus:%s and fver:%"
PRIu64
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
],
fversion
);
}
// if bytes>0, file is updated, or fversion is not reached but file still open, read again
once
=
1
;
offset
+=
bytes
;
sDebug
(
"%s, retrieve last wal, bytes:%d"
,
pPeer
->
id
,
bytes
);
event
=
event
&
(
~
IN_MODIFY
);
// clear IN_MODIFY flag
}
if
(
code
<
0
)
break
;
if
(
pPeer
->
sversion
>=
fversion
&&
fversion
>
0
)
break
;
// if all data up to fversion is read out, it is over
if
(
pPeer
->
sversion
>=
fversion
&&
fversion
>
0
)
{
sDebug
(
"%s, data up to fver:%"
PRIu64
" has been read out, bytes:%d sver:%"
PRIu64
,
pPeer
->
id
,
fversion
,
bytes
,
pPeer
->
sversion
);
return
0
;
}
index
++
;
wname
[
0
]
=
0
;
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
vgId
,
wname
,
&
index
);
if
(
code
<
0
)
break
;
if
(
wname
[
0
]
==
0
)
{
code
=
0
;
break
;
// if all data are read out, and no update
if
(
bytes
==
0
&&
!
walModified
)
{
// wal not closed, it means some data not flushed to disk, wait for a while
usleep
(
10000
);
}
// current last wal is closed, there is a new one
sDebug
(
"%s, last wal is closed, try new one"
,
pPeer
->
id
);
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again
once
=
1
;
offset
+=
bytes
;
sDebug
(
"%s, continue retrieve last wal, bytes:%d offset:%"
PRId64
,
pPeer
->
id
,
bytes
,
offset
);
}
taosClose
(
pPeer
->
notifyFd
);
return
code
;
return
-
1
;
}
static
int32_t
syncRetrieveWal
(
SSyncPeer
*
pPeer
)
{
...
...
@@ -342,7 +326,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
char
fname
[
TSDB_FILENAME_LEN
*
3
];
char
wname
[
TSDB_FILENAME_LEN
*
2
];
int32_t
size
;
struct
stat
fstat
;
int32_t
code
=
-
1
;
int64_t
index
=
0
;
...
...
@@ -350,9 +333,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
// retrieve wal info
wname
[
0
]
=
0
;
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
vgId
,
wname
,
&
index
);
if
(
code
<
0
)
break
;
// error
if
(
code
<
0
)
{
sError
(
"%s, failed to get wal info since:%s, code:0x%x"
,
pPeer
->
id
,
strerror
(
errno
),
code
);
break
;
}
if
(
wname
[
0
]
==
0
)
{
// no wal file
sDebug
(
"%s, no wal file"
,
pPeer
->
id
);
code
=
0
;
sDebug
(
"%s, no wal file anymore"
,
pPeer
->
id
);
break
;
}
...
...
@@ -364,20 +352,35 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
// get the full path to wal file
snprintf
(
fname
,
sizeof
(
fname
),
"%s/%s"
,
pNode
->
path
,
wname
);
// send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok
if
(
stat
(
fname
,
&
fstat
)
<
0
)
break
;
size
=
fstat
.
st_size
;
// send wal file, old wal file won't be modified, even remove is ok
struct
stat
fstat
;
if
(
stat
(
fname
,
&
fstat
)
<
0
)
{
code
=
-
1
;
sDebug
(
"%s, failed to stat wal:%s for retrieve since %s, code:0x%x"
,
pPeer
->
id
,
fname
,
strerror
(
errno
),
code
);
break
;
}
size
=
fstat
.
st_size
;
sDebug
(
"%s, retrieve wal:%s size:%d"
,
pPeer
->
id
,
fname
,
size
);
int32_t
sfd
=
open
(
fname
,
O_RDONLY
);
if
(
sfd
<
0
)
break
;
if
(
sfd
<
0
)
{
code
=
-
1
;
sError
(
"%s, failed to open wal:%s for retrieve since %s, code:0x%x"
,
pPeer
->
id
,
fname
,
strerror
(
errno
),
code
);
break
;
}
code
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
size
);
close
(
sfd
);
if
(
code
<
0
)
break
;
if
(
code
<
0
)
{
sError
(
"%s, failed to send wal:%s for retrieve since %s, code:0x%x"
,
pPeer
->
id
,
fname
,
strerror
(
errno
),
code
);
break
;
}
if
(
syncAreFilesModified
(
pNode
,
pPeer
)
!=
0
)
break
;
if
(
syncAreFilesModified
(
pNode
,
pPeer
))
{
code
=
-
1
;
break
;
}
}
if
(
code
==
0
)
{
...
...
@@ -386,9 +389,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
SWalHead
walHead
;
memset
(
&
walHead
,
0
,
sizeof
(
walHead
));
code
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
walHead
,
sizeof
(
walHead
));
taosWriteMsg
(
pPeer
->
syncFd
,
&
walHead
,
sizeof
(
walHead
));
}
else
{
sError
(
"%s, failed to send wal since %s
"
,
pPeer
->
id
,
strerror
(
errno
)
);
sError
(
"%s, failed to send wal since %s
, code:0x%x"
,
pPeer
->
id
,
strerror
(
errno
),
code
);
}
return
code
;
...
...
@@ -428,7 +431,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
pPeer
->
sversion
=
0
;
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_FILE
;
sInfo
(
"%s, start to retrieve files, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
if
(
syncRetrieveFile
(
pPeer
)
<
0
)
{
if
(
syncRetrieveFile
(
pPeer
)
!=
0
)
{
sError
(
"%s, failed to retrieve files"
,
pPeer
->
id
);
return
-
1
;
}
...
...
@@ -437,8 +440,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
if
(
pPeer
->
sversion
==
0
)
pPeer
->
sversion
=
1
;
sInfo
(
"%s, start to retrieve wals"
,
pPeer
->
id
);
if
(
syncRetrieveWal
(
pPeer
)
<
0
)
{
sError
(
"%s, failed to retrieve wals"
,
pPeer
->
id
);
int32_t
code
=
syncRetrieveWal
(
pPeer
);
if
(
code
!=
0
)
{
sError
(
"%s, failed to retrieve wals, code:0x%x"
,
pPeer
->
id
,
code
);
return
-
1
;
}
...
...
@@ -474,7 +478,6 @@ void *syncRetrieveData(void *param) {
}
pPeer
->
fileChanged
=
0
;
taosClose
(
pPeer
->
notifyFd
);
taosClose
(
pPeer
->
syncFd
);
syncDecPeerRef
(
pPeer
);
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
9fad9318
...
...
@@ -917,6 +917,8 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
ASSERT
(
pHelper
->
pCompInfo
->
blocks
[
0
].
keyLast
<
pHelper
->
pCompInfo
->
blocks
[
1
].
keyFirst
);
}
ASSERT
((
blkIdx
==
pIdx
->
numOfBlocks
-
1
)
||
(
!
pCompBlock
->
last
));
tsdbDebug
(
"vgId:%d tid:%d a super block is inserted at index %d"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
tableInfo
.
tid
,
blkIdx
);
...
...
@@ -1042,6 +1044,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
pIdx
->
maxKey
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
keyLast
;
pIdx
->
hasLast
=
(
uint32_t
)
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
last
;
ASSERT
((
blkIdx
==
pIdx
->
numOfBlocks
-
1
)
||
(
!
pCompBlock
->
last
));
tsdbDebug
(
"vgId:%d tid:%d a super block is updated at index %d"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
tableInfo
.
tid
,
blkIdx
);
...
...
@@ -1622,11 +1626,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
pCfg
->
update
);
if
(
pDataCols
->
numOfRows
==
0
)
break
;
if
(
tblkIdx
==
pIdx
->
numOfBlocks
-
1
)
{
if
(
tsdbWriteBlockToProperFile
(
pHelper
,
pDataCols
,
&
compBlock
)
<
0
)
return
-
1
;
}
else
{
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
}
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
if
(
round
==
0
)
{
if
(
oBlock
.
last
&&
pHelper
->
hasOldLastBlock
)
pHelper
->
hasOldLastBlock
=
false
;
...
...
src/util/src/ttimer.c
浏览文件 @
9fad9318
...
...
@@ -560,37 +560,6 @@ void taosTmrCleanUp(void* handle) {
tmrDebug
(
"%s timer controller is cleaned up."
,
ctrl
->
label
);
ctrl
->
label
[
0
]
=
0
;
// cancel all timers of this controller
for
(
size_t
i
=
0
;
i
<
timerMap
.
size
;
i
++
)
{
timer_list_t
*
list
=
timerMap
.
slots
+
i
;
lockTimerList
(
list
);
tmr_obj_t
*
t
=
list
->
timers
;
tmr_obj_t
*
prev
=
NULL
;
while
(
t
!=
NULL
)
{
tmr_obj_t
*
next
=
t
->
mnext
;
if
(
t
->
ctrl
!=
ctrl
)
{
prev
=
t
;
t
=
next
;
continue
;
}
uint8_t
state
=
atomic_val_compare_exchange_8
(
&
t
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_CANCELED
);
if
(
state
==
TIMER_STATE_WAITING
)
{
removeFromWheel
(
t
);
}
timerDecRef
(
t
);
if
(
prev
==
NULL
)
{
list
->
timers
=
next
;
}
else
{
prev
->
mnext
=
next
;
}
t
=
next
;
}
unlockTimerList
(
list
);
}
pthread_mutex_lock
(
&
tmrCtrlMutex
);
ctrl
->
next
=
unusedTmrCtrl
;
numOfTmrCtrl
--
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
9fad9318
...
...
@@ -38,7 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static
int32_t
vnodeNotifyFileSynced
(
int32_t
vgId
,
uint64_t
fversion
);
static
void
vnodeConfirmForard
(
int32_t
vgId
,
void
*
wparam
,
int32_t
code
);
static
int32_t
vnodeWriteToCache
(
int32_t
vgId
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
);
static
int32_t
vnodeGet
FileVersion
(
int32_t
vgId
,
uint64_t
*
f
ver
);
static
int32_t
vnodeGet
Version
(
int32_t
vgId
,
uint64_t
*
fver
,
uint64_t
*
w
ver
);
#ifndef _SYNC
int64_t
syncStart
(
const
SSyncInfo
*
info
)
{
return
NULL
;
}
...
...
@@ -272,7 +272,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy
(
cqCfg
.
pass
,
tsInternalPass
);
strcpy
(
cqCfg
.
db
,
pVnode
->
db
);
cqCfg
.
vgId
=
vnode
;
cqCfg
.
cqWrite
=
vnodeWriteTo
WQueu
e
;
cqCfg
.
cqWrite
=
vnodeWriteTo
Cach
e
;
pVnode
->
cq
=
cqOpen
(
pVnode
,
&
cqCfg
);
if
(
pVnode
->
cq
==
NULL
)
{
vnodeCleanUp
(
pVnode
);
...
...
@@ -353,7 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo
.
notifyRole
=
vnodeNotifyRole
;
syncInfo
.
notifyFlowCtrl
=
vnodeCtrlFlow
;
syncInfo
.
notifyFileSynced
=
vnodeNotifyFileSynced
;
syncInfo
.
get
FileVersion
=
vnodeGetFile
Version
;
syncInfo
.
get
Version
=
vnodeGet
Version
;
pVnode
->
sync
=
syncStart
(
&
syncInfo
);
#ifndef _SYNC
...
...
@@ -771,7 +771,7 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void
return
code
;
}
static
int32_t
vnodeGet
FileVersion
(
int32_t
vgId
,
uint64_t
*
f
ver
)
{
static
int32_t
vnodeGet
Version
(
int32_t
vgId
,
uint64_t
*
fver
,
uint64_t
*
w
ver
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while write to cache"
,
vgId
);
...
...
@@ -780,10 +780,11 @@ static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
int32_t
code
=
0
;
if
(
pVnode
->
isCommiting
)
{
vDebug
(
"vgId:%d, vnode is commiting while get
file
version"
,
vgId
);
vDebug
(
"vgId:%d, vnode is commiting while get version"
,
vgId
);
code
=
-
1
;
}
else
{
*
fver
=
pVnode
->
fversion
;
*
wver
=
pVnode
->
version
;
}
vnodeRelease
(
pVnode
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录