Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e96516ff
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e96516ff
编写于
11月 22, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/TD-1413
上级
e329a159
a1d247af
变更
48
展开全部
隐藏空白更改
内联
并排
Showing
48 changed file
with
1821 addition
and
1565 deletion
+1821
-1565
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+17
-22
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+9
-8
src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java
...ctor/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java
+1
-1
src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java
...c/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java
+7
-11
src/dnode/src/dnodeMRead.c
src/dnode/src/dnodeMRead.c
+2
-1
src/dnode/src/dnodeMWrite.c
src/dnode/src/dnodeMWrite.c
+5
-5
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+1
-1
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+1
-1
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+2
-2
src/inc/tsync.h
src/inc/tsync.h
+9
-9
src/mnode/inc/mnodeSdb.h
src/mnode/inc/mnodeSdb.h
+45
-42
src/mnode/src/mnodeAcct.c
src/mnode/src/mnodeAcct.c
+35
-36
src/mnode/src/mnodeCluster.c
src/mnode/src/mnodeCluster.c
+30
-30
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+61
-61
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+45
-45
src/mnode/src/mnodeInt.c
src/mnode/src/mnodeInt.c
+1
-1
src/mnode/src/mnodeMnode.c
src/mnode/src/mnodeMnode.c
+41
-45
src/mnode/src/mnodePeer.c
src/mnode/src/mnodePeer.c
+8
-6
src/mnode/src/mnodeRead.c
src/mnode/src/mnodeRead.c
+6
-4
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+418
-479
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+348
-348
src/mnode/src/mnodeUser.c
src/mnode/src/mnodeUser.c
+52
-52
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+86
-86
src/mnode/src/mnodeWrite.c
src/mnode/src/mnodeWrite.c
+7
-7
src/plugins/http/inc/httpInt.h
src/plugins/http/inc/httpInt.h
+1
-1
src/plugins/http/src/httpHandle.c
src/plugins/http/src/httpHandle.c
+2
-2
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+1
-2
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+62
-36
src/query/src/qResultbuf.c
src/query/src/qResultbuf.c
+1
-1
src/query/src/qUtil.c
src/query/src/qUtil.c
+21
-8
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+20
-13
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+139
-84
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+17
-6
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+15
-17
src/sync/src/taosTcpPool.c
src/sync/src/taosTcpPool.c
+1
-1
src/sync/src/tarbitrator.c
src/sync/src/tarbitrator.c
+4
-4
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+2
-2
tests/pytest/query/queryInterval.py
tests/pytest/query/queryInterval.py
+20
-6
tests/pytest/tools/taosdemo.py
tests/pytest/tools/taosdemo.py
+6
-0
tests/script/general/parser/groupby.sim
tests/script/general/parser/groupby.sim
+173
-2
tests/script/general/parser/interp.sim
tests/script/general/parser/interp.sim
+1
-2
tests/script/general/parser/join_multivnode.sim
tests/script/general/parser/join_multivnode.sim
+5
-2
tests/script/general/parser/projection_limit_offset.sim
tests/script/general/parser/projection_limit_offset.sim
+10
-2
tests/script/general/parser/tbnameIn.sim
tests/script/general/parser/tbnameIn.sim
+1
-2
tests/script/general/parser/testSuite.sim
tests/script/general/parser/testSuite.sim
+63
-64
tests/script/general/parser/union.sim
tests/script/general/parser/union.sim
+9
-2
tests/script/general/parser/where.sim
tests/script/general/parser/where.sim
+9
-2
未找到文件。
src/client/src/tscFunctionImpl.c
浏览文件 @
e96516ff
...
...
@@ -130,11 +130,11 @@ typedef struct STopBotInfo {
}
STopBotInfo
;
// leastsquares do not apply to super table
typedef
struct
SLeastsquareInfo
{
typedef
struct
SLeastsquare
s
Info
{
double
mat
[
2
][
3
];
double
startVal
;
int64_t
num
;
}
SLeastsquareInfo
;
}
SLeastsquare
s
Info
;
typedef
struct
SAPercentileInfo
{
SHistogramInfo
*
pHisto
;
...
...
@@ -316,7 +316,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*
interBytes
=
(
int16_t
)
sizeof
(
SPercentileInfo
);
}
else
if
(
functionId
==
TSDB_FUNC_LEASTSQR
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
bytes
=
TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE
;
// string
*
bytes
=
MAX
(
TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE
,
sizeof
(
SLeastsquaresInfo
))
;
// string
*
interBytes
=
*
bytes
;
}
else
if
(
functionId
==
TSDB_FUNC_FIRST_DST
||
functionId
==
TSDB_FUNC_LAST_DST
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -681,7 +681,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, i
}
// no result for first query, data block is required
if
(
GET_RES_INFO
(
pCtx
)
->
numOfRes
<=
0
)
{
if
(
GET_RES_INFO
(
pCtx
)
==
NULL
||
GET_RES_INFO
(
pCtx
)
->
numOfRes
<=
0
)
{
return
BLK_DATA_ALL_NEEDED
;
}
else
{
return
BLK_DATA_NO_NEEDED
;
...
...
@@ -693,7 +693,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, in
return
BLK_DATA_NO_NEEDED
;
}
if
(
GET_RES_INFO
(
pCtx
)
->
numOfRes
<=
0
)
{
if
(
GET_RES_INFO
(
pCtx
)
==
NULL
||
GET_RES_INFO
(
pCtx
)
->
numOfRes
<=
0
)
{
return
BLK_DATA_ALL_NEEDED
;
}
else
{
return
BLK_DATA_NO_NEEDED
;
...
...
@@ -2756,7 +2756,7 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) {
}
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquareInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SLeastsquare
s
Info
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
// 2*3 matrix
pInfo
->
startVal
=
pCtx
->
param
[
0
].
dKey
;
...
...
@@ -2783,7 +2783,7 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) {
static
void
leastsquares_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquareInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SLeastsquare
s
Info
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
double
(
*
param
)[
3
]
=
pInfo
->
mat
;
double
x
=
pInfo
->
startVal
;
...
...
@@ -2853,40 +2853,40 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) {
return
;
}
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquareInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquare
s
Info
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
double
(
*
param
)[
3
]
=
pInfo
->
mat
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_INT
:
{
int32_t
*
p
=
pData
;
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
index
,
pCtx
->
param
[
1
].
dKey
);
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
0
,
pCtx
->
param
[
1
].
dKey
);
break
;
};
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
*
p
=
pData
;
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
index
,
pCtx
->
param
[
1
].
dKey
);
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
0
,
pCtx
->
param
[
1
].
dKey
);
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
*
p
=
pData
;
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
index
,
pCtx
->
param
[
1
].
dKey
);
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
0
,
pCtx
->
param
[
1
].
dKey
);
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
int64_t
*
p
=
pData
;
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
index
,
pCtx
->
param
[
1
].
dKey
);
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
0
,
pCtx
->
param
[
1
].
dKey
);
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
float
*
p
=
pData
;
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
index
,
pCtx
->
param
[
1
].
dKey
);
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
0
,
pCtx
->
param
[
1
].
dKey
);
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
*
p
=
pData
;
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
index
,
pCtx
->
param
[
1
].
dKey
);
LEASTSQR_CAL
(
param
,
pInfo
->
startVal
,
p
,
0
,
pCtx
->
param
[
1
].
dKey
);
break
;
}
default:
...
...
@@ -2904,15 +2904,10 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) {
static
void
leastsquares_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
// no data in query
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquareInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SLeastsquare
s
Info
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pInfo
->
num
==
0
)
{
if
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
outputType
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
pCtx
->
aOutputBuf
,
pCtx
->
outputType
);
}
else
{
setNull
(
pCtx
->
aOutputBuf
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
setNull
(
pCtx
->
aOutputBuf
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
return
;
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
e96516ff
...
...
@@ -1454,13 +1454,13 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
}
}
SColumnIndex
index
=
{
0
};
// set the constant column value always attached to first table.
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
// add the timestamp column into the output columns
SColumnIndex
index
=
{
0
};
// primary timestamp column info
int32_t
numOfCols
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pQueryInfo
);
tscAddSpecialColumnForSelect
(
pQueryInfo
,
numOfCols
,
TSDB_FUNC_PRJ
,
&
index
,
pSchema
,
TSDB_COL_NORMAL
);
...
...
@@ -2432,6 +2432,8 @@ int32_t getTableIndexImpl(SStrToken* pTableToken, SQueryInfo* pQueryInfo, SColum
if
(
pTableToken
->
n
==
0
)
{
// only one table and no table name prefix in column name
if
(
pQueryInfo
->
numOfTables
==
1
)
{
pIndex
->
tableIndex
=
0
;
}
else
{
pIndex
->
tableIndex
=
COLUMN_INDEX_INITIAL_VAL
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -3950,9 +3952,6 @@ static void doExtractExprForSTable(SSqlCmd* pCmd, tSQLExpr** pExpr, SQueryInfo*
return
;
}
SStrToken
t
=
{
0
};
extractTableNameFromToken
(
&
pLeft
->
colInfo
,
&
t
);
*
pOut
=
*
pExpr
;
(
*
pExpr
)
=
NULL
;
...
...
@@ -4187,7 +4186,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
static
void
doAddJoinTagsColumnsIntoTagList
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
,
SCondExpr
*
pCondExpr
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
QUERY_IS_JOIN_QUERY
(
pQueryInfo
->
type
)
&&
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
SColumnIndex
index
=
{
0
}
;
SColumnIndex
index
=
COLUMN_INDEX_INITIALIZER
;
if
(
getColumnIndexByName
(
pCmd
,
&
pCondExpr
->
pJoinExpr
->
pLeft
->
colInfo
,
pQueryInfo
,
&
index
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p: invalid column name (left)"
,
pQueryInfo
);
...
...
@@ -4604,7 +4603,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
}
SStrToken
columnName
=
{
pVar
->
nLen
,
pVar
->
nType
,
pVar
->
pz
};
SColumnIndex
index
=
{
0
}
;
SColumnIndex
index
=
COLUMN_INDEX_INITIALIZER
;
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
// super table query
if
(
getColumnIndexByName
(
pCmd
,
&
columnName
,
pQueryInfo
,
&
index
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5509,7 +5508,9 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
tscAddSpecialColumnForSelect
(
pQueryInfo
,
(
int32_t
)
size
,
TSDB_FUNC_PRJ
,
&
colIndex
,
pSchema
,
TSDB_COL_NORMAL
);
SInternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
(
int32_t
)
size
);
int32_t
numOfFields
=
tscNumOfFields
(
pQueryInfo
);
SInternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
numOfFields
-
1
);
doLimitOutputNormalColOfGroupby
(
pInfo
->
pSqlExpr
);
pInfo
->
visible
=
false
;
}
...
...
@@ -6620,7 +6621,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
}
}
}
else
if
(
pSqlExpr
->
nSQLOptr
==
TK_ID
)
{
// column name, normal column arithmetic expression
SColumnIndex
index
=
{
0
}
;
SColumnIndex
index
=
COLUMN_INDEX_INITIALIZER
;
int32_t
ret
=
getColumnIndexByName
(
pCmd
,
&
pSqlExpr
->
colInfo
,
pQueryInfo
,
&
index
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
...
...
src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java
浏览文件 @
e96516ff
...
...
@@ -89,7 +89,7 @@ public class TSDBDriver extends AbstractTaosDriver {
/**
* fetch data from native function in a batch model
*/
public
static
final
String
PROPERTY_KEY_BATCH_LOAD
=
"batch"
;
public
static
final
String
PROPERTY_KEY_BATCH_LOAD
=
"batch
fetch
"
;
private
TSDBDatabaseMetaData
dbMetaData
=
null
;
...
...
src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSetBlockData.java
浏览文件 @
e96516ff
...
...
@@ -31,8 +31,6 @@ import java.util.List;
public
class
TSDBResultSetBlockData
{
private
int
numOfRows
=
0
;
private
int
numOfCols
=
0
;
private
int
rowIndex
=
0
;
private
List
<
ColumnMetaData
>
columnMetaDataList
;
...
...
@@ -40,22 +38,20 @@ public class TSDBResultSetBlockData {
public
TSDBResultSetBlockData
(
List
<
ColumnMetaData
>
colMeta
,
int
numOfCols
)
{
this
.
columnMetaDataList
=
colMeta
;
this
.
setNumOfCols
(
numOfCols
);
this
.
colData
=
new
ArrayList
<
Object
>
(
numOfCols
);
}
public
TSDBResultSetBlockData
()
{
this
.
colData
=
new
ArrayList
<
Object
>();
this
.
setNumOfCols
(
0
);
}
public
void
clear
()
{
int
size
=
this
.
colData
.
size
();
if
(
this
.
colData
!=
null
)
{
this
.
colData
.
clear
();
}
if
(
this
.
numOfCols
==
0
)
{
return
;
}
setNumOfCols
(
size
);
}
public
int
getNumOfRows
()
{
...
...
@@ -67,12 +63,12 @@ public class TSDBResultSetBlockData {
}
public
int
getNumOfCols
()
{
return
numOfCols
;
return
this
.
colData
.
size
()
;
}
public
void
setNumOfCols
(
int
numOfCols
)
{
this
.
numOfCols
=
numOfCols
;
this
.
c
lear
(
);
this
.
colData
=
new
ArrayList
<
Object
>(
numOfCols
)
;
this
.
c
olData
.
addAll
(
Collections
.
nCopies
(
numOfCols
,
null
)
);
}
public
boolean
hasMore
()
{
...
...
src/dnode/src/dnodeMRead.c
浏览文件 @
e96516ff
...
...
@@ -168,7 +168,8 @@ static void *dnodeProcessMReadQueue(void *param) {
break
;
}
dDebug
(
"%p, msg:%s will be processed in mread queue"
,
pRead
->
rpcMsg
.
ahandle
,
taosMsg
[
pRead
->
rpcMsg
.
msgType
]);
dDebug
(
"msg:%p, app:%p type:%s will be processed in mread queue"
,
pRead
->
rpcMsg
.
ahandle
,
pRead
,
taosMsg
[
pRead
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessRead
(
pRead
);
dnodeSendRpcMReadRsp
(
pRead
,
code
);
}
...
...
src/dnode/src/dnodeMWrite.c
浏览文件 @
e96516ff
...
...
@@ -127,7 +127,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
dnodeSendRedirectMsg
(
pMsg
,
true
);
}
else
{
SMnodeMsg
*
pWrite
=
mnodeCreateMsg
(
pMsg
);
dDebug
(
"
app:%p:%p, msg:%s is put into mwrite queue:%p"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrit
e
,
dDebug
(
"
msg:%p, app:%p type:%s is put into mwrite queue:%p"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandl
e
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
);
taosWriteQitem
(
tsMWriteQueue
,
TAOS_QTYPE_RPC
,
pWrite
);
}
...
...
@@ -136,7 +136,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
}
static
void
dnodeFreeMWriteMsg
(
SMnodeMsg
*
pWrite
)
{
dDebug
(
"
app:%p:%p, msg:%s is freed from mwrite queue:%p"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrit
e
,
dDebug
(
"
msg:%p, app:%p type:%s is freed from mwrite queue:%p"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandl
e
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
);
mnodeCleanupMsg
(
pWrite
);
...
...
@@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) {
break
;
}
dDebug
(
"
app:%p:%p, msg:%s will be processed in mwrite queue"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrit
e
,
dDebug
(
"
msg:%p, app:%p type:%s will be processed in mwrite queue"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandl
e
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessWrite
(
pWrite
);
...
...
@@ -188,13 +188,13 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
SMnodeMsg
*
pWrite
=
pMsg
;
if
(
!
mnodeIsRunning
()
||
tsMWriteQueue
==
NULL
)
{
dDebug
(
"
app:%p:%p, msg:%s is redirected for mnode not running, retry times:%d"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrit
e
,
dDebug
(
"
msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandl
e
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
pWrite
->
retry
);
dnodeSendRedirectMsg
(
pMsg
,
true
);
dnodeFreeMWriteMsg
(
pWrite
);
}
else
{
dDebug
(
"
app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrit
e
,
dDebug
(
"
msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandl
e
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
],
tsMWriteQueue
,
pWrite
->
retry
);
taosWriteQitem
(
tsMWriteQueue
,
TAOS_QTYPE_RPC
,
pWrite
);
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
e96516ff
...
...
@@ -216,7 +216,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
}
pMsg
=
&
pMgmt
->
rpcMsg
;
dDebug
(
"
%p, msg:%p:%s will be processed"
,
pMsg
->
ahandle
,
pMgmt
,
taosMsg
[
pMsg
->
msgType
]);
dDebug
(
"
msg:%p, ahandle:%p type:%s will be processed"
,
pMgmt
,
pMsg
->
ahandle
,
taosMsg
[
pMsg
->
msgType
]);
if
(
dnodeProcessMgmtMsgFp
[
pMsg
->
msgType
])
{
rsp
.
code
=
(
*
dnodeProcessMgmtMsgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
e96516ff
...
...
@@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) {
break
;
}
dDebug
(
"
%p, msg:%p:%s will be processed in vread queue, qtype:%d"
,
pRead
->
rpcAhandle
,
pRead
,
dDebug
(
"
msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d"
,
pRead
,
pRead
->
rpcAhandle
,
taosMsg
[
pRead
->
msgType
],
qtype
);
int32_t
code
=
vnodeProcessRead
(
pVnode
,
pRead
);
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
e96516ff
...
...
@@ -205,8 +205,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
bool
forceFsync
=
false
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
(
void
**
)
&
pWrite
);
dTrace
(
"
%p, msg:%p:%s will be processed in vwrite queue, qtype:%s hver:%"
PRIu64
,
pWrite
->
rpcAhandle
,
pWrite
,
taosMsg
[
pWrite
->
pHead
->
msgType
],
qtypeStr
[
qtype
],
pWrite
->
pHead
->
version
);
dTrace
(
"
msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%"
PRIu64
,
pWrite
,
pWrite
->
rpcAhandle
,
taosMsg
[
pWrite
->
pHead
->
msgType
],
qtypeStr
[
qtype
],
pWrite
->
pHead
->
version
);
pWrite
->
code
=
vnodeProcessWrite
(
pVnode
,
pWrite
->
pHead
,
qtype
,
&
pWrite
->
rspRet
);
if
(
pWrite
->
code
<=
0
)
pWrite
->
processedCount
=
1
;
...
...
src/inc/tsync.h
浏览文件 @
e96516ff
...
...
@@ -24,18 +24,18 @@ extern "C" {
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef
enum
_TAOS_SYNC_ROLE
{
TAOS_SYNC_ROLE_OFFLINE
,
TAOS_SYNC_ROLE_UNSYNCED
,
TAOS_SYNC_ROLE_SYNCING
,
TAOS_SYNC_ROLE_SLAVE
,
TAOS_SYNC_ROLE_MASTER
,
TAOS_SYNC_ROLE_OFFLINE
=
0
,
TAOS_SYNC_ROLE_UNSYNCED
=
1
,
TAOS_SYNC_ROLE_SYNCING
=
2
,
TAOS_SYNC_ROLE_SLAVE
=
3
,
TAOS_SYNC_ROLE_MASTER
=
4
}
ESyncRole
;
typedef
enum
_TAOS_SYNC_STATUS
{
TAOS_SYNC_STATUS_INIT
,
TAOS_SYNC_STATUS_START
,
TAOS_SYNC_STATUS_FILE
,
TAOS_SYNC_STATUS_CACHE
,
TAOS_SYNC_STATUS_INIT
=
0
,
TAOS_SYNC_STATUS_START
=
1
,
TAOS_SYNC_STATUS_FILE
=
2
,
TAOS_SYNC_STATUS_CACHE
=
3
}
ESyncStatus
;
typedef
struct
{
...
...
src/mnode/inc/mnodeSdb.h
浏览文件 @
e96516ff
...
...
@@ -20,7 +20,8 @@
extern
"C"
{
#endif
struct
SMnodeMsg
;
#include "mnode.h"
#include "twal.h"
typedef
enum
{
SDB_TABLE_CLUSTER
=
0
,
...
...
@@ -36,44 +37,46 @@ typedef enum {
}
ESdbTable
;
typedef
enum
{
SDB_KEY_STRING
,
SDB_KEY_INT
,
SDB_KEY_AUTO
,
SDB_KEY_VAR_STRING
,
SDB_KEY_STRING
=
0
,
SDB_KEY_INT
=
1
,
SDB_KEY_AUTO
=
2
,
SDB_KEY_VAR_STRING
=
3
,
}
ESdbKey
;
typedef
enum
{
SDB_OPER_GLOBAL
,
SDB_OPER_LOCAL
SDB_OPER_GLOBAL
=
0
,
SDB_OPER_LOCAL
=
1
}
ESdbOper
;
typedef
struct
SSdbOper
{
ESdbOper
type
;
int32_t
rowSize
;
int32_t
retCode
;
// for callback in sdb queue
int32_t
processedCount
;
// for sync fwd callback
int32_t
(
*
reqFp
)(
struct
SMnodeMsg
*
pMsg
);
int32_t
(
*
writeCb
)(
struct
SMnodeMsg
*
pMsg
,
int32_t
code
);
void
*
table
;
void
*
pObj
;
void
*
rowData
;
struct
SMnodeMsg
*
pMsg
;
}
SSdbOper
;
typedef
struct
SSdbRow
{
ESdbOper
type
;
int32_t
processedCount
;
// for sync fwd callback
int32_t
code
;
// for callback in sdb queue
int32_t
rowSize
;
void
*
rowData
;
void
*
pObj
;
void
*
pTable
;
SMnodeMsg
*
pMsg
;
int32_t
(
*
fpReq
)(
SMnodeMsg
*
pMsg
);
int32_t
(
*
fpRsp
)(
SMnodeMsg
*
pMsg
,
int32_t
code
);
char
reserveForSync
[
16
];
SWalHead
pHead
[];
}
SSdbRow
;
typedef
struct
{
char
*
tableN
ame
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
refCountPos
;
ESdbTable
tableI
d
;
char
*
n
ame
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
refCountPos
;
ESdbTable
i
d
;
ESdbKey
keyType
;
int32_t
(
*
insertFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
deleteFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
updateFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
encodeFp
)(
SSdbOper
*
pOper
);
int32_t
(
*
decodeFp
)(
SSdbOper
*
pDesc
);
int32_t
(
*
destroyFp
)(
SSdbOper
*
pDesc
);
int32_t
(
*
restoredFp
)();
int32_t
(
*
fpInsert
)(
SSdbRow
*
pRow
);
int32_t
(
*
fpDelete
)(
SSdbRow
*
pRow
);
int32_t
(
*
fpUpdate
)(
SSdbRow
*
pRow
);
int32_t
(
*
fpEncode
)(
SSdbRow
*
pRow
);
int32_t
(
*
fpDecode
)(
SSdbRow
*
pRow
);
int32_t
(
*
fpDestroy
)(
SSdbRow
*
pRow
);
int32_t
(
*
fpRestored
)();
}
SSdbTableDesc
;
int32_t
sdbInit
();
...
...
@@ -84,20 +87,20 @@ bool sdbIsMaster();
bool
sdbIsServing
();
void
sdbUpdateMnodeRoles
();
int32_t
sdbInsertRow
(
SSdb
Oper
*
pOper
);
int32_t
sdbDeleteRow
(
SSdb
Oper
*
pOper
);
int32_t
sdbUpdateRow
(
SSdb
Oper
*
pOper
);
int32_t
sdbInsertRow
Imp
(
SSdbOper
*
pOper
);
int32_t
sdbInsertRow
(
SSdb
Row
*
pRow
);
int32_t
sdbDeleteRow
(
SSdb
Row
*
pRow
);
int32_t
sdbUpdateRow
(
SSdb
Row
*
pRow
);
int32_t
sdbInsertRow
ToQueue
(
SSdbRow
*
pRow
);
void
*
sdbGetRow
(
void
*
hand
le
,
void
*
key
);
void
*
sdbFetchRow
(
void
*
hand
le
,
void
*
pIter
,
void
**
ppRow
);
void
*
sdbGetRow
(
void
*
pTab
le
,
void
*
key
);
void
*
sdbFetchRow
(
void
*
pTab
le
,
void
*
pIter
,
void
**
ppRow
);
void
sdbFreeIter
(
void
*
pIter
);
void
sdbIncRef
(
void
*
thand
le
,
void
*
pRow
);
void
sdbDecRef
(
void
*
thand
le
,
void
*
pRow
);
int64_t
sdbGetNumOfRows
(
void
*
hand
le
);
int32_t
sdbGetId
(
void
*
hand
le
);
void
sdbIncRef
(
void
*
pTab
le
,
void
*
pRow
);
void
sdbDecRef
(
void
*
pTab
le
,
void
*
pRow
);
int64_t
sdbGetNumOfRows
(
void
*
pTab
le
);
int32_t
sdbGetId
(
void
*
pTab
le
);
uint64_t
sdbGetVersion
();
bool
sdbCheckRowDeleted
(
void
*
thand
le
,
void
*
pRow
);
bool
sdbCheckRowDeleted
(
void
*
pTab
le
,
void
*
pRow
);
#ifdef __cplusplus
}
...
...
src/mnode/src/mnodeAcct.c
浏览文件 @
e96516ff
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "dnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
...
...
@@ -25,36 +26,34 @@
#include "mnodeUser.h"
#include "mnodeVgroup.h"
#include "tglobal.h"
void
*
tsAcctSdb
=
NULL
;
static
int32_t
tsAcctUpdateSize
;
static
int32_t
mnodeCreateRootAcct
();
static
int32_t
mnodeAcctActionDestroy
(
SSdb
Oper
*
pOper
)
{
SAcctObj
*
pAcct
=
p
Oper
->
pObj
;
static
int32_t
mnodeAcctActionDestroy
(
SSdb
Row
*
pRow
)
{
SAcctObj
*
pAcct
=
p
Row
->
pObj
;
pthread_mutex_destroy
(
&
pAcct
->
mutex
);
tfree
(
p
Oper
->
pObj
);
tfree
(
p
Row
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeAcctActionInsert
(
SSdb
Oper
*
pOper
)
{
SAcctObj
*
pAcct
=
p
Oper
->
pObj
;
static
int32_t
mnodeAcctActionInsert
(
SSdb
Row
*
pRow
)
{
SAcctObj
*
pAcct
=
p
Row
->
pObj
;
memset
(
&
pAcct
->
acctInfo
,
0
,
sizeof
(
SAcctInfo
));
pAcct
->
acctInfo
.
accessState
=
TSDB_VN_ALL_ACCCESS
;
pthread_mutex_init
(
&
pAcct
->
mutex
,
NULL
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeAcctActionDelete
(
SSdb
Oper
*
pOper
)
{
SAcctObj
*
pAcct
=
p
Oper
->
pObj
;
static
int32_t
mnodeAcctActionDelete
(
SSdb
Row
*
pRow
)
{
SAcctObj
*
pAcct
=
p
Row
->
pObj
;
mnodeDropAllUsers
(
pAcct
);
mnodeDropAllDbs
(
pAcct
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeAcctActionUpdate
(
SSdb
Oper
*
pOper
)
{
SAcctObj
*
pAcct
=
p
Oper
->
pObj
;
static
int32_t
mnodeAcctActionUpdate
(
SSdb
Row
*
pRow
)
{
SAcctObj
*
pAcct
=
p
Row
->
pObj
;
SAcctObj
*
pSaved
=
mnodeGetAcct
(
pAcct
->
user
);
if
(
pAcct
!=
pSaved
)
{
memcpy
(
pSaved
,
pAcct
,
tsAcctUpdateSize
);
...
...
@@ -64,19 +63,19 @@ static int32_t mnodeAcctActionUpdate(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeAcctActionEncode
(
SSdb
Oper
*
pOper
)
{
SAcctObj
*
pAcct
=
p
Oper
->
pObj
;
memcpy
(
p
Oper
->
rowData
,
pAcct
,
tsAcctUpdateSize
);
p
Oper
->
rowSize
=
tsAcctUpdateSize
;
static
int32_t
mnodeAcctActionEncode
(
SSdb
Row
*
pRow
)
{
SAcctObj
*
pAcct
=
p
Row
->
pObj
;
memcpy
(
p
Row
->
rowData
,
pAcct
,
tsAcctUpdateSize
);
p
Row
->
rowSize
=
tsAcctUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeAcctActionDecode
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeAcctActionDecode
(
SSdb
Row
*
pRow
)
{
SAcctObj
*
pAcct
=
(
SAcctObj
*
)
calloc
(
1
,
sizeof
(
SAcctObj
));
if
(
pAcct
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pAcct
,
p
Oper
->
rowData
,
tsAcctUpdateSize
);
p
Oper
->
pObj
=
pAcct
;
memcpy
(
pAcct
,
p
Row
->
rowData
,
tsAcctUpdateSize
);
p
Row
->
pObj
=
pAcct
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -99,29 +98,29 @@ int32_t mnodeInitAccts() {
SAcctObj
tObj
;
tsAcctUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_ACCOUNT
,
.
tableName
=
"accounts"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_ACCOUNT
,
.
name
=
"accounts"
,
.
hashSessions
=
TSDB_DEFAULT_ACCOUNTS_HASH_SIZE
,
.
maxRowSize
=
tsAcctUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
insertFp
=
mnodeAcctActionInsert
,
.
deleteFp
=
mnodeAcctActionDelete
,
.
updateFp
=
mnodeAcctActionUpdate
,
.
encodeFp
=
mnodeAcctActionEncode
,
.
decodeFp
=
mnodeAcctActionDecode
,
.
destroyFp
=
mnodeAcctActionDestroy
,
.
restoredFp
=
mnodeAcctActionRestored
.
fpInsert
=
mnodeAcctActionInsert
,
.
fpDelete
=
mnodeAcctActionDelete
,
.
fpUpdate
=
mnodeAcctActionUpdate
,
.
fpEncode
=
mnodeAcctActionEncode
,
.
fpDecode
=
mnodeAcctActionDecode
,
.
fpDestroy
=
mnodeAcctActionDestroy
,
.
fpRestored
=
mnodeAcctActionRestored
};
tsAcctSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsAcctSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsAcctSdb
==
NULL
)
{
mError
(
"table:%s, failed to create hash"
,
tableDesc
.
tableN
ame
);
mError
(
"table:%s, failed to create hash"
,
desc
.
n
ame
);
return
-
1
;
}
mDebug
(
"table:%s, hash is created"
,
tableDesc
.
tableN
ame
);
mDebug
(
"table:%s, hash is created"
,
desc
.
n
ame
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -226,13 +225,13 @@ static int32_t mnodeCreateRootAcct() {
pAcct
->
acctId
=
sdbGetId
(
tsAcctSdb
);
pAcct
->
createdTime
=
taosGetTimestampMs
();
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsAcctSdb
,
.
pObj
=
pAcct
,
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsAcctSdb
,
.
pObj
=
pAcct
,
};
return
sdbInsertRow
(
&
oper
);
return
sdbInsertRow
(
&
row
);
}
#ifndef _ACCT
...
...
src/mnode/src/mnodeCluster.c
浏览文件 @
e96516ff
...
...
@@ -32,36 +32,36 @@ static int32_t mnodeCreateCluster();
static
int32_t
mnodeGetClusterMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mnodeRetrieveClusters
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mnodeClusterActionDestroy
(
SSdb
Oper
*
pOper
)
{
tfree
(
p
Oper
->
pObj
);
static
int32_t
mnodeClusterActionDestroy
(
SSdb
Row
*
pRow
)
{
tfree
(
p
Row
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeClusterActionInsert
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeClusterActionInsert
(
SSdb
Row
*
pRow
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeClusterActionDelete
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeClusterActionDelete
(
SSdb
Row
*
pRow
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeClusterActionUpdate
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeClusterActionUpdate
(
SSdb
Row
*
pRow
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeClusterActionEncode
(
SSdb
Oper
*
pOper
)
{
SClusterObj
*
pCluster
=
p
Oper
->
pObj
;
memcpy
(
p
Oper
->
rowData
,
pCluster
,
tsClusterUpdateSize
);
p
Oper
->
rowSize
=
tsClusterUpdateSize
;
static
int32_t
mnodeClusterActionEncode
(
SSdb
Row
*
pRow
)
{
SClusterObj
*
pCluster
=
p
Row
->
pObj
;
memcpy
(
p
Row
->
rowData
,
pCluster
,
tsClusterUpdateSize
);
p
Row
->
rowSize
=
tsClusterUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeClusterActionDecode
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeClusterActionDecode
(
SSdb
Row
*
pRow
)
{
SClusterObj
*
pCluster
=
(
SClusterObj
*
)
calloc
(
1
,
sizeof
(
SClusterObj
));
if
(
pCluster
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pCluster
,
p
Oper
->
rowData
,
tsClusterUpdateSize
);
p
Oper
->
pObj
=
pCluster
;
memcpy
(
pCluster
,
p
Row
->
rowData
,
tsClusterUpdateSize
);
p
Row
->
pObj
=
pCluster
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -84,32 +84,32 @@ int32_t mnodeInitCluster() {
SClusterObj
tObj
;
tsClusterUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_CLUSTER
,
.
tableName
=
"cluster"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_CLUSTER
,
.
name
=
"cluster"
,
.
hashSessions
=
TSDB_DEFAULT_CLUSTER_HASH_SIZE
,
.
maxRowSize
=
tsClusterUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
insertFp
=
mnodeClusterActionInsert
,
.
deleteFp
=
mnodeClusterActionDelete
,
.
updateFp
=
mnodeClusterActionUpdate
,
.
encodeFp
=
mnodeClusterActionEncode
,
.
decodeFp
=
mnodeClusterActionDecode
,
.
destroyFp
=
mnodeClusterActionDestroy
,
.
restoredFp
=
mnodeClusterActionRestored
.
fpInsert
=
mnodeClusterActionInsert
,
.
fpDelete
=
mnodeClusterActionDelete
,
.
fpUpdate
=
mnodeClusterActionUpdate
,
.
fpEncode
=
mnodeClusterActionEncode
,
.
fpDecode
=
mnodeClusterActionDecode
,
.
fpDestroy
=
mnodeClusterActionDestroy
,
.
fpRestored
=
mnodeClusterActionRestored
};
tsClusterSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsClusterSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsClusterSdb
==
NULL
)
{
mError
(
"table:%s, failed to create hash"
,
tableDesc
.
tableN
ame
);
mError
(
"table:%s, failed to create hash"
,
desc
.
n
ame
);
return
-
1
;
}
mnodeAddShowMetaHandle
(
TSDB_MGMT_TABLE_CLUSTER
,
mnodeGetClusterMeta
);
mnodeAddShowRetrieveHandle
(
TSDB_MGMT_TABLE_CLUSTER
,
mnodeRetrieveClusters
);
mDebug
(
"table:%s, hash is created"
,
tableDesc
.
tableN
ame
);
mDebug
(
"table:%s, hash is created"
,
desc
.
n
ame
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -145,13 +145,13 @@ static int32_t mnodeCreateCluster() {
mDebug
(
"uid is %s"
,
pCluster
->
uid
);
}
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsClusterSdb
,
.
pObj
=
pCluster
,
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsClusterSdb
,
.
pObj
=
pCluster
,
};
return
sdbInsertRow
(
&
oper
);
return
sdbInsertRow
(
&
row
);
}
const
char
*
mnodeGetClusterId
()
{
...
...
src/mnode/src/mnodeDb.c
浏览文件 @
e96516ff
...
...
@@ -56,8 +56,8 @@ static void mnodeDestroyDb(SDbObj *pDb) {
tfree
(
pDb
);
}
static
int32_t
mnodeDbActionDestroy
(
SSdb
Oper
*
pOper
)
{
mnodeDestroyDb
(
p
Oper
->
pObj
);
static
int32_t
mnodeDbActionDestroy
(
SSdb
Row
*
pRow
)
{
mnodeDestroyDb
(
p
Row
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -65,8 +65,8 @@ int64_t mnodeGetDbNum() {
return
sdbGetNumOfRows
(
tsDbSdb
);
}
static
int32_t
mnodeDbActionInsert
(
SSdb
Oper
*
pOper
)
{
SDbObj
*
pDb
=
p
Oper
->
pObj
;
static
int32_t
mnodeDbActionInsert
(
SSdb
Row
*
pRow
)
{
SDbObj
*
pDb
=
p
Row
->
pObj
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pDb
->
acct
);
pthread_mutex_init
(
&
pDb
->
mutex
,
NULL
);
...
...
@@ -91,8 +91,8 @@ static int32_t mnodeDbActionInsert(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDbActionDelete
(
SSdb
Oper
*
pOper
)
{
SDbObj
*
pDb
=
p
Oper
->
pObj
;
static
int32_t
mnodeDbActionDelete
(
SSdb
Row
*
pRow
)
{
SDbObj
*
pDb
=
p
Row
->
pObj
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pDb
->
acct
);
mnodeDropAllChildTables
(
pDb
);
...
...
@@ -107,11 +107,11 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDbActionUpdate
(
SSdb
Oper
*
pOper
)
{
SDbObj
*
pNew
=
p
Oper
->
pObj
;
static
int32_t
mnodeDbActionUpdate
(
SSdb
Row
*
pRow
)
{
SDbObj
*
pNew
=
p
Row
->
pObj
;
SDbObj
*
pDb
=
mnodeGetDb
(
pNew
->
name
);
if
(
pDb
!=
NULL
&&
pNew
!=
pDb
)
{
memcpy
(
pDb
,
pNew
,
p
Oper
->
rowSize
);
memcpy
(
pDb
,
pNew
,
p
Row
->
rowSize
);
free
(
pNew
->
vgList
);
free
(
pNew
);
}
...
...
@@ -120,19 +120,19 @@ static int32_t mnodeDbActionUpdate(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDbActionEncode
(
SSdb
Oper
*
pOper
)
{
SDbObj
*
pDb
=
p
Oper
->
pObj
;
memcpy
(
p
Oper
->
rowData
,
pDb
,
tsDbUpdateSize
);
p
Oper
->
rowSize
=
tsDbUpdateSize
;
static
int32_t
mnodeDbActionEncode
(
SSdb
Row
*
pRow
)
{
SDbObj
*
pDb
=
p
Row
->
pObj
;
memcpy
(
p
Row
->
rowData
,
pDb
,
tsDbUpdateSize
);
p
Row
->
rowSize
=
tsDbUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDbActionDecode
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeDbActionDecode
(
SSdb
Row
*
pRow
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
calloc
(
1
,
sizeof
(
SDbObj
));
if
(
pDb
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pDb
,
p
Oper
->
rowData
,
tsDbUpdateSize
);
p
Oper
->
pObj
=
pDb
;
memcpy
(
pDb
,
p
Row
->
rowData
,
tsDbUpdateSize
);
p
Row
->
pObj
=
pDb
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -144,23 +144,23 @@ int32_t mnodeInitDbs() {
SDbObj
tObj
;
tsDbUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_DB
,
.
tableName
=
"dbs"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_DB
,
.
name
=
"dbs"
,
.
hashSessions
=
TSDB_DEFAULT_DBS_HASH_SIZE
,
.
maxRowSize
=
tsDbUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
insertFp
=
mnodeDbActionInsert
,
.
deleteFp
=
mnodeDbActionDelete
,
.
updateFp
=
mnodeDbActionUpdate
,
.
encodeFp
=
mnodeDbActionEncode
,
.
decodeFp
=
mnodeDbActionDecode
,
.
destroyFp
=
mnodeDbActionDestroy
,
.
restoredFp
=
mnodeDbActionRestored
.
fpInsert
=
mnodeDbActionInsert
,
.
fpDelete
=
mnodeDbActionDelete
,
.
fpUpdate
=
mnodeDbActionUpdate
,
.
fpEncode
=
mnodeDbActionEncode
,
.
fpDecode
=
mnodeDbActionDecode
,
.
fpDestroy
=
mnodeDbActionDestroy
,
.
fpRestored
=
mnodeDbActionRestored
};
tsDbSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsDbSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsDbSdb
==
NULL
)
{
mError
(
"failed to init db data"
);
return
-
1
;
...
...
@@ -412,16 +412,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
pMsg
->
pDb
=
pDb
;
mnodeIncDbRef
(
pDb
);
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsDbSdb
,
.
pObj
=
pDb
,
.
rowSize
=
sizeof
(
SDbObj
),
.
pMsg
=
pMsg
,
.
writeCb
=
mnodeCreateDbCb
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsDbSdb
,
.
pObj
=
pDb
,
.
rowSize
=
sizeof
(
SDbObj
),
.
pMsg
=
pMsg
,
.
fpRsp
=
mnodeCreateDbCb
};
code
=
sdbInsertRow
(
&
oper
);
code
=
sdbInsertRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to create, reason:%s"
,
pDb
->
name
,
tstrerror
(
code
));
pMsg
->
pDb
=
NULL
;
...
...
@@ -440,8 +440,8 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
}
#if 0
void mnodePrintVgroups(SDbObj *pDb, char *
oper
) {
mInfo("db:%s, vgroup link from head,
oper:%s", pDb->name, oper
);
void mnodePrintVgroups(SDbObj *pDb, char *
row
) {
mInfo("db:%s, vgroup link from head,
row:%s", pDb->name, row
);
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
mInfo("vgId:%d", pVgroup->vgId);
...
...
@@ -807,13 +807,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
if
(
pDb
->
status
)
return
TSDB_CODE_SUCCESS
;
pDb
->
status
=
true
;
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsDbSdb
,
.
pObj
=
pDb
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsDbSdb
,
.
pObj
=
pDb
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
int32_t
code
=
sdbUpdateRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to set dropping state, reason:%s"
,
pDb
->
name
,
tstrerror
(
code
));
}
...
...
@@ -1019,15 +1019,15 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
if
(
memcmp
(
&
newCfg
,
&
pDb
->
cfg
,
sizeof
(
SDbCfg
))
!=
0
)
{
pDb
->
cfg
=
newCfg
;
pDb
->
cfgVersion
++
;
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDbSdb
,
.
pObj
=
pDb
,
.
pMsg
=
pMsg
,
.
writeCb
=
mnodeAlterDbCb
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDbSdb
,
.
pObj
=
pDb
,
.
pMsg
=
pMsg
,
.
fpRsp
=
mnodeAlterDbCb
};
code
=
sdbUpdateRow
(
&
oper
);
code
=
sdbUpdateRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to alter, reason:%s"
,
pDb
->
name
,
tstrerror
(
code
));
}
...
...
@@ -1071,15 +1071,15 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
SDbObj
*
pDb
=
pMsg
->
pDb
;
mInfo
(
"db:%s, drop db from sdb"
,
pDb
->
name
);
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsDbSdb
,
.
pObj
=
pDb
,
.
pMsg
=
pMsg
,
.
writeCb
=
mnodeDropDbCb
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsDbSdb
,
.
pObj
=
pDb
,
.
pMsg
=
pMsg
,
.
fpRsp
=
mnodeDropDbCb
};
int32_t
code
=
sdbDeleteRow
(
&
oper
);
int32_t
code
=
sdbDeleteRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to drop, reason:%s"
,
pDb
->
name
,
tstrerror
(
code
));
}
...
...
@@ -1134,13 +1134,13 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
if
(
pDb
->
pAcct
==
pAcct
)
{
mInfo
(
"db:%s, drop db from sdb for acct:%s is dropped"
,
pDb
->
name
,
pAcct
->
user
);
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
t
able
=
tsDbSdb
,
.
pObj
=
pDb
SSdb
Row
row
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pT
able
=
tsDbSdb
,
.
pObj
=
pDb
};
sdbDeleteRow
(
&
oper
);
sdbDeleteRow
(
&
row
);
numOfDbs
++
;
}
mnodeDecDbRef
(
pDb
);
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
e96516ff
...
...
@@ -87,13 +87,13 @@ static char* offlineReason[] = {
"unknown"
,
};
static
int32_t
mnodeDnodeActionDestroy
(
SSdb
Oper
*
pOper
)
{
tfree
(
p
Oper
->
pObj
);
static
int32_t
mnodeDnodeActionDestroy
(
SSdb
Row
*
pRow
)
{
tfree
(
p
Row
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDnodeActionInsert
(
SSdb
Oper
*
pOper
)
{
SDnodeObj
*
pDnode
=
p
Oper
->
pObj
;
static
int32_t
mnodeDnodeActionInsert
(
SSdb
Row
*
pRow
)
{
SDnodeObj
*
pDnode
=
p
Row
->
pObj
;
if
(
pDnode
->
status
!=
TAOS_DN_STATUS_DROPPING
)
{
pDnode
->
status
=
TAOS_DN_STATUS_OFFLINE
;
pDnode
->
lastAccess
=
tsAccessSquence
;
...
...
@@ -107,8 +107,8 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDnodeActionDelete
(
SSdb
Oper
*
pOper
)
{
SDnodeObj
*
pDnode
=
p
Oper
->
pObj
;
static
int32_t
mnodeDnodeActionDelete
(
SSdb
Row
*
pRow
)
{
SDnodeObj
*
pDnode
=
p
Row
->
pObj
;
#ifndef _SYNC
mnodeDropAllDnodeVgroups
(
pDnode
);
...
...
@@ -121,11 +121,11 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDnodeActionUpdate
(
SSdb
Oper
*
pOper
)
{
SDnodeObj
*
pNew
=
p
Oper
->
pObj
;
static
int32_t
mnodeDnodeActionUpdate
(
SSdb
Row
*
pRow
)
{
SDnodeObj
*
pNew
=
p
Row
->
pObj
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pNew
->
dnodeId
);
if
(
pDnode
!=
NULL
&&
pNew
!=
pDnode
)
{
memcpy
(
pDnode
,
pNew
,
p
Oper
->
rowSize
);
memcpy
(
pDnode
,
pNew
,
p
Row
->
rowSize
);
free
(
pNew
);
}
mnodeDecDnodeRef
(
pDnode
);
...
...
@@ -134,19 +134,19 @@ static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDnodeActionEncode
(
SSdb
Oper
*
pOper
)
{
SDnodeObj
*
pDnode
=
p
Oper
->
pObj
;
memcpy
(
p
Oper
->
rowData
,
pDnode
,
tsDnodeUpdateSize
);
p
Oper
->
rowSize
=
tsDnodeUpdateSize
;
static
int32_t
mnodeDnodeActionEncode
(
SSdb
Row
*
pRow
)
{
SDnodeObj
*
pDnode
=
p
Row
->
pObj
;
memcpy
(
p
Row
->
rowData
,
pDnode
,
tsDnodeUpdateSize
);
p
Row
->
rowSize
=
tsDnodeUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeDnodeActionDecode
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeDnodeActionDecode
(
SSdb
Row
*
pRow
)
{
SDnodeObj
*
pDnode
=
(
SDnodeObj
*
)
calloc
(
1
,
sizeof
(
SDnodeObj
));
if
(
pDnode
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pDnode
,
p
Oper
->
rowData
,
tsDnodeUpdateSize
);
p
Oper
->
pObj
=
pDnode
;
memcpy
(
pDnode
,
p
Row
->
rowData
,
tsDnodeUpdateSize
);
p
Row
->
pObj
=
pDnode
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -171,23 +171,23 @@ int32_t mnodeInitDnodes() {
tsDnodeUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
pthread_mutex_init
(
&
tsDnodeEpsMutex
,
NULL
);
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_DNODE
,
.
tableName
=
"dnodes"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_DNODE
,
.
name
=
"dnodes"
,
.
hashSessions
=
TSDB_DEFAULT_DNODES_HASH_SIZE
,
.
maxRowSize
=
tsDnodeUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_AUTO
,
.
insertFp
=
mnodeDnodeActionInsert
,
.
deleteFp
=
mnodeDnodeActionDelete
,
.
updateFp
=
mnodeDnodeActionUpdate
,
.
encodeFp
=
mnodeDnodeActionEncode
,
.
decodeFp
=
mnodeDnodeActionDecode
,
.
destroyFp
=
mnodeDnodeActionDestroy
,
.
restoredFp
=
mnodeDnodeActionRestored
.
fpInsert
=
mnodeDnodeActionInsert
,
.
fpDelete
=
mnodeDnodeActionDelete
,
.
fpUpdate
=
mnodeDnodeActionUpdate
,
.
fpEncode
=
mnodeDnodeActionEncode
,
.
fpDecode
=
mnodeDnodeActionDecode
,
.
fpDestroy
=
mnodeDnodeActionDestroy
,
.
fpRestored
=
mnodeDnodeActionRestored
};
tsDnodeSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsDnodeSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsDnodeSdb
==
NULL
)
{
mError
(
"failed to init dnodes data"
);
return
-
1
;
...
...
@@ -296,13 +296,13 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode) {
}
void
mnodeUpdateDnode
(
SDnodeObj
*
pDnode
)
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsDnodeSdb
,
.
pObj
=
pDnode
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsDnodeSdb
,
.
pObj
=
pDnode
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
int32_t
code
=
sdbUpdateRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"dnodeId:%d, failed update"
,
pDnode
->
dnodeId
);
}
...
...
@@ -644,15 +644,15 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
tstrncpy
(
pDnode
->
dnodeEp
,
ep
,
TSDB_EP_LEN
);
taosGetFqdnPortFromEp
(
ep
,
pDnode
->
dnodeFqdn
,
&
pDnode
->
dnodePort
);
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
.
rowSize
=
sizeof
(
SDnodeObj
),
.
pMsg
=
pMsg
.
pMsg
=
pMsg
};
int32_t
code
=
sdbInsertRow
(
&
oper
);
int32_t
code
=
sdbInsertRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
int
dnodeId
=
pDnode
->
dnodeId
;
tfree
(
pDnode
);
...
...
@@ -665,14 +665,14 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
}
int32_t
mnodeDropDnode
(
SDnodeObj
*
pDnode
,
void
*
pMsg
)
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
.
pMsg
=
pMsg
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
.
pMsg
=
pMsg
};
int32_t
code
=
sdbDeleteRow
(
&
oper
);
int32_t
code
=
sdbDeleteRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"dnode:%d, failed to drop from cluster, result:%s"
,
pDnode
->
dnodeId
,
tstrerror
(
code
));
}
else
{
...
...
@@ -1141,7 +1141,7 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
mnodeGetMnodeRoleStr
(
pVgid
->
role
)
);
strcpy
(
pWrite
,
syncRole
[
pVgid
->
role
]
);
cols
++
;
}
}
...
...
src/mnode/src/mnodeInt.c
浏览文件 @
e96516ff
...
...
@@ -48,7 +48,7 @@ void *mnodeCreateMsg(SRpcMsg *pRpcMsg) {
int32_t
mnodeInitMsg
(
SMnodeMsg
*
pMsg
)
{
if
(
pMsg
->
pUser
!=
NULL
)
{
m
Debug
(
"app:%p:%p, user info already inited"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
);
m
Trace
(
"msg:%p, app:%p user info already inited"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
);
return
TSDB_CODE_SUCCESS
;
}
...
...
src/mnode/src/mnodeMnode.c
浏览文件 @
e96516ff
...
...
@@ -58,13 +58,13 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
#define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock)
#endif
static
int32_t
mnodeMnodeActionDestroy
(
SSdb
Oper
*
pOper
)
{
tfree
(
p
Oper
->
pObj
);
static
int32_t
mnodeMnodeActionDestroy
(
SSdb
Row
*
pRow
)
{
tfree
(
p
Row
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeMnodeActionInsert
(
SSdb
Oper
*
pOper
)
{
SMnodeObj
*
pMnode
=
p
Oper
->
pObj
;
static
int32_t
mnodeMnodeActionInsert
(
SSdb
Row
*
pRow
)
{
SMnodeObj
*
pMnode
=
p
Row
->
pObj
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pMnode
->
mnodeId
);
if
(
pDnode
==
NULL
)
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
...
...
@@ -76,8 +76,8 @@ static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeMnodeActionDelete
(
SSdb
Oper
*
pOper
)
{
SMnodeObj
*
pMnode
=
p
Oper
->
pObj
;
static
int32_t
mnodeMnodeActionDelete
(
SSdb
Row
*
pRow
)
{
SMnodeObj
*
pMnode
=
p
Row
->
pObj
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pMnode
->
mnodeId
);
if
(
pDnode
==
NULL
)
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
...
...
@@ -88,30 +88,30 @@ static int32_t mnodeMnodeActionDelete(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeMnodeActionUpdate
(
SSdb
Oper
*
pOper
)
{
SMnodeObj
*
pMnode
=
p
Oper
->
pObj
;
static
int32_t
mnodeMnodeActionUpdate
(
SSdb
Row
*
pRow
)
{
SMnodeObj
*
pMnode
=
p
Row
->
pObj
;
SMnodeObj
*
pSaved
=
mnodeGetMnode
(
pMnode
->
mnodeId
);
if
(
pMnode
!=
pSaved
)
{
memcpy
(
pSaved
,
pMnode
,
p
Oper
->
rowSize
);
memcpy
(
pSaved
,
pMnode
,
p
Row
->
rowSize
);
free
(
pMnode
);
}
mnodeDecMnodeRef
(
pSaved
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeMnodeActionEncode
(
SSdb
Oper
*
pOper
)
{
SMnodeObj
*
pMnode
=
p
Oper
->
pObj
;
memcpy
(
p
Oper
->
rowData
,
pMnode
,
tsMnodeUpdateSize
);
p
Oper
->
rowSize
=
tsMnodeUpdateSize
;
static
int32_t
mnodeMnodeActionEncode
(
SSdb
Row
*
pRow
)
{
SMnodeObj
*
pMnode
=
p
Row
->
pObj
;
memcpy
(
p
Row
->
rowData
,
pMnode
,
tsMnodeUpdateSize
);
p
Row
->
rowSize
=
tsMnodeUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeMnodeActionDecode
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeMnodeActionDecode
(
SSdb
Row
*
pRow
)
{
SMnodeObj
*
pMnode
=
calloc
(
1
,
sizeof
(
SMnodeObj
));
if
(
pMnode
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pMnode
,
p
Oper
->
rowData
,
tsMnodeUpdateSize
);
p
Oper
->
pObj
=
pMnode
;
memcpy
(
pMnode
,
p
Row
->
rowData
,
tsMnodeUpdateSize
);
p
Row
->
pObj
=
pMnode
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -137,23 +137,23 @@ int32_t mnodeInitMnodes() {
SMnodeObj
tObj
;
tsMnodeUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_MNODE
,
.
tableName
=
"mnodes"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_MNODE
,
.
name
=
"mnodes"
,
.
hashSessions
=
TSDB_DEFAULT_MNODES_HASH_SIZE
,
.
maxRowSize
=
tsMnodeUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_INT
,
.
insertFp
=
mnodeMnodeActionInsert
,
.
deleteFp
=
mnodeMnodeActionDelete
,
.
updateFp
=
mnodeMnodeActionUpdate
,
.
encodeFp
=
mnodeMnodeActionEncode
,
.
decodeFp
=
mnodeMnodeActionDecode
,
.
destroyFp
=
mnodeMnodeActionDestroy
,
.
restoredFp
=
mnodeMnodeActionRestored
.
fpInsert
=
mnodeMnodeActionInsert
,
.
fpDelete
=
mnodeMnodeActionDelete
,
.
fpUpdate
=
mnodeMnodeActionUpdate
,
.
fpEncode
=
mnodeMnodeActionEncode
,
.
fpDecode
=
mnodeMnodeActionDecode
,
.
fpDestroy
=
mnodeMnodeActionDestroy
,
.
fpRestored
=
mnodeMnodeActionRestored
};
tsMnodeSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsMnodeSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsMnodeSdb
==
NULL
)
{
mError
(
"failed to init mnodes data"
);
return
-
1
;
...
...
@@ -192,10 +192,6 @@ void *mnodeGetNextMnode(void *pIter, SMnodeObj **pMnode) {
return
sdbFetchRow
(
tsMnodeSdb
,
pIter
,
(
void
**
)
pMnode
);
}
char
*
mnodeGetMnodeRoleStr
(
int32_t
role
)
{
return
syncRole
[
role
];
}
void
mnodeUpdateMnodeEpSet
()
{
mInfo
(
"update mnodes epSet, numOfEps:%d "
,
mnodeGetMnodesNum
());
...
...
@@ -329,11 +325,11 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
pMnode
->
mnodeId
=
dnodeId
;
pMnode
->
createdTime
=
taosGetTimestampMs
();
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsMnodeSdb
,
.
pObj
=
pMnode
,
.
writeCb
=
mnodeCreateMnodeCb
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
tsMnodeSdb
,
.
pObj
=
pMnode
,
.
fpRsp
=
mnodeCreateMnodeCb
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -346,7 +342,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
return
;
}
code
=
sdbInsertRow
(
&
oper
);
code
=
sdbInsertRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"dnode:%d, failed to create mnode, ep:%s reason:%s"
,
dnodeId
,
dnodeEp
,
tstrerror
(
code
));
tfree
(
pMnode
);
...
...
@@ -356,8 +352,8 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
void
mnodeDropMnodeLocal
(
int32_t
dnodeId
)
{
SMnodeObj
*
pMnode
=
mnodeGetMnode
(
dnodeId
);
if
(
pMnode
!=
NULL
)
{
SSdb
Oper
oper
=
{.
type
=
SDB_OPER_LOCAL
,
.
t
able
=
tsMnodeSdb
,
.
pObj
=
pMnode
};
sdbDeleteRow
(
&
oper
);
SSdb
Row
row
=
{.
type
=
SDB_OPER_LOCAL
,
.
pT
able
=
tsMnodeSdb
,
.
pObj
=
pMnode
};
sdbDeleteRow
(
&
row
);
mnodeDecMnodeRef
(
pMnode
);
}
...
...
@@ -371,13 +367,13 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
}
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsMnodeSdb
,
.
pObj
=
pMnode
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsMnodeSdb
,
.
pObj
=
pMnode
};
int32_t
code
=
sdbDeleteRow
(
&
oper
);
int32_t
code
=
sdbDeleteRow
(
&
row
);
sdbDecRef
(
tsMnodeSdb
,
pMnode
);
...
...
@@ -469,7 +465,7 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
char
*
roles
=
mnodeGetMnodeRoleStr
(
pMnode
->
role
)
;
char
*
roles
=
syncRole
[
pMnode
->
role
]
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
roles
,
pShow
->
bytes
[
cols
]);
cols
++
;
...
...
src/mnode/src/mnodePeer.c
浏览文件 @
e96516ff
...
...
@@ -46,7 +46,7 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
int32_t
mnodeProcessPeerReq
(
SMnodeMsg
*
pMsg
)
{
if
(
pMsg
->
rpcMsg
.
pCont
==
NULL
)
{
mError
(
"
%p, msg:%s in mpeer queue, content is null"
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
mError
(
"
msg:%p, ahandle:%p type:%s in mpeer queue, content is null"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
return
TSDB_CODE_MND_INVALID_MSG_LEN
;
}
...
...
@@ -57,8 +57,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
mDebug
(
"
%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
epSet
->
numOfEps
,
epSet
->
inUse
);
mDebug
(
"
msg:%p, ahandle:%p type:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
epSet
->
numOfEps
,
epSet
->
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
if
(
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
)
{
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
...
...
@@ -72,7 +72,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
}
if
(
tsMnodeProcessPeerMsgFp
[
pMsg
->
rpcMsg
.
msgType
]
==
NULL
)
{
mError
(
"%p, msg:%s in mpeer queue, not processed"
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
mError
(
"msg:%p, ahandle:%p type:%s in mpeer queue, not processed"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
return
TSDB_CODE_MND_MSG_NOT_PROCESSED
;
}
...
...
@@ -81,13 +82,14 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
void
mnodeProcessPeerRsp
(
SRpcMsg
*
pMsg
)
{
if
(
!
sdbIsMaster
())
{
mError
(
"%p, msg:%s is not processed for it is not master"
,
pMsg
->
ahandle
,
taosMsg
[
pMsg
->
msgType
]);
mError
(
"msg:%p, ahandle:%p type:%s is not processed for it is not master"
,
pMsg
,
pMsg
->
ahandle
,
taosMsg
[
pMsg
->
msgType
]);
return
;
}
if
(
tsMnodeProcessPeerRspFp
[
pMsg
->
msgType
])
{
(
*
tsMnodeProcessPeerRspFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
mError
(
"
%p, msg:%s is not processed"
,
pMsg
->
ahandle
,
taosMsg
[
pMsg
->
msgType
]);
mError
(
"
msg:%p, ahandle:%p type:%s is not processed"
,
pMsg
,
pMsg
->
ahandle
,
taosMsg
[
pMsg
->
msgType
]);
}
}
src/mnode/src/mnodeRead.c
浏览文件 @
e96516ff
...
...
@@ -43,7 +43,7 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) {
int32_t
mnodeProcessRead
(
SMnodeMsg
*
pMsg
)
{
if
(
pMsg
->
rpcMsg
.
pCont
==
NULL
)
{
mError
(
"
%p, msg:%s in mread queue, content is null"
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
mError
(
"
msg:%p, app:%p type:%s in mread queue, content is null"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
return
TSDB_CODE_MND_INVALID_MSG_LEN
;
}
...
...
@@ -52,7 +52,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SRpcEpSet
*
epSet
=
rpcMallocCont
(
sizeof
(
SRpcEpSet
));
mnodeGetMnodeEpSetForShell
(
epSet
);
mDebug
(
"
%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
mDebug
(
"
msg:%p, app:%p type:%s in mread queue will be redirected, numOfEps:%d inUse:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
epSet
->
numOfEps
,
epSet
->
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
if
(
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
)
{
...
...
@@ -70,13 +70,15 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
}
if
(
tsMnodeProcessReadMsgFp
[
pMsg
->
rpcMsg
.
msgType
]
==
NULL
)
{
mError
(
"%p, msg:%s in mread queue, not processed"
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
mError
(
"msg:%p, app:%p type:%s in mread queue, not processed"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
return
TSDB_CODE_MND_MSG_NOT_PROCESSED
;
}
int32_t
code
=
mnodeInitMsg
(
pMsg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"%p, msg:%s in mread queue, not processed reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
tstrerror
(
code
));
mError
(
"msg:%p, app:%p type:%s in mread queue, not processed reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
tstrerror
(
code
));
return
code
;
}
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
src/mnode/src/mnodeTable.c
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
src/mnode/src/mnodeUser.c
浏览文件 @
e96516ff
...
...
@@ -42,13 +42,13 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg);
static
int32_t
mnodeProcessDropUserMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mnodeProcessAuthMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mnodeUserActionDestroy
(
SSdb
Oper
*
pOper
)
{
tfree
(
p
Oper
->
pObj
);
static
int32_t
mnodeUserActionDestroy
(
SSdb
Row
*
pRow
)
{
tfree
(
p
Row
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeUserActionInsert
(
SSdb
Oper
*
pOper
)
{
SUserObj
*
pUser
=
p
Oper
->
pObj
;
static
int32_t
mnodeUserActionInsert
(
SSdb
Row
*
pRow
)
{
SUserObj
*
pUser
=
p
Row
->
pObj
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pUser
->
acct
);
if
(
pAcct
!=
NULL
)
{
...
...
@@ -62,8 +62,8 @@ static int32_t mnodeUserActionInsert(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeUserActionDelete
(
SSdb
Oper
*
pOper
)
{
SUserObj
*
pUser
=
p
Oper
->
pObj
;
static
int32_t
mnodeUserActionDelete
(
SSdb
Row
*
pRow
)
{
SUserObj
*
pUser
=
p
Row
->
pObj
;
SAcctObj
*
pAcct
=
mnodeGetAcct
(
pUser
->
acct
);
if
(
pAcct
!=
NULL
)
{
...
...
@@ -74,8 +74,8 @@ static int32_t mnodeUserActionDelete(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeUserActionUpdate
(
SSdb
Oper
*
pOper
)
{
SUserObj
*
pUser
=
p
Oper
->
pObj
;
static
int32_t
mnodeUserActionUpdate
(
SSdb
Row
*
pRow
)
{
SUserObj
*
pUser
=
p
Row
->
pObj
;
SUserObj
*
pSaved
=
mnodeGetUser
(
pUser
->
user
);
if
(
pUser
!=
pSaved
)
{
memcpy
(
pSaved
,
pUser
,
tsUserUpdateSize
);
...
...
@@ -85,19 +85,19 @@ static int32_t mnodeUserActionUpdate(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeUserActionEncode
(
SSdb
Oper
*
pOper
)
{
SUserObj
*
pUser
=
p
Oper
->
pObj
;
memcpy
(
p
Oper
->
rowData
,
pUser
,
tsUserUpdateSize
);
p
Oper
->
rowSize
=
tsUserUpdateSize
;
static
int32_t
mnodeUserActionEncode
(
SSdb
Row
*
pRow
)
{
SUserObj
*
pUser
=
p
Row
->
pObj
;
memcpy
(
p
Row
->
rowData
,
pUser
,
tsUserUpdateSize
);
p
Row
->
rowSize
=
tsUserUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeUserActionDecode
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeUserActionDecode
(
SSdb
Row
*
pRow
)
{
SUserObj
*
pUser
=
(
SUserObj
*
)
calloc
(
1
,
sizeof
(
SUserObj
));
if
(
pUser
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pUser
,
p
Oper
->
rowData
,
tsUserUpdateSize
);
p
Oper
->
pObj
=
pUser
;
memcpy
(
pUser
,
p
Row
->
rowData
,
tsUserUpdateSize
);
p
Row
->
pObj
=
pUser
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -150,25 +150,25 @@ int32_t mnodeInitUsers() {
SUserObj
tObj
;
tsUserUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_USER
,
.
tableName
=
"users"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_USER
,
.
name
=
"users"
,
.
hashSessions
=
TSDB_DEFAULT_USERS_HASH_SIZE
,
.
maxRowSize
=
tsUserUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
insertFp
=
mnodeUserActionInsert
,
.
deleteFp
=
mnodeUserActionDelete
,
.
updateFp
=
mnodeUserActionUpdate
,
.
encodeFp
=
mnodeUserActionEncode
,
.
decodeFp
=
mnodeUserActionDecode
,
.
destroyFp
=
mnodeUserActionDestroy
,
.
restoredFp
=
mnodeUserActionRestored
.
fpInsert
=
mnodeUserActionInsert
,
.
fpDelete
=
mnodeUserActionDelete
,
.
fpUpdate
=
mnodeUserActionUpdate
,
.
fpEncode
=
mnodeUserActionEncode
,
.
fpDecode
=
mnodeUserActionDecode
,
.
fpDestroy
=
mnodeUserActionDestroy
,
.
fpRestored
=
mnodeUserActionRestored
};
tsUserSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsUserSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsUserSdb
==
NULL
)
{
mError
(
"table:%s, failed to create hash"
,
tableDesc
.
tableN
ame
);
mError
(
"table:%s, failed to create hash"
,
desc
.
n
ame
);
return
-
1
;
}
...
...
@@ -179,7 +179,7 @@ int32_t mnodeInitUsers() {
mnodeAddShowRetrieveHandle
(
TSDB_MGMT_TABLE_USER
,
mnodeRetrieveUsers
);
mnodeAddPeerMsgHandle
(
TSDB_MSG_TYPE_DM_AUTH
,
mnodeProcessAuthMsg
);
mDebug
(
"table:%s, hash is created"
,
tableDesc
.
tableN
ame
);
mDebug
(
"table:%s, hash is created"
,
desc
.
n
ame
);
return
0
;
}
...
...
@@ -205,14 +205,14 @@ void mnodeDecUserRef(SUserObj *pUser) {
}
static
int32_t
mnodeUpdateUser
(
SUserObj
*
pUser
,
void
*
pMsg
)
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsUserSdb
,
.
pObj
=
pUser
,
.
pMsg
=
pMsg
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsUserSdb
,
.
pObj
=
pUser
,
.
pMsg
=
pMsg
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
int32_t
code
=
sdbUpdateRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to alter by %s, reason:%s"
,
pUser
->
user
,
mnodeGetUserFromMsg
(
pMsg
),
tstrerror
(
code
));
}
else
{
...
...
@@ -259,15 +259,15 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
pUser
->
superAuth
=
1
;
}
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsUserSdb
,
.
pObj
=
pUser
,
.
rowSize
=
sizeof
(
SUserObj
),
.
pMsg
=
pMsg
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsUserSdb
,
.
pObj
=
pUser
,
.
rowSize
=
sizeof
(
SUserObj
),
.
pMsg
=
pMsg
};
code
=
sdbInsertRow
(
&
oper
);
code
=
sdbInsertRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to create by %s, reason:%s"
,
pUser
->
user
,
mnodeGetUserFromMsg
(
pMsg
),
tstrerror
(
code
));
tfree
(
pUser
);
...
...
@@ -279,14 +279,14 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
}
static
int32_t
mnodeDropUser
(
SUserObj
*
pUser
,
void
*
pMsg
)
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsUserSdb
,
.
pObj
=
pUser
,
.
pMsg
=
pMsg
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsUserSdb
,
.
pObj
=
pUser
,
.
pMsg
=
pMsg
};
int32_t
code
=
sdbDeleteRow
(
&
oper
);
int32_t
code
=
sdbDeleteRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to drop by %s, reason:%s"
,
pUser
->
user
,
mnodeGetUserFromMsg
(
pMsg
),
tstrerror
(
code
));
}
else
{
...
...
@@ -562,12 +562,12 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
if
(
pUser
==
NULL
)
break
;
if
(
strncmp
(
pUser
->
acct
,
pAcct
->
user
,
acctNameLen
)
==
0
)
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
t
able
=
tsUserSdb
,
.
pObj
=
pUser
,
SSdb
Row
row
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pT
able
=
tsUserSdb
,
.
pObj
=
pUser
,
};
sdbDeleteRow
(
&
oper
);
sdbDeleteRow
(
&
row
);
numOfUsers
++
;
}
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
e96516ff
...
...
@@ -72,13 +72,13 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) {
tfree
(
pVgroup
);
}
static
int32_t
mnodeVgroupActionDestroy
(
SSdb
Oper
*
pOper
)
{
mnodeDestroyVgroup
(
p
Oper
->
pObj
);
static
int32_t
mnodeVgroupActionDestroy
(
SSdb
Row
*
pRow
)
{
mnodeDestroyVgroup
(
p
Row
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeVgroupActionInsert
(
SSdb
Oper
*
pOper
)
{
SVgObj
*
pVgroup
=
p
Oper
->
pObj
;
static
int32_t
mnodeVgroupActionInsert
(
SSdb
Row
*
pRow
)
{
SVgObj
*
pVgroup
=
p
Row
->
pObj
;
// refer to db
SDbObj
*
pDb
=
mnodeGetDb
(
pVgroup
->
dbName
);
...
...
@@ -115,8 +115,8 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeVgroupActionDelete
(
SSdb
Oper
*
pOper
)
{
SVgObj
*
pVgroup
=
p
Oper
->
pObj
;
static
int32_t
mnodeVgroupActionDelete
(
SSdb
Row
*
pRow
)
{
SVgObj
*
pVgroup
=
p
Row
->
pObj
;
if
(
pVgroup
->
pDb
==
NULL
)
{
mError
(
"vgId:%d, db:%s is not exist while insert into hash"
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
...
...
@@ -137,8 +137,8 @@ static int32_t mnodeVgroupActionDelete(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeVgroupActionUpdate
(
SSdb
Oper
*
pOper
)
{
SVgObj
*
pNew
=
p
Oper
->
pObj
;
static
int32_t
mnodeVgroupActionUpdate
(
SSdb
Row
*
pRow
)
{
SVgObj
*
pNew
=
p
Row
->
pObj
;
SVgObj
*
pVgroup
=
mnodeGetVgroup
(
pNew
->
vgId
);
if
(
pVgroup
!=
pNew
)
{
...
...
@@ -176,25 +176,25 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeVgroupActionEncode
(
SSdb
Oper
*
pOper
)
{
SVgObj
*
pVgroup
=
p
Oper
->
pObj
;
memcpy
(
p
Oper
->
rowData
,
pVgroup
,
tsVgUpdateSize
);
SVgObj
*
pTmpVgroup
=
p
Oper
->
rowData
;
static
int32_t
mnodeVgroupActionEncode
(
SSdb
Row
*
pRow
)
{
SVgObj
*
pVgroup
=
p
Row
->
pObj
;
memcpy
(
p
Row
->
rowData
,
pVgroup
,
tsVgUpdateSize
);
SVgObj
*
pTmpVgroup
=
p
Row
->
rowData
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
pTmpVgroup
->
vnodeGid
[
i
].
pDnode
=
NULL
;
pTmpVgroup
->
vnodeGid
[
i
].
role
=
0
;
}
p
Oper
->
rowSize
=
tsVgUpdateSize
;
p
Row
->
rowSize
=
tsVgUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mnodeVgroupActionDecode
(
SSdb
Oper
*
pOper
)
{
static
int32_t
mnodeVgroupActionDecode
(
SSdb
Row
*
pRow
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
calloc
(
1
,
sizeof
(
SVgObj
));
if
(
pVgroup
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
memcpy
(
pVgroup
,
p
Oper
->
rowData
,
tsVgUpdateSize
);
p
Oper
->
pObj
=
pVgroup
;
memcpy
(
pVgroup
,
p
Row
->
rowData
,
tsVgUpdateSize
);
p
Row
->
pObj
=
pVgroup
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -206,23 +206,23 @@ int32_t mnodeInitVgroups() {
SVgObj
tObj
;
tsVgUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_VGROUP
,
.
tableName
=
"vgroups"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_VGROUP
,
.
name
=
"vgroups"
,
.
hashSessions
=
TSDB_DEFAULT_VGROUPS_HASH_SIZE
,
.
maxRowSize
=
tsVgUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_AUTO
,
.
insertFp
=
mnodeVgroupActionInsert
,
.
deleteFp
=
mnodeVgroupActionDelete
,
.
updateFp
=
mnodeVgroupActionUpdate
,
.
encodeFp
=
mnodeVgroupActionEncode
,
.
decodeFp
=
mnodeVgroupActionDecode
,
.
destroyFp
=
mnodeVgroupActionDestroy
,
.
restoredFp
=
mnodeVgroupActionRestored
,
.
fpInsert
=
mnodeVgroupActionInsert
,
.
fpDelete
=
mnodeVgroupActionDelete
,
.
fpUpdate
=
mnodeVgroupActionUpdate
,
.
fpEncode
=
mnodeVgroupActionEncode
,
.
fpDecode
=
mnodeVgroupActionDecode
,
.
fpDestroy
=
mnodeVgroupActionDestroy
,
.
fpRestored
=
mnodeVgroupActionRestored
,
};
tsVgroupSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsVgroupSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsVgroupSdb
==
NULL
)
{
mError
(
"failed to init vgroups data"
);
return
-
1
;
...
...
@@ -253,13 +253,13 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
}
void
mnodeUpdateVgroup
(
SVgObj
*
pVgroup
)
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
int32_t
code
=
sdbUpdateRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"vgId:%d, failed to update vgroup"
,
pVgroup
->
vgId
);
}
...
...
@@ -421,7 +421,7 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
<=
0
)
{
mDebug
(
"
app:%p:%p, db:%s, no enough sid in vgId:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDb
->
name
,
pVgroup
->
vgId
);
mDebug
(
"
msg:%p, app:%p db:%s, no enough sid in vgId:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pDb
->
name
,
pVgroup
->
vgId
);
continue
;
}
...
...
@@ -442,8 +442,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
int32_t
code
=
TSDB_CODE_MND_NO_ENOUGH_DNODES
;
if
(
pDb
->
numOfVgroups
<
maxVgroupsPerDb
)
{
mDebug
(
"
app:%p:%p, db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDb
->
name
,
pDb
->
numOfVgroups
,
maxVgroupsPerDb
);
mDebug
(
"
msg:%p, app:%p db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pDb
->
name
,
pDb
->
numOfVgroups
,
maxVgroupsPerDb
);
pthread_mutex_unlock
(
&
pDb
->
mutex
);
code
=
mnodeCreateVgroup
(
pMsg
);
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
...
...
@@ -455,8 +455,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
if
(
pDb
->
numOfVgroups
<
1
)
{
pthread_mutex_unlock
(
&
pDb
->
mutex
);
mDebug
(
"
app:%p:%p, db:%s, failed create new vgroup since:%s, numOfVgroups:%d maxVgroupsPerDb:%d "
,
pMsg
->
rpcMsg
.
ahandle
,
p
Msg
,
p
Db
->
name
,
tstrerror
(
code
),
pDb
->
numOfVgroups
,
maxVgroupsPerDb
);
mDebug
(
"
msg:%p, app:%p db:%s, failed create new vgroup since:%s, numOfVgroups:%d maxVgroupsPerDb:%d "
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pDb
->
name
,
tstrerror
(
code
),
pDb
->
numOfVgroups
,
maxVgroupsPerDb
);
return
code
;
}
...
...
@@ -474,7 +474,7 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
<=
0
)
{
mError
(
"
app:%p:%p, db:%s, no enough sid in vgId:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDb
->
name
,
pVgroup
->
vgId
);
mError
(
"
msg:%p, app:%p db:%s, no enough sid in vgId:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pDb
->
name
,
pVgroup
->
vgId
);
pthread_mutex_unlock
(
&
pDb
->
mutex
);
return
TSDB_CODE_MND_NO_ENOUGH_DNODES
;
}
...
...
@@ -496,10 +496,10 @@ static int32_t mnodeCreateVgroupFp(SMnodeMsg *pMsg) {
SDbObj
*
pDb
=
pMsg
->
pDb
;
assert
(
pVgroup
);
mInfo
(
"
app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pVgroup
->
vgId
,
mInfo
(
"
msg:%p, app:%p vgId:%d, is created in mnode, db:%s replica:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
mInfo
(
"
app:%p:%p, vgId:%d, index:%d, dnode:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pVgroup
->
vgId
,
i
,
mInfo
(
"
msg:%p, app:%p vgId:%d, index:%d, dnode:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pVgroup
->
vgId
,
i
,
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
}
...
...
@@ -517,30 +517,30 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
assert
(
pVgroup
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"
app:%p:%p, vgId:%d, failed to create in sdb, reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pVgroup
->
vgId
,
mError
(
"
msg:%p, app:%p vgId:%d, failed to create in sdb, reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pVgroup
->
vgId
,
tstrerror
(
code
));
SSdb
Oper
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pVgroup
,
.
t
able
=
tsVgroupSdb
};
SSdb
Row
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pVgroup
,
.
pT
able
=
tsVgroupSdb
};
sdbDeleteRow
(
&
desc
);
return
code
;
}
else
{
mInfo
(
"
app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
mInfo
(
"
msg:%p, app:%p vgId:%d, is created in sdb, db:%s replica:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
pVgroup
->
status
=
TAOS_VG_STATUS_READY
;
SSdb
Oper
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pVgroup
,
.
t
able
=
tsVgroupSdb
};
SSdb
Row
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pVgroup
,
.
pT
able
=
tsVgroupSdb
};
(
void
)
sdbUpdateRow
(
&
desc
);
dnodeReprocessMWriteMsg
(
pMsg
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
// if (pVgroup->status == TAOS_VG_STATUS_CREATING || pVgroup->status == TAOS_VG_STATUS_READY) {
// mInfo("
app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg
, pVgroup->vgId,
// mInfo("
msg:%p, app:%p vgId:%d, is created in sdb, db:%s replica:%d", pMsg, pMsg->rpcMsg.ahandle
, pVgroup->vgId,
// pDb->name, pVgroup->numOfVnodes);
// pVgroup->status = TAOS_VG_STATUS_READY;
// SSdb
Oper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .t
able = tsVgroupSdb};
// SSdb
Row desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pT
able = tsVgroupSdb};
// (void)sdbUpdateRow(&desc);
// dnodeReprocessMWriteMsg(pMsg);
// return TSDB_CODE_MND_ACTION_IN_PROGRESS;
// } else {
// mError("
app:%p:%p,
vgId:%d, is created in sdb, db:%s replica:%d, but vgroup is dropping", pMsg->rpcMsg.ahandle,
// mError("
msg:%p, app:%p
vgId:%d, is created in sdb, db:%s replica:%d, but vgroup is dropping", pMsg->rpcMsg.ahandle,
// pMsg, pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
// return TSDB_CODE_MND_VGROUP_NOT_EXIST;
// }
...
...
@@ -571,16 +571,16 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
pMsg
->
pVgroup
=
pVgroup
;
mnodeIncVgroupRef
(
pVgroup
);
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
.
rowSize
=
sizeof
(
SVgObj
),
.
pMsg
=
pMsg
,
.
reqFp
=
mnodeCreateVgroupFp
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
.
rowSize
=
sizeof
(
SVgObj
),
.
pMsg
=
pMsg
,
.
fpReq
=
mnodeCreateVgroupFp
};
code
=
sdbInsertRow
(
&
oper
);
code
=
sdbInsertRow
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
pMsg
->
pVgroup
=
NULL
;
mnodeDestroyVgroup
(
pVgroup
);
...
...
@@ -595,12 +595,12 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
}
else
{
mDebug
(
"vgId:%d, replica:%d is deleting from sdb"
,
pVgroup
->
vgId
,
pVgroup
->
numOfVnodes
);
mnodeSendDropVgroupMsg
(
pVgroup
,
NULL
);
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
};
sdbDeleteRow
(
&
oper
);
sdbDeleteRow
(
&
row
);
}
}
...
...
@@ -770,7 +770,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
SDnodeObj
*
pDnode
=
pVgroup
->
vnodeGid
[
i
].
pDnode
;
const
char
*
role
=
"NULL"
;
if
(
pDnode
!=
NULL
)
{
role
=
mnodeGetMnodeRoleStr
(
pVgroup
->
vnodeGid
[
i
].
role
)
;
role
=
syncRole
[
pVgroup
->
vnodeGid
[
i
].
role
]
;
}
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -957,28 +957,28 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if
(
mnodeMsg
->
received
!=
mnodeMsg
->
expected
)
return
;
if
(
mnodeMsg
->
received
==
mnodeMsg
->
successed
)
{
SSdb
Oper
oper
=
{
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsVgroupSdb
,
.
pTable
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
.
rowSize
=
sizeof
(
SVgObj
),
.
pMsg
=
mnodeMsg
,
.
writeCb
=
mnodeCreateVgroupCb
.
fpRsp
=
mnodeCreateVgroupCb
};
int32_t
code
=
sdbInsertRow
Imp
(
&
oper
);
int32_t
code
=
sdbInsertRow
ToQueue
(
&
row
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mnodeMsg
->
pVgroup
=
NULL
;
mnodeDestroyVgroup
(
pVgroup
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
code
);
}
}
else
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
};
sdbDeleteRow
(
&
oper
);
sdbDeleteRow
(
&
row
);
dnodeSendRpcMWriteRsp
(
mnodeMsg
,
mnodeMsg
->
code
);
}
}
...
...
@@ -1031,12 +1031,12 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
if
(
mnodeMsg
->
received
!=
mnodeMsg
->
expected
)
return
;
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
t
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
SSdb
Row
row
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pT
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
};
int32_t
code
=
sdbDeleteRow
(
&
oper
);
int32_t
code
=
sdbDeleteRow
(
&
row
);
if
(
code
!=
0
)
{
code
=
TSDB_CODE_MND_SDB_ERROR
;
}
...
...
@@ -1084,12 +1084,12 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
if
(
pVgroup
->
vnodeGid
[
0
].
dnodeId
==
pDropDnode
->
dnodeId
)
{
mnodeDropAllChildTablesInVgroups
(
pVgroup
);
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
t
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
SSdb
Row
row
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pT
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
};
sdbDeleteRow
(
&
oper
);
sdbDeleteRow
(
&
row
);
numOfVgroups
++
;
}
mnodeDecVgroupRef
(
pVgroup
);
...
...
@@ -1135,12 +1135,12 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
if
(
pVgroup
==
NULL
)
break
;
if
(
pVgroup
->
pDb
==
pDropDb
)
{
SSdb
Oper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
t
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
SSdb
Row
row
=
{
.
type
=
SDB_OPER_LOCAL
,
.
pT
able
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
};
sdbDeleteRow
(
&
oper
);
sdbDeleteRow
(
&
row
);
numOfVgroups
++
;
}
...
...
src/mnode/src/mnodeWrite.c
浏览文件 @
e96516ff
...
...
@@ -43,7 +43,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg))
int32_t
mnodeProcessWrite
(
SMnodeMsg
*
pMsg
)
{
if
(
pMsg
->
rpcMsg
.
pCont
==
NULL
)
{
mError
(
"
app:%p:%p, msg:%s content is null"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
mError
(
"
msg:%p, app:%p type:%s content is null"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
return
TSDB_CODE_MND_INVALID_MSG_LEN
;
}
...
...
@@ -54,15 +54,15 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
mDebug
(
"
app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
mDebug
(
"
msg:%p, app:%p type:%s in write queue, will be redirected, numOfEps:%d inUse:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
epSet
->
numOfEps
,
epSet
->
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
if
(
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
)
{
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
mDebug
(
"
app:%p:%p, mnode index:%d ep:%s:%d, set inUse to %d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
i
,
epSet
->
fqdn
[
i
],
mDebug
(
"
msg:%p, app:%p mnode index:%d ep:%s:%d, set inUse to %d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
}
else
{
mDebug
(
"
app:%p:%p, mnode index:%d ep:%s:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
i
,
epSet
->
fqdn
[
i
],
mDebug
(
"
msg:%p, app:%p mnode index:%d ep:%s:%d"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]));
}
}
...
...
@@ -71,19 +71,19 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
}
if
(
tsMnodeProcessWriteMsgFp
[
pMsg
->
rpcMsg
.
msgType
]
==
NULL
)
{
mError
(
"
app:%p:%p, msg:%s not processed"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
mError
(
"
msg:%p, app:%p type:%s not processed"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
return
TSDB_CODE_MND_MSG_NOT_PROCESSED
;
}
int32_t
code
=
mnodeInitMsg
(
pMsg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"
app:%p:%p, msg:%s not processed, reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
mError
(
"
msg:%p, app:%p type:%s not processed, reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
tstrerror
(
code
));
return
code
;
}
if
(
!
pMsg
->
pUser
->
writeAuth
)
{
mError
(
"
app:%p:%p, msg:%s not processed, no write auth"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
mError
(
"
msg:%p, app:%p type:%s not processed, no write auth"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
]);
return
TSDB_CODE_MND_NO_RIGHTS
;
}
...
...
src/plugins/http/inc/httpInt.h
浏览文件 @
e96516ff
...
...
@@ -118,7 +118,7 @@ typedef struct {
typedef
struct
{
char
*
module
;
bool
(
*
decodeFp
)(
struct
HttpContext
*
pContext
);
bool
(
*
fpDecode
)(
struct
HttpContext
*
pContext
);
}
HttpDecodeMethod
;
typedef
struct
{
...
...
src/plugins/http/src/httpHandle.c
浏览文件 @
e96516ff
...
...
@@ -21,11 +21,11 @@
#include "httpHandle.h"
bool
httpDecodeRequest
(
HttpContext
*
pContext
)
{
if
(
pContext
->
decodeMethod
->
decodeFp
==
NULL
)
{
if
(
pContext
->
decodeMethod
->
fpDecode
==
NULL
)
{
return
false
;
}
return
(
*
pContext
->
decodeMethod
->
decodeFp
)(
pContext
);
return
(
*
pContext
->
decodeMethod
->
fpDecode
)(
pContext
);
}
/**
...
...
src/query/inc/qExecutor.h
浏览文件 @
e96516ff
...
...
@@ -171,11 +171,10 @@ typedef struct SQuery {
typedef
struct
SQueryRuntimeEnv
{
jmp_buf
env
;
SResultRow
*
pResultRow
;
// todo refactor to merge with SWindowResInfo
SQuery
*
pQuery
;
SQLFunctionCtx
*
pCtx
;
int32_t
numOfRowsPerPage
;
uint16_t
offset
[
TSDB_MAX_COLUMNS
]
;
uint16_t
*
offset
;
uint16_t
scanFlag
;
// denotes reversed scan of data or not
SFillInfo
*
pFillInfo
;
SWindowResInfo
windowResInfo
;
...
...
src/query/inc/qUtil.h
浏览文件 @
e96516ff
...
...
@@ -34,7 +34,7 @@ int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, int32_t size, int32_t
void
cleanupTimeWindowInfo
(
SWindowResInfo
*
pWindowResInfo
);
void
resetTimeWindowInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResInfo
*
pWindowResInfo
);
void
clearFirstN
TimeWindow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
num
);
void
clearFirstN
WindowRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
num
);
void
clearClosedTimeWindow
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
int32_t
numOfClosedTimeWindow
(
SWindowResInfo
*
pWindowResInfo
);
...
...
src/query/src/qExecutor.c
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
src/query/src/qResultbuf.c
浏览文件 @
e96516ff
...
...
@@ -165,7 +165,7 @@ static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
static
char
*
flushPageToDisk
(
SDiskbasedResultBuf
*
pResultBuf
,
SPageInfo
*
pg
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
assert
((
int64_t
)
pResultBuf
->
numOfPages
*
pResultBuf
->
pageSize
==
pResultBuf
->
totalBufSize
&&
pResultBuf
->
numOfPages
>=
pResultBuf
->
inMemPages
);
assert
((
(
int64_t
)
pResultBuf
->
numOfPages
*
pResultBuf
->
pageSize
)
==
pResultBuf
->
totalBufSize
&&
pResultBuf
->
numOfPages
>=
pResultBuf
->
inMemPages
);
if
(
pResultBuf
->
file
==
NULL
)
{
if
((
ret
=
createDiskFile
(
pResultBuf
))
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/query/src/qUtil.c
浏览文件 @
e96516ff
...
...
@@ -53,7 +53,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
return
;
}
if
(
pWindowResInfo
->
capacity
==
0
)
{
assert
(
/*pWindowResInfo->hashList == NULL && */
pWindowResInfo
->
pResult
==
NULL
);
assert
(
pWindowResInfo
->
pResult
==
NULL
);
return
;
}
...
...
@@ -64,10 +64,18 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
if
(
pWindowResInfo
==
NULL
||
pWindowResInfo
->
capacity
==
0
)
{
return
;
}
// assert(pWindowResInfo->size == 1);
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SResultRow
*
pWindowRes
=
pWindowResInfo
->
pResult
[
i
];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
);
int32_t
groupIndex
=
0
;
int64_t
uid
=
0
;
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
&
groupIndex
,
sizeof
(
groupIndex
),
uid
);
taosHashRemove
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
groupIndex
)));
}
pWindowResInfo
->
curIndex
=
-
1
;
...
...
@@ -77,7 +85,7 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
pWindowResInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
}
void
clearFirstN
TimeWindow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
num
)
{
void
clearFirstN
WindowRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
num
)
{
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pWindowResInfo
==
NULL
||
pWindowResInfo
->
capacity
==
0
||
pWindowResInfo
->
size
==
0
||
num
==
0
)
{
return
;
...
...
@@ -88,6 +96,11 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
int16_t
type
=
pWindowResInfo
->
type
;
STableId
*
id
=
TSDB_TABLEID
(
pRuntimeEnv
->
pQuery
->
current
->
pTable
);
// uid is always set to be 0.
int64_t
uid
=
id
->
uid
;
if
(
pRuntimeEnv
->
groupbyNormalCol
)
{
uid
=
0
;
}
char
*
key
=
NULL
;
int16_t
bytes
=
-
1
;
...
...
@@ -97,14 +110,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
// todo refactor
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
key
=
varDataVal
(
pResult
->
key
);
key
=
varDataVal
(
pResult
->
key
);
bytes
=
varDataLen
(
pResult
->
key
);
}
else
{
key
=
(
char
*
)
&
pResult
->
win
.
skey
;
bytes
=
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
;
}
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
id
->
uid
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashRemove
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
}
else
{
break
;
...
...
@@ -137,14 +150,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
bytes
=
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
;
}
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
id
->
uid
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
assert
(
p
!=
NULL
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pWindowResInfo
->
size
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
id
->
uid
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashPut
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
...
...
@@ -158,7 +171,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
}
int32_t
numOfClosed
=
numOfClosedTimeWindow
(
pWindowResInfo
);
clearFirstN
TimeWindow
(
pRuntimeEnv
,
numOfClosed
);
clearFirstN
WindowRes
(
pRuntimeEnv
,
numOfClosed
);
}
int32_t
numOfClosedTimeWindow
(
SWindowResInfo
*
pWindowResInfo
)
{
...
...
src/sync/inc/syncInt.h
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
src/sync/src/syncMain.c
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
src/sync/src/syncRestore.c
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
src/sync/src/syncRetrieve.c
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
src/sync/src/taosTcpPool.c
浏览文件 @
e96516ff
...
...
@@ -150,7 +150,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
}
void
taosFreeTcpConn
(
void
*
param
)
{
SConnObj
*
pConn
=
(
SConnObj
*
)
param
;
SConnObj
*
pConn
=
param
;
SThreadObj
*
pThread
=
pConn
->
pThread
;
sDebug
(
"%p TCP connection will be closed, fd:%d"
,
pThread
,
pConn
->
fd
);
...
...
src/sync/src/tarbitrator.c
浏览文件 @
e96516ff
...
...
@@ -115,14 +115,14 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
SFirstPkt
firstPkt
;
if
(
taosReadMsg
(
connFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
!=
sizeof
(
firstPkt
))
{
sError
(
"failed to read peer first pkt from ip:%s
(%s)
"
,
ipstr
,
strerror
(
errno
));
sError
(
"failed to read peer first pkt from ip:%s
since %s
"
,
ipstr
,
strerror
(
errno
));
taosCloseSocket
(
connFd
);
return
;
}
SNodeConn
*
pNode
=
(
SNodeConn
*
)
calloc
(
sizeof
(
SNodeConn
),
1
);
SNodeConn
*
pNode
=
calloc
(
sizeof
(
SNodeConn
),
1
);
if
(
pNode
==
NULL
)
{
sError
(
"failed to allocate memory
(%s)
"
,
strerror
(
errno
));
sError
(
"failed to allocate memory
since %s
"
,
strerror
(
errno
));
taosCloseSocket
(
connFd
);
return
;
}
...
...
@@ -146,7 +146,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
static
void
arbProcessBrokenLink
(
void
*
param
)
{
SNodeConn
*
pNode
=
param
;
sDebug
(
"%s, TCP link is broken
(%s)
, close connection"
,
pNode
->
id
,
strerror
(
errno
));
sDebug
(
"%s, TCP link is broken
since %s
, close connection"
,
pNode
->
id
,
strerror
(
errno
));
tfree
(
pNode
);
}
...
...
src/wal/src/walWrite.c
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/pytest/query/queryInterval.py
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/pytest/tools/taosdemo.py
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/script/general/parser/groupby.sim
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/script/general/parser/interp.sim
浏览文件 @
e96516ff
...
...
@@ -47,8 +47,7 @@ while $i < $halfNum
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar ) $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
$x = $x + 1
endw
...
...
tests/script/general/parser/join_multivnode.sim
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/script/general/parser/projection_limit_offset.sim
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/script/general/parser/tbnameIn.sim
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/script/general/parser/testSuite.sim
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/script/general/parser/union.sim
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
tests/script/general/parser/where.sim
浏览文件 @
e96516ff
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录