Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
m0_53320357
TDengine
提交
7f43265b
TDengine
项目概览
m0_53320357
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7f43265b
编写于
4月 04, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into refactor/cluster
上级
d2effbae
886a301b
变更
38
展开全部
隐藏空白更改
内联
并排
Showing
38 changed file
with
2225 addition
and
684 deletion
+2225
-684
.gitignore
.gitignore
+1
-0
cmake/define.inc
cmake/define.inc
+10
-3
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+0
-1
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-3
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+31
-30
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+0
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+22
-29
src/client/src/tscServer.c
src/client/src/tscServer.c
+10
-13
src/client/src/tscSql.c
src/client/src/tscSql.c
+54
-62
src/client/src/tscSub.c
src/client/src/tscSub.c
+0
-1
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+7
-13
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+3
-12
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+9
-9
src/dnode/src/dnodeRead.c
src/dnode/src/dnodeRead.c
+1
-0
src/inc/taoserror.h
src/inc/taoserror.h
+2
-2
src/mnode/inc/mgmtMnode.h
src/mnode/inc/mgmtMnode.h
+0
-3
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+2
-1
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+453
-2
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+61
-101
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+20
-1
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+9
-0
src/os/linux/src/tlinux.c
src/os/linux/src/tlinux.c
+1
-1
src/query/inc/qast.h
src/query/inc/qast.h
+26
-34
src/query/inc/qsqlparser.h
src/query/inc/qsqlparser.h
+19
-35
src/query/src/qast.c
src/query/src/qast.c
+252
-166
src/query/src/qparserImpl.c
src/query/src/qparserImpl.c
+173
-2
src/query/src/qtokenizer.c
src/query/src/qtokenizer.c
+2
-2
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+6
-7
src/query/src/queryUtil.c
src/query/src/queryUtil.c
+1
-1
src/query/tests/astTest.cpp
src/query/tests/astTest.cpp
+630
-0
src/util/inc/tbuffer.h
src/util/inc/tbuffer.h
+35
-34
src/util/inc/tstatus.h
src/util/inc/tstatus.h
+15
-0
src/util/src/.tqueue.c.swp
src/util/src/.tqueue.c.swp
+0
-0
src/util/src/tbuffer.c
src/util/src/tbuffer.c
+9
-8
src/util/src/tstatus.c
src/util/src/tstatus.c
+19
-0
src/vnode/tsdb/src/tsdbFile.c
src/vnode/tsdb/src/tsdbFile.c
+5
-0
src/vnode/tsdb/src/tsdbRead.c
src/vnode/tsdb/src/tsdbRead.c
+319
-107
tests/script/general/show/dnodes.sim
tests/script/general/show/dnodes.sim
+16
-0
未找到文件。
.gitignore
浏览文件 @
7f43265b
...
...
@@ -12,6 +12,7 @@ rpms/
mac/
*.pyc
*.tmp
*.swp
src/connector/nodejs/node_modules/
src/connector/nodejs/out/
tests/test/
...
...
cmake/define.inc
浏览文件 @
7f43265b
...
...
@@ -3,10 +3,17 @@ PROJECT(TDengine)
IF
(
TD_CLUSTER
)
ADD_DEFINITIONS
(
-
D_CLUSTER
)
ADD_DEFINITIONS
(
-
DTSDB_REPLICA_MAX_NUM
=
1
)
ENDIF
()
IF
(
TD_MPEER
)
ADD_DEFINITIONS
(
-
D_MPEER
)
ENDIF
()
IF
(
TD_VPEER
)
ADD_DEFINITIONS
(
-
D_VPEER
)
ADD_DEFINITIONS
(
-
DTSDB_REPLICA_MAX_NUM
=
3
)
ELSE
()
ADD_DEFINITIONS
(
-
DLITE
)
ADD_DEFINITIONS
(
-
DTSDB_REPLICA_MAX_NUM
=
1
)
ADD_DEFINITIONS
(
-
DTSDB_REPLICA_MAX_NUM
=
1
)
ENDIF
()
IF
(
TD_ACCOUNT
)
...
...
src/client/inc/tscUtil.h
浏览文件 @
7f43265b
...
...
@@ -132,7 +132,6 @@ void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr)
void
tscFieldInfoSetBinExpr
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
SSqlFunctionExpr
*
pExpr
);
void
tscFieldInfoCalOffset
(
SQueryInfo
*
pQueryInfo
);
void
tscFieldInfoUpdateOffsetForInterResult
(
SQueryInfo
*
pQueryInfo
);
void
tscFieldInfoCopy
(
SFieldInfo
*
src
,
SFieldInfo
*
dst
,
const
int32_t
*
indexList
,
int32_t
size
);
void
tscFieldInfoCopyAll
(
SFieldInfo
*
dst
,
SFieldInfo
*
src
);
...
...
src/client/inc/tsclient.h
浏览文件 @
7f43265b
...
...
@@ -357,7 +357,6 @@ typedef struct SSqlObj {
char
freed
:
4
;
char
listed
:
4
;
tsem_t
rspSem
;
tsem_t
emptyRspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
uint8_t
numOfSubs
;
...
...
@@ -409,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql);
int
tscRenewMeterMeta
(
SSqlObj
*
pSql
,
char
*
tableId
);
void
tscQueueAsyncRes
(
SSqlObj
*
pSql
);
void
tscQueueAsyncError
(
void
(
*
fp
),
void
*
param
);
void
tscQueueAsyncError
(
void
(
*
fp
),
void
*
param
,
int32_t
code
);
int
tscProcessLocalCmd
(
SSqlObj
*
pSql
);
int
tscCfgDynamicOptions
(
char
*
msg
);
...
...
@@ -450,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj);
void
tscCloseTscObj
(
STscObj
*
pObj
);
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
int32
_t
sqlLen
);
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
size
_t
sqlLen
);
void
tscProcessMultiVnodesInsert
(
SSqlObj
*
pSql
);
void
tscProcessMultiVnodesInsertFromFile
(
SSqlObj
*
pSql
);
...
...
src/client/src/tscAsync.c
浏览文件 @
7f43265b
...
...
@@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static
void
tscAsyncFetchRowsProxy
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
static
void
tscAsyncFetchSingleRowProxy
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
int32
_t
sqlLen
)
{
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
size
_t
sqlLen
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
...
...
@@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
))
{
tscError
(
"failed to malloc payload"
);
tfree
(
pSql
);
tscQueueAsyncError
(
fp
,
param
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
return
;
}
pSql
->
sqlstr
=
malloc
(
sqlLen
+
1
);
pSql
->
sqlstr
=
realloc
(
pSql
->
sqlstr
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscQueueAsyncError
(
fp
,
param
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
free
(
pCmd
->
payload
);
free
(
pSql
);
return
;
}
...
...
@@ -75,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
(
uint8_t
)
code
;
pSql
->
res
.
code
=
code
;
tscQueueAsyncRes
(
pSql
);
return
;
}
...
...
@@ -88,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
tscError
(
"bug!!! pObj:%p"
,
pObj
);
globalCode
=
TSDB_CODE_DISCONNECTED
;
tscQueueAsyncError
(
fp
,
param
);
terrno
=
TSDB_CODE_DISCONNECTED
;
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_DISCONNECTED
);
return
;
}
int32_t
sqlLen
=
strlen
(
sqlstr
);
if
(
sqlLen
>
tsMaxSQLStringLen
)
{
tscError
(
"sql string too long"
);
tscQueueAsyncError
(
fp
,
param
);
terrno
=
TSDB_CODE_INVALID_SQL
;
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_INVALID_SQL
);
return
;
}
...
...
@@ -105,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
SSqlObj
*
pSql
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pSql
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
tscQueueAsyncError
(
fp
,
param
);
terrno
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
return
;
}
...
...
@@ -170,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pRes
->
code
=
numOfRows
;
}
tscQueueAsyncError
(
pSql
->
fetchFp
,
param
);
tscQueueAsyncError
(
pSql
->
fetchFp
,
param
,
pRes
->
code
);
return
;
}
...
...
@@ -200,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
SSqlObj
*
pSql
=
(
SSqlObj
*
)
taosa
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscError
(
"sql object is NULL"
);
globalCode
=
TSDB_CODE_DISCONNECTED
;
tscQueueAsyncError
(
fp
,
param
);
//
globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_DISCONNECTED
);
return
;
}
...
...
@@ -210,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if
(
pRes
->
qhandle
==
0
)
{
tscError
(
"qhandle is NULL"
);
tscQueueAsyncError
(
fp
,
param
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_INVALID_QHANDLE
);
return
;
}
...
...
@@ -232,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
SSqlObj
*
pSql
=
(
SSqlObj
*
)
taosa
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscError
(
"sql object is NULL"
);
globalCode
=
TSDB_CODE_DISCONNECTED
;
tscQueueAsyncError
(
fp
,
param
);
//
globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_DISCONNECTED
);
return
;
}
...
...
@@ -242,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if
(
pRes
->
qhandle
==
0
)
{
tscError
(
"qhandle is NULL"
);
tscQueueAsyncError
(
fp
,
param
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_INVALID_QHANDLE
);
return
;
}
...
...
@@ -331,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
// pCmd may be released, so cache pCmd->command
int
cmd
=
pCmd
->
command
;
int
code
=
pRes
->
code
?
-
pRes
->
code
:
pRes
->
numOfRows
;
int
code
=
pRes
->
code
;
//
? -pRes->code : pRes->numOfRows;
// in case of async insert, restore the user specified callback function
bool
shouldFree
=
tscShouldFreeAsyncSqlObj
(
pSql
);
...
...
@@ -349,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
}
}
void
tscProcessAsyncError
(
SSchedMsg
*
pMsg
)
{
static
void
tscProcessAsyncError
(
SSchedMsg
*
pMsg
)
{
void
(
*
fp
)()
=
pMsg
->
ahandle
;
(
*
fp
)(
pMsg
->
thandle
,
NULL
,
-
1
);
(
*
fp
)(
pMsg
->
thandle
,
NULL
,
*
(
int32_t
*
)
pMsg
->
msg
);
}
void
tscQueueAsyncError
(
void
(
*
fp
),
void
*
param
)
{
void
tscQueueAsyncError
(
void
(
*
fp
),
void
*
param
,
int32_t
code
)
{
int32_t
*
c
=
malloc
(
sizeof
(
int32_t
));
*
c
=
code
;
SSchedMsg
schedMsg
;
schedMsg
.
fp
=
tscProcessAsyncError
;
schedMsg
.
ahandle
=
fp
;
schedMsg
.
thandle
=
param
;
schedMsg
.
msg
=
NULL
;
schedMsg
.
msg
=
c
;
taosScheduleTask
(
tscQhandle
,
&
schedMsg
);
}
...
...
@@ -369,7 +371,7 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
tscTrace
(
"%p SqlObj is freed, not add into queue async res"
,
pSql
);
return
;
}
else
{
tscError
(
"%p add into queued async res, code:%
d"
,
pSql
,
pSql
->
res
.
code
);
tscError
(
"%p add into queued async res, code:%
s"
,
pSql
,
tstrerror
(
pSql
->
res
.
code
)
);
}
SSchedMsg
schedMsg
;
...
...
@@ -410,10 +412,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
pSql
->
fp
=
NULL
;
if
(
code
!=
0
)
{
code
=
abs
(
code
);
pRes
->
code
=
code
;
tscTrace
(
"%p failed to renew tableMeta"
,
pSql
);
tsem_post
(
&
pSql
->
rspSem
);
//
tsem_post(&pSql->rspSem);
}
else
{
tscTrace
(
"%p renew tableMeta successfully, command:%d, code:%d, retry:%d"
,
pSql
,
pSql
->
cmd
.
command
,
pSql
->
res
.
code
,
pSql
->
retry
);
...
...
@@ -425,15 +426,15 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
code
=
tscSendMsgToServer
(
pSql
);
if
(
code
!=
0
)
{
pRes
->
code
=
code
;
tsem_post
(
&
pSql
->
rspSem
);
//
tsem_post(&pSql->rspSem);
}
}
return
;
}
if
(
code
!=
0
)
{
pRes
->
code
=
(
uint8_t
)
abs
(
code
)
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRes
->
code
=
code
;
tscQueueAsyncRes
(
pSql
);
return
;
}
...
...
src/client/src/tscPrepare.c
浏览文件 @
7f43265b
...
...
@@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
}
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
tsem_init
(
&
pSql
->
emptyRspSem
,
0
,
1
);
pSql
->
signature
=
pSql
;
pSql
->
pTscObj
=
pObj
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
7f43265b
...
...
@@ -117,7 +117,7 @@ static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static
int32_t
doCheckForStream
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
static
int32_t
doCheckForQuery
(
SSqlObj
*
pSql
,
SQuerySQL
*
pQuerySql
,
int32_t
index
);
static
int32_t
tSQLBinaryExprCreateFromSqlExpr
(
tSQLSyntax
Node
**
pExpr
,
tSQLExpr
*
pAst
,
int32_t
*
num
,
static
int32_t
convertSyntaxTreeToExprTree
(
tExpr
Node
**
pExpr
,
tSQLExpr
*
pAst
,
int32_t
*
num
,
SColIndexEx
**
pColIndex
,
SSqlExprInfo
*
pExprInfo
);
/*
...
...
@@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if
(
pQueryInfo
->
numOfTables
==
0
)
{
pTableMetaInfo
=
tscAddEmptyMetaInfo
(
pQueryInfo
);
}
else
{
pTableMetaInfo
=
&
pQueryInfo
->
pTableMetaInfo
[
0
];
pTableMetaInfo
=
pQueryInfo
->
pTableMetaInfo
[
0
];
}
pCmd
->
command
=
pInfo
->
type
;
...
...
@@ -1208,16 +1208,16 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
SSqlBinaryExprInfo
*
pBinExprInfo
=
&
pFuncExpr
->
binExprInfo
;
t
SQLSyntax
Node
*
pNode
=
NULL
;
t
Expr
Node
*
pNode
=
NULL
;
SColIndexEx
*
pColIndex
=
NULL
;
int32_t
ret
=
tSQLBinaryExprCreateFromSqlExpr
(
&
pNode
,
pItem
->
pNode
,
&
pBinExprInfo
->
numOfCols
,
&
pColIndex
,
&
pQueryInfo
->
exprsInfo
);
int32_t
ret
=
convertSyntaxTreeToExprTree
(
&
pNode
,
pItem
->
pNode
,
&
pBinExprInfo
->
numOfCols
,
&
pColIndex
,
&
pQueryInfo
->
exprsInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
t
SQLBinaryExprDestroy
(
&
pNode
->
pExpr
,
NULL
);
t
ExprTreeDestroy
(
&
pNode
,
NULL
);
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
"invalid expression in select clause"
);
}
pBinExprInfo
->
pBinExpr
=
pNode
->
pExpr
;
pBinExprInfo
->
pBinExpr
=
pNode
;
pBinExprInfo
->
pReqColumns
=
pColIndex
;
for
(
int32_t
k
=
0
;
k
<
pBinExprInfo
->
numOfCols
;
++
k
)
{
...
...
@@ -5807,20 +5807,20 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return
TSDB_CODE_SUCCESS
;
// Does not build query message here
}
static
int32_t
tSQLBinaryExprCreateFromSqlExpr
(
tSQLSyntax
Node
**
pExpr
,
tSQLExpr
*
pAst
,
int32_t
*
num
,
static
int32_t
convertSyntaxTreeToExprTree
(
tExpr
Node
**
pExpr
,
tSQLExpr
*
pAst
,
int32_t
*
num
,
SColIndexEx
**
pColIndex
,
SSqlExprInfo
*
pExprInfo
)
{
t
SQLSyntax
Node
*
pLeft
=
NULL
;
t
SQLSyntax
Node
*
pRight
=
NULL
;
t
Expr
Node
*
pLeft
=
NULL
;
t
Expr
Node
*
pRight
=
NULL
;
if
(
pAst
->
pLeft
!=
NULL
)
{
int32_t
ret
=
tSQLBinaryExprCreateFromSqlExpr
(
&
pLeft
,
pAst
->
pLeft
,
num
,
pColIndex
,
pExprInfo
);
int32_t
ret
=
convertSyntaxTreeToExprTree
(
&
pLeft
,
pAst
->
pLeft
,
num
,
pColIndex
,
pExprInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
}
if
(
pAst
->
pRight
!=
NULL
)
{
int32_t
ret
=
tSQLBinaryExprCreateFromSqlExpr
(
&
pRight
,
pAst
->
pRight
,
num
,
pColIndex
,
pExprInfo
);
int32_t
ret
=
convertSyntaxTreeToExprTree
(
&
pRight
,
pAst
->
pRight
,
num
,
pColIndex
,
pExprInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
...
...
@@ -5828,14 +5828,14 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr*
if
(
pAst
->
pLeft
==
NULL
)
{
if
(
pAst
->
nSQLOptr
>=
TK_TINYINT
&&
pAst
->
nSQLOptr
<=
TK_DOUBLE
)
{
*
pExpr
=
calloc
(
1
,
sizeof
(
t
SQLSyntax
Node
)
+
sizeof
(
tVariant
));
*
pExpr
=
calloc
(
1
,
sizeof
(
t
Expr
Node
)
+
sizeof
(
tVariant
));
(
*
pExpr
)
->
nodeType
=
TSQL_NODE_VALUE
;
(
*
pExpr
)
->
pVal
=
(
tVariant
*
)
((
char
*
)(
*
pExpr
)
+
sizeof
(
t
SQLSyntax
Node
));
(
*
pExpr
)
->
pVal
=
(
tVariant
*
)
((
char
*
)(
*
pExpr
)
+
sizeof
(
t
Expr
Node
));
tVariantAssign
((
*
pExpr
)
->
pVal
,
&
pAst
->
val
);
}
else
if
(
pAst
->
nSQLOptr
>=
TK_COUNT
&&
pAst
->
nSQLOptr
<=
TK_AVG_IRATE
)
{
*
pExpr
=
calloc
(
1
,
sizeof
(
t
SQLSyntax
Node
)
+
sizeof
(
SSchemaEx
));
*
pExpr
=
calloc
(
1
,
sizeof
(
t
Expr
Node
)
+
sizeof
(
SSchemaEx
));
(
*
pExpr
)
->
nodeType
=
TSQL_NODE_COL
;
(
*
pExpr
)
->
pSchema
=
(
SSchema
*
)((
char
*
)(
*
pExpr
)
+
sizeof
(
t
SQLSyntax
Node
));
(
*
pExpr
)
->
pSchema
=
(
SSchema
*
)((
char
*
)(
*
pExpr
)
+
sizeof
(
t
Expr
Node
));
strncpy
((
*
pExpr
)
->
pSchema
->
name
,
pAst
->
operand
.
z
,
pAst
->
operand
.
n
);
// set the input column data byte and type.
...
...
@@ -5850,28 +5850,21 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr*
return
TSDB_CODE_SUCCESS
;
}
(
*
pExpr
)
->
colId
=
-
1
;
*
pColIndex
=
realloc
(
*
pColIndex
,
(
++
(
*
num
))
*
sizeof
(
SColIndexEx
));
memset
(
&
(
*
pColIndex
)[(
*
num
)
-
1
],
0
,
sizeof
(
SColIndexEx
));
strncpy
((
*
pColIndex
)[(
*
num
)
-
1
].
name
,
pAst
->
operand
.
z
,
pAst
->
operand
.
n
);
}
else
{
tSQLBinaryExpr
*
pBinExpr
=
(
tSQLBinaryExpr
*
)
calloc
(
1
,
sizeof
(
tSQLBinaryExpr
));
pBinExpr
->
filterOnPrimaryKey
=
false
;
pBinExpr
->
pLeft
=
pLeft
;
pBinExpr
->
pRight
=
pRight
;
*
pExpr
=
(
tExprNode
*
)
calloc
(
1
,
sizeof
(
tExprNode
));
(
*
pExpr
)
->
_node
.
hasPK
=
false
;
(
*
pExpr
)
->
_node
.
pLeft
=
pLeft
;
(
*
pExpr
)
->
_node
.
pRight
=
pRight
;
SSQLToken
t
=
{.
type
=
pAst
->
nSQLOptr
};
pBinExpr
->
nSQLBinaryO
ptr
=
getBinaryExprOptr
(
&
t
);
(
*
pExpr
)
->
_node
.
o
ptr
=
getBinaryExprOptr
(
&
t
);
assert
(
pBinExpr
->
nSQLBinaryO
ptr
!=
0
);
assert
(
(
*
pExpr
)
->
_node
.
o
ptr
!=
0
);
(
*
pExpr
)
=
malloc
(
sizeof
(
tSQLSyntaxNode
));
(
*
pExpr
)
->
nodeType
=
TSQL_NODE_EXPR
;
(
*
pExpr
)
->
pExpr
=
pBinExpr
;
(
*
pExpr
)
->
colId
=
-
1
;
if
(
pBinExpr
->
nSQLBinaryOptr
==
TSDB_BINARY_OP_DIVIDE
)
{
if
((
*
pExpr
)
->
_node
.
optr
==
TSDB_BINARY_OP_DIVIDE
)
{
if
(
pRight
->
nodeType
==
TSQL_NODE_VALUE
)
{
if
(
pRight
->
pVal
->
nType
==
TSDB_DATA_TYPE_INT
&&
pRight
->
pVal
->
i64Key
==
0
)
{
return
TSDB_CODE_INVALID_SQL
;
...
...
src/client/src/tscServer.c
浏览文件 @
7f43265b
...
...
@@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
pRes
->
rspType
=
rpcMsg
->
msgType
;
pRes
->
rspLen
=
rpcMsg
->
contLen
;
char
*
tmp
=
(
char
*
)
realloc
(
pRes
->
pRsp
,
pRes
->
rspLen
);
if
(
tmp
==
NULL
)
{
pRes
->
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
else
{
pRes
->
pRsp
=
tmp
;
if
(
pRes
->
rspLen
)
{
if
(
pRes
->
rspLen
>
0
)
{
char
*
tmp
=
(
char
*
)
realloc
(
pRes
->
pRsp
,
pRes
->
rspLen
);
if
(
tmp
==
NULL
)
{
pRes
->
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
else
{
pRes
->
pRsp
=
tmp
;
memcpy
(
pRes
->
pRsp
,
rpcMsg
->
pCont
,
pRes
->
rspLen
);
}
}
else
{
pRes
->
pRsp
=
NULL
;
}
// ignore the error information returned from mnode when set ignore flag in sql
...
...
@@ -325,9 +327,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
if
(
rpcMsg
->
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
void
*
taosres
=
tscKeepConn
[
pCmd
->
command
]
?
pSql
:
NULL
;
rpcMsg
->
code
=
pRes
->
code
?
-
pRes
->
code
:
pRes
->
numOfRows
;
rpcMsg
->
code
=
pRes
->
code
?
pRes
->
code
:
pRes
->
numOfRows
;
tscTrace
(
"%p Async SQL result:%s res:%p"
,
pSql
,
tstrerror
(
pRes
->
code
),
taosres
);
tscTrace
(
"%p Async SQL result:%s res:%p"
,
pSql
,
tstrerror
(
pRes
->
code
),
pSql
);
/*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
...
...
@@ -893,11 +895,6 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SCMCreateDbMsg
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_CM_CREATE_DB
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"%p failed to malloc for query msg"
,
pSql
);
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
SCMCreateDbMsg
*
pCreateDbMsg
=
(
SCMCreateDbMsg
*
)
pCmd
->
payload
;
assert
(
pCmd
->
numOfClause
==
1
);
...
...
src/client/src/tscSql.c
浏览文件 @
7f43265b
...
...
@@ -155,6 +155,10 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
STscObj
*
pObj
=
(
STscObj
*
)
param
;
assert
(
pObj
!=
NULL
&&
pObj
->
pSql
!=
NULL
);
if
(
code
<
0
)
{
pObj
->
pSql
->
res
.
code
=
code
;
}
sem_post
(
&
pObj
->
pSql
->
rspSem
);
}
...
...
@@ -177,17 +181,17 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
sem_wait
(
&
pSql
->
rspSem
);
if
(
pSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
pSql
->
res
.
code
;
taos_close
(
pObj
);
return
NULL
;
}
tscTrace
(
"%p DB connection is opening"
,
pObj
);
// version compare only requires the first 3 segments of the version string
int
code
=
taosCheckVersion
(
version
,
taos_get_server_info
(
pObj
),
3
);
if
(
code
!=
0
)
{
pSql
->
res
.
code
=
code
;
terrno
=
code
;
taos_close
(
pObj
);
return
NULL
;
}
else
{
...
...
@@ -267,31 +271,29 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
return
pRes
->
code
;
}
static
void
syncQueryCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
STscObj
*
pObj
=
(
STscObj
*
)
param
;
assert
(
pObj
!=
NULL
&&
pObj
->
pSql
!=
NULL
)
;
static
void
waitForQueryRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
assert
(
param
!=
NULL
)
;
SSqlObj
*
pSql
=
((
STscObj
*
)
param
)
->
pSql
;
sem_post
(
&
pObj
->
pSql
->
rspSem
);
// valid error code is less than 0
if
(
code
<
0
)
{
pSql
->
res
.
code
=
code
;
}
sem_post
(
&
pSql
->
rspSem
);
}
int
taos_query
(
TAOS
*
taos
,
const
char
*
sqlstr
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
globalCode
=
TSDB_CODE_DISCONNECTED
;
terrno
=
TSDB_CODE_DISCONNECTED
;
return
TSDB_CODE_DISCONNECTED
;
}
SSqlObj
*
pSql
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pSql
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
pObj
->
pSql
=
pSql
;
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
SSqlObj
*
pSql
=
pObj
->
pSql
;
int32
_t
sqlLen
=
strlen
(
sqlstr
);
doAsyncQuery
(
pObj
,
pObj
->
pSql
,
syncQueryCallback
,
taos
,
sqlstr
,
sqlLen
);
size
_t
sqlLen
=
strlen
(
sqlstr
);
doAsyncQuery
(
pObj
,
pObj
->
pSql
,
waitForQueryRsp
,
taos
,
sqlstr
,
sqlLen
);
// wait for the callback function to post the semaphore
sem_wait
(
&
pSql
->
rspSem
);
...
...
@@ -649,12 +651,12 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
return
pRes
->
tsrow
;
}
static
void
asyncFetchCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
static
void
waitForRetrieveRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
if
(
numOfRows
<
0
)
{
// set the error code
pSql
->
res
.
code
=
-
numOfRows
;
}
sem_post
(
&
pSql
->
rspSem
);
}
...
...
@@ -677,7 +679,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current data are exhausted, fetch more data
if
(
pRes
->
data
==
NULL
||
(
pRes
->
data
!=
NULL
&&
pRes
->
row
>=
pRes
->
numOfRows
&&
pRes
->
completed
!=
true
&&
(
pCmd
->
command
==
TSDB_SQL_RETRIEVE
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE_METRIC
||
pCmd
->
command
==
TSDB_SQL_FETCH
)))
{
taos_fetch_rows_a
(
res
,
asyncFetchCallback
,
pSql
->
pTscObj
);
taos_fetch_rows_a
(
res
,
waitForRetrieveRsp
,
pSql
->
pTscObj
);
sem_wait
(
&
pSql
->
rspSem
);
}
...
...
@@ -754,20 +756,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if
(
pRes
==
NULL
||
pRes
->
qhandle
==
0
)
{
/* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace
(
"%p qhandle is null, abort free, fp:%p"
,
pSql
,
pSql
->
fp
);
if
(
pSql
->
fp
!=
NULL
)
{
STscObj
*
pObj
=
pSql
->
pTscObj
;
if
(
pSql
==
pObj
->
pSql
)
{
pObj
->
pSql
=
NULL
;
tscFreeSqlObj
(
pSql
);
}
if
(
tscShouldFreeAsyncSqlObj
(
pSql
))
{
tscFreeSqlObj
(
pSql
);
tscTrace
(
"%p Async SqlObj is freed by app"
,
pSql
);
}
else
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
}
else
{
tscFreeSqlObjPartial
(
pSql
);
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
}
else
{
tscFreeSqlObjPartial
(
pSql
);
}
}
return
;
}
...
...
@@ -793,7 +793,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
* be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport
*/
if
(
pRes
->
code
!=
TSDB_CODE_QUERY_CANCELLED
&&
((
pRes
->
numOfRows
>
0
&&
pCmd
->
command
<
TSDB_SQL_LOCAL
)
||
((
pRes
->
numOfRows
>
0
&&
pCmd
->
command
<
TSDB_SQL_LOCAL
&&
pRes
->
completed
==
false
)
||
(
pRes
->
code
==
TSDB_CODE_SUCCESS
&&
pRes
->
numOfRows
==
0
&&
pCmd
->
command
==
TSDB_SQL_SELECT
&&
pSql
->
pStream
==
NULL
&&
pTableMetaInfo
->
pTableMeta
!=
NULL
)))
{
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
...
...
@@ -836,39 +836,37 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
}
}
else
{
// if no free resource msg is sent to vnode, we free this object immediately.
if
(
pSql
->
fp
)
{
bool
free
=
tscShouldFreeAsyncSqlObj
(
pSql
);
if
(
free
)
{
assert
(
pRes
->
numOfRows
==
0
||
(
pCmd
->
command
>
TSDB_SQL_LOCAL
));
tscFreeSqlObj
(
pSql
);
tscTrace
(
"%p Async sql result is freed by app"
,
pSql
);
}
else
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
tscTrace
(
"%p sql result is freed while sql command is kept"
,
pSql
);
}
else
{
tscFreeSqlObjPartial
(
pSql
);
tscTrace
(
"%p sql result is freed"
,
pSql
);
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
tscTrace
(
"%p sql result is freed while sql command is kept"
,
pSql
);
}
else
{
tscFreeSqlObjPartial
(
pSql
);
tscTrace
(
"%p sql result is freed by app"
,
pSql
);
}
}
}
}
void
taos_free_result
(
TAOS_RES
*
res
)
{
taos_free_result_imp
(
res
,
0
);
}
// todo should not be used in async query
int
taos_errno
(
TAOS
*
taos
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
int
code
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
globalCode
;
if
((
int8_t
)(
pObj
->
pSql
->
res
.
code
)
==
-
1
)
code
=
TSDB_CODE_OTHERS
;
else
code
=
pObj
->
pSql
->
res
.
code
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
return
terrno
;
}
return
code
;
return
pObj
->
pSql
->
res
.
code
;
}
static
bool
validErrorCode
(
int32_t
code
)
{
return
code
>=
TSDB_CODE_SUCCESS
&&
code
<
TSDB_CODE_MAX_ERROR_CODE
;
}
/*
* In case of invalid sql error, additional information is attached to explain
* why the sql is invalid
...
...
@@ -888,25 +886,19 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
return
z
!=
NULL
;
}
// todo should not be used in async model
char
*
taos_errstr
(
TAOS
*
taos
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
uint8_t
code
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
(
char
*
)
tstrerror
(
globalCode
);
return
(
char
*
)
tstrerror
(
terrno
);
SSqlObj
*
pSql
=
pObj
->
pSql
;
if
(
validErrorCode
(
pSql
->
res
.
code
))
{
code
=
pSql
->
res
.
code
;
}
else
{
code
=
TSDB_CODE_OTHERS
;
// unknown error
}
if
(
hasAdditionalErrorInfo
(
code
,
&
pSql
->
cmd
))
{
SSqlObj
*
pSql
=
pObj
->
pSql
;
if
(
hasAdditionalErrorInfo
(
pSql
->
res
.
code
,
&
pSql
->
cmd
))
{
return
pSql
->
cmd
.
payload
;
}
else
{
return
(
char
*
)
tstrerror
(
code
);
return
(
char
*
)
tstrerror
(
pSql
->
res
.
code
);
}
}
...
...
src/client/src/tscSub.c
浏览文件 @
7f43265b
...
...
@@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql
->
sqlstr
=
sqlstr
;
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
tsem_init
(
&
pSql
->
emptyRspSem
,
0
,
1
);
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
numOfRows
=
1
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
7f43265b
...
...
@@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) {
static
void
doQuitSubquery
(
SSqlObj
*
pParentSql
)
{
freeSubqueryObj
(
pParentSql
);
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
//
tsem_wait(&pParentSql->emptyRspSem);
//
tsem_wait(&pParentSql->emptyRspSem);
tsem_post
(
&
pParentSql
->
rspSem
);
//
tsem_post(&pParentSql->rspSem);
}
static
void
quitAllSubquery
(
SSqlObj
*
pSqlObj
,
SJoinSubquerySupporter
*
pSupporter
)
{
...
...
@@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
freeSubqueryObj
(
pParentSql
);
}
tsem_post
(
&
pParentSql
->
rspSem
);
//
tsem_post(&pParentSql->rspSem);
}
else
{
tscTrace
(
"%p sub:%p completed, completed:%d, total:%d"
,
pParentSql
,
tres
,
finished
,
numOfTotal
);
}
...
...
@@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
}
// wait for all subquery completed
tsem_wait
(
&
pSql
->
rspSem
);
//
tsem_wait(&pSql->rspSem);
// update the records for each subquery
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
...
...
@@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscProcessSql
(
pSql
);
}
else
{
// first retrieve from vnode during the secondary stage sub-query
if
(
pParentSql
->
fp
==
NULL
)
{
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
tsem_post
(
&
pParentSql
->
rspSem
);
// tsem_post(&pParentSql->rspSem);
}
else
{
// set the command flag must be after the semaphore been correctly set.
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
...
...
@@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
}
tsem_post
(
&
pSql
->
emptyRspSem
);
tsem_wait
(
&
pSql
->
rspSem
);
tsem_post
(
&
pSql
->
emptyRspSem
);
// tsem_wait(&pSql->rspSem);
if
(
pSql
->
numOfSubs
<=
0
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
...
...
src/client/src/tscSystem.c
浏览文件 @
7f43265b
...
...
@@ -40,15 +40,10 @@ void * tscQhandle;
void
*
tscCheckDiskUsageTmr
;
int
tsInsertHeadSize
;
extern
int
tscEmbedded
;
int
tscNumOfThreads
;
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
static
pthread_mutex_t
tscMutex
;
int
tscNumOfThreads
;
extern
int
tsTscEnableRecordSql
;
extern
int
tsNumOfLogLines
;
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
void
taosInitNote
(
int
numOfNoteLines
,
int
maxNotes
,
char
*
lable
);
void
deltaToUtcInitOnce
();
void
tscCheckDiskUsage
(
void
*
para
,
void
*
unused
)
{
taosGetDisk
();
...
...
@@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
char
secretEncrypt
[
32
]
=
{
0
};
taosEncryptPass
((
uint8_t
*
)
secret
,
strlen
(
secret
),
secretEncrypt
);
pthread_mutex_lock
(
&
tscMutex
);
if
(
pVnodeConn
==
NULL
)
{
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsLocalIp
;
...
...
@@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
pVnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
pVnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to vnode"
);
pthread_mutex_unlock
(
&
tscMutex
);
return
-
1
;
}
}
...
...
@@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) {
pTscMgmtConn
=
rpcOpen
(
&
rpcInit
);
if
(
pTscMgmtConn
==
NULL
)
{
tscError
(
"failed to init connection to mgmt"
);
pthread_mutex_unlock
(
&
tscMutex
);
return
-
1
;
}
}
pthread_mutex_unlock
(
&
tscMutex
);
return
0
;
}
...
...
@@ -113,7 +104,7 @@ void taos_init_imp() {
char
temp
[
128
];
struct
stat
dirstat
;
pthread_mutex_init
(
&
tscMutex
,
NULL
)
;
errno
=
TSDB_CODE_SUCCESS
;
srand
(
taosGetTimestampSec
());
deltaToUtcInitOnce
();
...
...
src/client/src/tscUtil.c
浏览文件 @
7f43265b
...
...
@@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
pSql
->
freed
=
0
;
tscFreeSqlCmdData
(
pCmd
);
tscTrace
(
"%p
free sqlObj partial
completed"
,
pSql
);
tscTrace
(
"%p
partially free sqlObj
completed"
,
pSql
);
}
void
tscFreeSqlObj
(
SSqlObj
*
pSql
)
{
...
...
@@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tfree
(
pCmd
->
payload
);
pCmd
->
allocSize
=
0
;
tsem_destroy
(
&
pSql
->
rspSem
);
free
(
pSql
);
}
...
...
@@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) {
taosTmrStopA
(
&
(
pObj
->
pTimer
));
tscFreeSqlObj
(
pSql
);
sem_destroy
(
&
pSql
->
rspSem
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
tscTrace
(
"%p DB connection is closed"
,
pObj
);
tfree
(
pObj
);
}
...
...
@@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
if
(
pCmd
->
payload
==
NULL
)
{
assert
(
pCmd
->
allocSize
==
0
);
pCmd
->
payload
=
(
char
*
)
malloc
(
size
);
pCmd
->
payload
=
(
char
*
)
calloc
(
1
,
size
);
if
(
pCmd
->
payload
==
NULL
)
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
pCmd
->
allocSize
=
size
;
memset
(
pCmd
->
payload
,
0
,
pCmd
->
allocSize
);
}
else
{
if
(
pCmd
->
allocSize
<
size
)
{
char
*
b
=
realloc
(
pCmd
->
payload
,
size
);
...
...
@@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
pCmd
->
payload
=
b
;
pCmd
->
allocSize
=
size
;
}
memset
(
pCmd
->
payload
,
0
,
pCmd
->
payloadLen
);
}
//memset(pCmd->payload, 0, pCmd->allocSize);
...
...
@@ -1105,7 +1106,7 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) {
for
(
int32_t
i
=
0
;
i
<
pFieldInfo
->
numOfOutputCols
;
++
i
)
{
if
(
pFieldInfo
->
pExpr
[
i
]
!=
NULL
)
{
t
SQLBinaryExpr
Destroy
(
&
pFieldInfo
->
pExpr
[
i
]
->
binExprInfo
.
pBinExpr
,
NULL
);
t
ExprTree
Destroy
(
&
pFieldInfo
->
pExpr
[
i
]
->
binExprInfo
.
pBinExpr
,
NULL
);
tfree
(
pFieldInfo
->
pExpr
[
i
]
->
binExprInfo
.
pReqColumns
);
tfree
(
pFieldInfo
->
pExpr
[
i
]);
}
...
...
@@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
}
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
pHb
==
pSql
)
{
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
pHb
==
pSql
||
pTscObj
->
pSql
==
pSql
)
{
return
false
;
}
...
...
@@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
}
pTableMetaInfo
->
pTableMeta
=
pTableMeta
;
// pTableMetaInfo->pMetricMeta = pMetricMeta;
pTableMetaInfo
->
numOfTags
=
numOfTags
;
if
(
tags
!=
NULL
)
{
...
...
@@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
}
void
tscRemoveAllMeterMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
)
{
tscTrace
(
"%p deref the
metric/meter
meta in cache, numOfTables:%d"
,
address
,
pQueryInfo
->
numOfTables
);
tscTrace
(
"%p deref the
table
meta in cache, numOfTables:%d"
,
address
,
pQueryInfo
->
numOfTables
);
int32_t
index
=
pQueryInfo
->
numOfTables
;
while
(
index
>=
0
)
{
...
...
src/dnode/src/dnodeRead.c
浏览文件 @
7f43265b
...
...
@@ -286,6 +286,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
dnodeContinueExecuteQuery
(
pVnode
,
pQInfo
,
pMsg
);
}
else
{
// no further execution invoked, release the ref to vnode
dnodeProcessReadResult
(
pVnode
,
pMsg
);
dnodeReleaseVnode
(
pVnode
);
}
}
...
...
src/inc/taoserror.h
浏览文件 @
7f43265b
...
...
@@ -80,7 +80,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 34, "table already ex
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_USER
,
0
,
35
,
"invalid user"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_ACCT
,
0
,
36
,
"invalid account"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_PASS
,
0
,
37
,
"invalid password"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DB_NOT_SELECTED
,
0
,
38
,
"d
o
not selected"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DB_NOT_SELECTED
,
0
,
38
,
"d
b
not selected"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MEMORY_CORRUPTED
,
0
,
39
,
"memory corrupted"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_USER_ALREADY_EXIST
,
0
,
40
,
"user already exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_NO_RIGHTS
,
0
,
41
,
"no rights"
)
...
...
@@ -118,7 +118,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MONITOR_DB_FORBIDDEN, 0, 72, "monitor db forbi
TAOS_DEFINE_ERROR
(
TSDB_CODE_NO_DISK_PERMISSIONS
,
0
,
73
,
"no disk permissions"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VG_INIT_FAILED
,
0
,
74
,
"vg init failed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DATA_ALREADY_IMPORTED
,
0
,
75
,
"data already imported"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_OPS_NOT_SUPPORT
,
0
,
76
,
"ops not support"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_OPS_NOT_SUPPORT
,
0
,
76
,
"op
eration
s not support"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_QUERY_ID
,
0
,
77
,
"invalid query id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_STREAM_ID
,
0
,
78
,
"invalid stream id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_CONNECTION
,
0
,
79
,
"invalid connection"
)
...
...
src/mnode/inc/mgmtMnode.h
浏览文件 @
7f43265b
...
...
@@ -30,9 +30,6 @@ bool mgmtCheckRedirect(void *handle);
void
mgmtGetMnodePrivateIpList
(
SRpcIpSet
*
ipSet
);
void
mgmtGetMnodePublicIpList
(
SRpcIpSet
*
ipSet
);
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
);
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
);
#ifdef __cplusplus
}
#endif
...
...
src/mnode/src/mgmtBalance.c
浏览文件 @
7f43265b
...
...
@@ -49,6 +49,7 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
pVgroup
->
vnodeGid
[
0
].
dnodeId
=
pSelDnode
->
dnodeId
;
pVgroup
->
vnodeGid
[
0
].
privateIp
=
pSelDnode
->
privateIp
;
pVgroup
->
vnodeGid
[
0
].
publicIp
=
pSelDnode
->
publicIp
;
mTrace
(
"dnode:%d, alloc one vnode to vgroup"
,
pSelDnode
->
dnodeId
);
mTrace
(
"dnode:%d, alloc one vnode to vgroup, openVnodes:%d"
,
pSelDnode
->
dnodeId
,
pSelDnode
->
openVnodes
);
return
TSDB_CODE_SUCCESS
;
}
src/mnode/src/mgmtDnode.c
浏览文件 @
7f43265b
...
...
@@ -29,10 +29,19 @@
static
void
mgmtProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
;
static
void
mgmtProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mgmtGetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
extern
int32_t
clusterInit
();
extern
void
clusterCleanUp
();
extern
int32_t
clusterGetDnodesNum
();
extern
void
*
clusterGetNextDnode
(
void
*
pNode
,
SDnodeObj
**
pDnode
);
extern
void
*
clusterGetNextDnode
(
void
*
pNode
,
void
**
pDnode
);
extern
SDnodeObj
*
clusterGetDnode
(
int32_t
dnodeId
);
extern
SDnodeObj
*
clusterGetDnodeByIp
(
uint32_t
ip
);
#ifndef _CLUSTER
...
...
@@ -43,7 +52,15 @@ int32_t mgmtInitDnodes() {
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONFIG_DNODE
,
mgmtProcessCfgDnodeMsg
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
,
mgmtProcessCfgDnodeMsgRsp
);
mgmtAddDServerMsgHandle
(
TSDB_MSG_TYPE_DM_STATUS
,
mgmtProcessDnodeStatusMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MODULE
,
mgmtGetModuleMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MODULE
,
mgmtRetrieveModules
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
mgmtGetConfigMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
mgmtRetrieveConfigs
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_VNODES
,
mgmtGetVnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_VNODES
,
mgmtRetrieveVnodes
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtGetDnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtRetrieveDnodes
);
#ifdef _CLUSTER
return
clusterInit
();
#else
...
...
@@ -251,3 +268,437 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
rpcSendResponse
(
&
rpcRsp
);
}
static
int32_t
mgmtGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
pAcct
->
user
,
"root"
)
!=
0
)
return
TSDB_CODE_NO_RIGHTS
;
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
schema
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"id"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
16
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"private ip"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
16
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"public ip"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create time"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
10
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"status"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"open vnodes"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"total vnodes"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
#ifdef _VPEER
pShow
->
bytes
[
cols
]
=
18
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"balance"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
#endif
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
mgmtGetDnodesNum
();
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
return
0
;
}
static
int32_t
mgmtRetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
char
ipstr
[
32
];
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
mgmtGetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pDnode
->
dnodeId
;
cols
++
;
tinet_ntoa
(
ipstr
,
pDnode
->
privateIp
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
ipstr
);
cols
++
;
tinet_ntoa
(
ipstr
,
pDnode
->
publicIp
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
ipstr
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pDnode
->
createdTime
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetDnodeStatusStr
(
pDnode
->
status
)
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pDnode
->
openVnodes
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pDnode
->
numOfTotalVnodes
;
cols
++
;
#ifdef _VPEER
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetDnodeLbStatusStr
(
pDnode
->
lbStatus
));
cols
++
;
#endif
numOfRows
++
;
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
bool
clusterCheckModuleInDnode
(
SDnodeObj
*
pDnode
,
int32_t
moduleType
)
{
uint32_t
status
=
pDnode
->
moduleStatus
&
(
1
<<
moduleType
);
return
status
>
0
;
}
static
int32_t
mgmtGetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
return
TSDB_CODE_NO_RIGHTS
;
SSchema
*
pSchema
=
pMeta
->
schema
;
pShow
->
bytes
[
cols
]
=
16
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"IP"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
10
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"module type"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
10
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"module status"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
while
(
1
)
{
pShow
->
pNode
=
mgmtGetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
if
(
clusterCheckModuleInDnode
(
pDnode
,
moduleType
))
{
pShow
->
numOfRows
++
;
}
}
}
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
return
0
;
}
int32_t
mgmtRetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
int32_t
cols
=
0
;
char
ipstr
[
20
];
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
mgmtGetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
if
(
!
clusterCheckModuleInDnode
(
pDnode
,
moduleType
))
{
continue
;
}
cols
=
0
;
tinet_ntoa
(
ipstr
,
pDnode
->
privateIp
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
ipstr
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
tsModule
[
moduleType
].
name
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetDnodeStatusStr
(
pDnode
->
status
)
);
cols
++
;
numOfRows
++
;
}
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
bool
clusterCheckConfigShow
(
SGlobalConfig
*
cfg
)
{
if
(
!
(
cfg
->
cfgType
&
TSDB_CFG_CTYPE_B_SHOW
))
return
false
;
return
true
;
}
static
int32_t
mgmtGetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
return
TSDB_CODE_NO_RIGHTS
;
SSchema
*
pSchema
=
pMeta
->
schema
;
pShow
->
bytes
[
cols
]
=
TSDB_CFG_OPTION_LEN
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"config name"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_CFG_VALUE_LEN
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"config value"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
pShow
->
numOfRows
=
0
;
for
(
int32_t
i
=
tsGlobalConfigNum
-
1
;
i
>=
0
;
--
i
)
{
SGlobalConfig
*
cfg
=
tsGlobalConfig
+
i
;
if
(
!
clusterCheckConfigShow
(
cfg
))
continue
;
pShow
->
numOfRows
++
;
}
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
return
0
;
}
static
int32_t
mgmtRetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
for
(
int32_t
i
=
tsGlobalConfigNum
-
1
;
i
>=
0
&&
numOfRows
<
rows
;
--
i
)
{
SGlobalConfig
*
cfg
=
tsGlobalConfig
+
i
;
if
(
!
clusterCheckConfigShow
(
cfg
))
continue
;
char
*
pWrite
;
int32_t
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
snprintf
(
pWrite
,
TSDB_CFG_OPTION_LEN
,
"%s"
,
cfg
->
option
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
switch
(
cfg
->
valType
)
{
case
TSDB_CFG_VTYPE_SHORT
:
snprintf
(
pWrite
,
TSDB_CFG_VALUE_LEN
,
"%d"
,
*
((
int16_t
*
)
cfg
->
ptr
));
numOfRows
++
;
break
;
case
TSDB_CFG_VTYPE_INT
:
snprintf
(
pWrite
,
TSDB_CFG_VALUE_LEN
,
"%d"
,
*
((
int32_t
*
)
cfg
->
ptr
));
numOfRows
++
;
break
;
case
TSDB_CFG_VTYPE_UINT
:
snprintf
(
pWrite
,
TSDB_CFG_VALUE_LEN
,
"%d"
,
*
((
uint32_t
*
)
cfg
->
ptr
));
numOfRows
++
;
break
;
case
TSDB_CFG_VTYPE_FLOAT
:
snprintf
(
pWrite
,
TSDB_CFG_VALUE_LEN
,
"%f"
,
*
((
float
*
)
cfg
->
ptr
));
numOfRows
++
;
break
;
case
TSDB_CFG_VTYPE_STRING
:
case
TSDB_CFG_VTYPE_IPSTR
:
case
TSDB_CFG_VTYPE_DIRECTORY
:
snprintf
(
pWrite
,
TSDB_CFG_VALUE_LEN
,
"%s"
,
(
char
*
)
cfg
->
ptr
);
numOfRows
++
;
break
;
default:
break
;
}
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
int32_t
mgmtGetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
return
TSDB_CODE_NO_RIGHTS
;
SSchema
*
pSchema
=
pMeta
->
schema
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"vnode"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
12
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"status"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
12
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"sync_status"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
SDnodeObj
*
pDnode
=
NULL
;
if
(
pShow
->
payloadLen
>
0
)
{
uint32_t
ip
=
ip2uint
(
pShow
->
payload
);
pDnode
=
mgmtGetDnodeByIp
(
ip
);
if
(
NULL
==
pDnode
)
{
return
TSDB_CODE_NODE_OFFLINE
;
}
SVnodeLoad
*
pVnode
;
pShow
->
numOfRows
=
0
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_VNODES
;
i
++
)
{
pVnode
=
&
pDnode
->
vload
[
i
];
if
(
0
!=
pVnode
->
vgId
)
{
pShow
->
numOfRows
++
;
}
}
pShow
->
pNode
=
pDnode
;
}
else
{
while
(
true
)
{
pShow
->
pNode
=
mgmtGetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
pShow
->
numOfRows
+=
pDnode
->
openVnodes
;
if
(
0
==
pShow
->
numOfRows
)
return
TSDB_CODE_NODE_OFFLINE
;
}
pShow
->
pNode
=
NULL
;
}
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
return
0
;
}
static
int32_t
mgmtRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
int32_t
cols
=
0
;
if
(
0
==
rows
)
return
0
;
if
(
pShow
->
payloadLen
)
{
// output the vnodes info of the designated dnode. And output all vnodes of this dnode, instead of rows (max 100)
pDnode
=
(
SDnodeObj
*
)(
pShow
->
pNode
);
if
(
pDnode
!=
NULL
)
{
SVnodeLoad
*
pVnode
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_VNODES
;
i
++
)
{
pVnode
=
&
pDnode
->
vload
[
i
];
if
(
0
==
pVnode
->
vgId
)
{
continue
;
}
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
uint32_t
*
)
pWrite
=
pVnode
->
vgId
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetVnodeStatusStr
(
pVnode
->
status
));
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetVnodeSyncStatusStr
(
pVnode
->
syncStatus
));
cols
++
;
numOfRows
++
;
}
}
}
else
{
// TODO: output all vnodes of all dnodes
numOfRows
=
0
;
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
\ No newline at end of file
src/mnode/src/mgmtMnode.c
浏览文件 @
7f43265b
...
...
@@ -15,122 +15,82 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tstatus.h"
#include "trpc.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtUser.h"
int32_t
(
*
mpeerAddMnodeFp
)(
uint32_t
privateIp
,
uint32_t
publicIp
)
=
NULL
;
int32_t
(
*
mpeerRemoveMnodeFp
)(
uint32_t
privateIp
)
=
NULL
;
int32_t
(
*
mpeerGetMnodesNumFp
)()
=
NULL
;
void
*
(
*
mpeerGetNextMnodeFp
)(
SShowObj
*
pShow
,
SMnodeObj
**
pMnode
)
=
NULL
;
int32_t
(
*
mpeerInitMnodesFp
)()
=
NULL
;
void
(
*
mpeerCleanUpMnodesFp
)()
=
NULL
;
#ifndef _MPEER
static
SMnodeObj
tsMnodeObj
=
{
0
};
static
bool
tsMnodeIsMaster
=
false
;
static
bool
tsMnodeIsServing
=
false
;
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
char
*
mgmtMnodeStatusStr
[]
=
{
"offline"
,
"unsynced"
,
"syncing"
,
"serving"
,
"null"
};
static
char
*
mgmtMnodeRoleStr
[]
=
{
"unauthed"
,
"undecided"
,
"master"
,
"slave"
,
"null"
};
int32_t
mgmtInitMnodes
()
{
if
(
mpeerInitMnodesFp
)
{
return
(
*
mpeerInitMnodesFp
)();
}
else
{
tsMnodeIsServing
=
true
;
tsMnodeIsMaster
=
true
;
return
0
;
}
}
void
mgmtCleanupMnodes
()
{
if
(
mpeerCleanUpMnodesFp
)
{
(
*
mpeerCleanUpMnodesFp
)();
}
}
bool
mgmtInServerStatus
()
{
return
tsMnodeIsServing
;
}
bool
mgmtIsMaster
()
{
return
tsMnodeIsMaster
;
}
bool
mgmtCheckRedirect
(
void
*
handle
)
{
return
false
;
}
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
)
{
if
(
mpeerAddMnodeFp
)
{
return
(
*
mpeerAddMnodeFp
)(
privateIp
,
publicIp
);
}
else
{
return
0
;
}
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MNODE
,
mgmtGetMnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MNODE
,
mgmtRetrieveMnodes
);
tsMnodeObj
.
mnodeId
=
1
;
tsMnodeObj
.
privateIp
=
inet_addr
(
tsPrivateIp
);
tsMnodeObj
.
publicIp
=
inet_addr
(
tsPublicIp
);
tsMnodeObj
.
createdTime
=
taosGetTimestampMs
();
tsMnodeObj
.
role
=
TSDB_MN_ROLE_MASTER
;
tsMnodeObj
.
status
=
TSDB_MN_STATUS_SERVING
;
tsMnodeObj
.
numOfMnodes
=
1
;
sprintf
(
tsMnodeObj
.
mnodeName
,
"%d"
,
tsMnodeObj
.
mnodeId
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
)
{
if
(
mpeerRemoveMnodeFp
)
{
return
(
*
mpeerRemoveMnodeFp
)(
privateIp
);
}
else
{
return
0
;
}
}
void
mgmtCleanupMnodes
()
{}
bool
mgmtInServerStatus
()
{
return
tsMnodeObj
.
status
==
TSDB_MN_STATUS_SERVING
;
}
bool
mgmtIsMaster
()
{
return
tsMnodeObj
.
role
==
TSDB_MN_ROLE_MASTER
;
}
bool
mgmtCheckRedirect
(
void
*
handle
)
{
return
false
;
}
static
int32_t
mgmtGetMnodesNum
()
{
if
(
mpeerGetMnodesNumFp
)
{
return
(
*
mpeerGetMnodesNumFp
)();
}
else
{
return
1
;
}
return
1
;
}
static
void
*
mgmtGetNextMnode
(
SShowObj
*
pShow
,
SMnodeObj
**
pMnode
)
{
if
(
mpeerGetNextMnodeFp
)
{
return
(
*
mpeerGetNextMnodeFp
)(
pShow
,
pMnode
)
;
static
void
*
mgmtGetNextMnode
(
void
*
pNode
,
SMnodeObj
**
pMnode
)
{
if
(
*
pMnode
==
NULL
)
{
*
pMnode
=
&
tsMnodeObj
;
}
else
{
if
(
*
pMnode
==
NULL
)
{
*
pMnode
=
&
tsMnodeObj
;
}
else
{
*
pMnode
=
NULL
;
}
*
pMnode
=
NULL
;
}
return
*
pMnode
;
}
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
return
TSDB_CODE_NO_RIGHTS
;
if
(
strcmp
(
pUser
->
pAcct
->
user
,
"root"
)
!=
0
)
return
TSDB_CODE_NO_RIGHTS
;
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
schema
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"id"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
16
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"private ip"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
16
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"public ip"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create time"
);
...
...
@@ -149,12 +109,6 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
16
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"public ip"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
...
...
@@ -163,7 +117,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
mgmtGet
M
nodesNum
();
pShow
->
numOfRows
=
mgmtGet
D
nodesNum
();
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
...
...
@@ -171,38 +125,42 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
}
static
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SMnodeObj
*
pMnode
=
NULL
;
char
*
pWrite
;
char
ipstr
[
32
];
char
*
pWrite
;
char
ipstr
[
32
];
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
mgmtGetNextMnode
(
pShow
,
(
SMnodeObj
**
)
&
pMnode
);
pShow
->
pNode
=
mgmtGetNextMnode
(
pShow
->
pNode
,
(
SMnodeObj
**
)
&
pMnode
);
if
(
pMnode
==
NULL
)
break
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pMnode
->
mnodeId
;
cols
++
;
tinet_ntoa
(
ipstr
,
pMnode
->
privateIp
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
ipstr
);
cols
++
;
tinet_ntoa
(
ipstr
,
pMnode
->
publicIp
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pMnode
->
createdTime
;
strcpy
(
pWrite
,
ipstr
)
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
mgmtMnodeStatusStr
[
pMnode
->
status
])
;
*
(
int64_t
*
)
pWrite
=
pMnode
->
createdTime
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
mgmtMnodeRoleStr
[
pMnode
->
role
]
);
strcpy
(
pWrite
,
taosGetMnodeStatusStr
(
pMnode
->
status
)
);
cols
++
;
tinet_ntoa
(
ipstr
,
pMnode
->
publicIp
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
ipstr
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetMnodeRoleStr
(
pMnode
->
role
));
cols
++
;
numOfRows
++
;
...
...
@@ -216,12 +174,14 @@ void mgmtGetMnodePrivateIpList(SRpcIpSet *ipSet) {
ipSet
->
inUse
=
0
;
ipSet
->
port
=
htons
(
tsMnodeDnodePort
);
ipSet
->
numOfIps
=
1
;
ipSet
->
ip
[
0
]
=
htonl
(
inet_addr
(
tsMasterIp
)
);
ipSet
->
ip
[
0
]
=
htonl
(
tsMnodeObj
.
privateIp
);
}
void
mgmtGetMnodePublicIpList
(
SRpcIpSet
*
ipSet
)
{
ipSet
->
inUse
=
0
;
ipSet
->
port
=
htons
(
tsMnodeDnodePort
);
ipSet
->
numOfIps
=
1
;
ipSet
->
ip
[
0
]
=
htonl
(
inet_addr
(
tsMasterIp
));
}
\ No newline at end of file
ipSet
->
ip
[
0
]
=
htonl
(
tsMnodeObj
.
publicIp
);
}
#endif
\ No newline at end of file
src/mnode/src/mgmtShell.c
浏览文件 @
7f43265b
...
...
@@ -48,6 +48,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg);
static
void
mgmtProcessRetrieveMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessHeartBeatMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessConnectMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessUseMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
*
tsMgmtShellRpc
=
NULL
;
static
void
*
tsMgmtTranQhandle
=
NULL
;
...
...
@@ -60,7 +61,8 @@ int32_t mgmtInitShell() {
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_RETRIEVE
,
mgmtProcessRetrieveMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mgmtProcessConnectMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_USE_DB
,
mgmtProcessUseMsg
);
tsMgmtTranQhandle
=
taosInitScheduler
(
tsMaxShellConns
,
1
,
"mnodeT"
);
int32_t
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
/
4
.
0
;
...
...
@@ -416,6 +418,23 @@ connect_over:
rpcSendResponse
(
&
rpcRsp
);
}
static
void
mgmtProcessUseMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMUseDbMsg
*
pUseDbMsg
=
pMsg
->
pCont
;
// todo check for priority of current user
SDbObj
*
pDbObj
=
mgmtGetDb
(
pUseDbMsg
->
db
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pDbObj
==
NULL
)
{
code
=
TSDB_CODE_INVALID_DB
;
}
rpcRsp
.
code
=
code
;
rpcSendResponse
(
&
rpcRsp
);
}
/**
* check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one.
*/
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
7f43265b
...
...
@@ -52,6 +52,14 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) {
if
(
pVgroup
->
tableList
)
{
tfree
(
pVgroup
->
tableList
);
}
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
if
(
pDnode
)
{
atomic_sub_fetch_32
(
&
pDnode
->
openVnodes
,
1
);
}
}
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -87,6 +95,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
pVgroup
->
vnodeGid
[
i
].
privateIp
=
pDnode
->
privateIp
;
pVgroup
->
vnodeGid
[
i
].
publicIp
=
pDnode
->
publicIp
;
pVgroup
->
vnodeGid
[
i
].
vnode
=
pVgroup
->
vgId
;
atomic_add_fetch_32
(
&
pDnode
->
openVnodes
,
1
);
}
mgmtAddVgroupIntoDb
(
pVgroup
);
...
...
src/os/linux/src/tlinux.c
浏览文件 @
7f43265b
...
...
@@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) {
void
(
*
callback
)(
int
)
=
tharg
;
timer_t
timerId
;
struct
sigevent
sevent
;
struct
sigevent
sevent
=
{
0
}
;
#ifdef _ALPINE
sevent
.
sigev_notify
=
SIGEV_THREAD
;
...
...
src/query/inc/qast.h
浏览文件 @
7f43265b
...
...
@@ -16,6 +16,7 @@
#ifndef TDENGINE_TAST_H
#define TDENGINE_TAST_H
#include <tbuffer.h>
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -27,14 +28,14 @@ extern "C" {
#include "taosdef.h"
#include "tvariant.h"
struct
t
SQLBinaryExpr
;
struct
t
ExprNode
;
struct
SSchema
;
struct
tSkipList
;
struct
tSkipListNode
;
enum
{
TSQL_NODE_EXPR
=
0x1
,
TSQL_NODE_COL
=
0x2
,
TSQL_NODE_EXPR
=
0x1
,
TSQL_NODE_COL
=
0x2
,
TSQL_NODE_VALUE
=
0x4
,
};
...
...
@@ -60,50 +61,41 @@ typedef struct SBinaryFilterSupp {
void
*
pExtInfo
;
}
SBinaryFilterSupp
;
typedef
struct
t
SQLSyntax
Node
{
typedef
struct
t
Expr
Node
{
uint8_t
nodeType
;
int16_t
colId
;
// for schema, the id of column
union
{
struct
tSQLBinaryExpr
*
pExpr
;
struct
SSchema
*
pSchema
;
tVariant
*
pVal
;
struct
{
uint8_t
optr
;
// filter operator
uint8_t
hasPK
;
// 0: do not contain primary filter, 1: contain
void
*
info
;
// support filter operation on this expression only available for leaf node
struct
tExprNode
*
pLeft
;
// left child pointer
struct
tExprNode
*
pRight
;
// right child pointer
}
_node
;
struct
SSchema
*
pSchema
;
tVariant
*
pVal
;
};
}
t
SQLSyntax
Node
;
}
t
Expr
Node
;
typedef
struct
tSQLBinaryExpr
{
uint8_t
nSQLBinaryOptr
;
// filter operator
uint8_t
filterOnPrimaryKey
;
// 0: do not contain primary filter, 1: contain
void
tSQLBinaryExprFromString
(
tExprNode
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
);
/*
* provide the information to support filter operation on this expression
* only available for leaf node
*/
void
*
info
;
tSQLSyntaxNode
*
pLeft
;
// left child pointer
tSQLSyntaxNode
*
pRight
;
// right child pointer
}
tSQLBinaryExpr
;
void
tSQLBinaryExprToString
(
tExprNode
*
pExpr
,
char
*
dst
,
int32_t
*
len
);
typedef
struct
tQueryResultset
{
void
**
pRes
;
int64_t
num
;
}
tQueryResultset
;
void
tExprTreeDestroy
(
tExprNode
**
pExprs
,
void
(
*
fp
)(
void
*
));
void
tSQLBinaryExpr
FromString
(
tSQLBinaryExpr
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
);
void
tSQLBinaryExpr
Traverse
(
tExprNode
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SBinaryFilterSupp
*
param
);
void
tSQLBinaryExprToString
(
tSQLBinaryExpr
*
pExpr
,
char
*
dst
,
int32_t
*
len
);
void
tSQLBinaryExprDestroy
(
tSQLBinaryExpr
**
pExprs
,
void
(
*
fp
)(
void
*
));
void
tSQLBinaryExprTraverse
(
tSQLBinaryExpr
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SBinaryFilterSupp
*
param
);
void
tSQLBinaryExprCalcTraverse
(
tSQLBinaryExpr
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
void
tSQLBinaryExprCalcTraverse
(
tExprNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
cb
)(
void
*
,
char
*
,
int32_t
));
void
tSQLBinaryExprTrv
(
tSQLBinaryExpr
*
pExprs
,
int32_t
*
val
,
int16_t
*
ids
);
void
tQueryResultClean
(
tQueryResultset
*
pRes
);
void
tSQLBinaryExprTrv
(
tExprNode
*
pExprs
,
int32_t
*
val
,
int16_t
*
ids
);
uint8_t
getBinaryExprOptr
(
SSQLToken
*
pToken
);
SBuffer
exprTreeToBinary
(
tExprNode
*
pExprTree
);
tExprNode
*
exprTreeFromBinary
(
const
void
*
pBuf
,
size_t
size
);
#ifdef __cplusplus
}
#endif
...
...
src/query/inc/qsqlparser.h
浏览文件 @
7f43265b
...
...
@@ -13,13 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_Q
ASTDEF
_H
#define TDENGINE_Q
ASTDEF
_H
#ifndef TDENGINE_Q
SQLPARSER
_H
#define TDENGINE_Q
SQLPARSER
_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <tstrbuild.h>
#include "taos.h"
#include "taosmsg.h"
#include "tstoken.h"
...
...
@@ -187,38 +188,19 @@ typedef struct SSqlInfo {
}
SSqlInfo
;
typedef
struct
tSQLExpr
{
/*
* for single operand:
* TK_ALL
* TK_ID
* TK_SUM
* TK_AVG
* TK_MIN
* TK_MAX
* TK_FIRST
* TK_LAST
* TK_BOTTOM
* TK_TOP
* TK_STDDEV
* TK_PERCENTILE
*
* for binary operand:
* TK_LESS
* TK_LARGE
* TK_EQUAL etc...
*/
uint32_t
nSQLOptr
;
// TK_FUNCTION: sql function, TK_LE: less than(binary expr)
// TK_FUNCTION: sql function, TK_LE: less than(binary expr)
uint32_t
nSQLOptr
;
// the full sql string of function(col, param), which is actually the raw
// field name, since the function name is kept in nSQLOptr already
SSQLToken
operand
;
struct
tSQLExprList
*
pParam
;
// function parameters
SSQLToken
operand
;
SSQLToken
colInfo
;
// field id
tVariant
val
;
// value only for string, float, int
SSQLToken
colInfo
;
// field i
d
tVariant
val
;
// value only for string, float, int
struct
tSQLExpr
*
pLeft
;
// left chil
d
struct
tSQLExpr
*
pRight
;
// right child
struct
tSQLExpr
*
pLeft
;
// left child
struct
tSQLExpr
*
pRight
;
// right child
struct
tSQLExprList
*
pParam
;
// function parameters
}
tSQLExpr
;
// used in select clause. select <tSQLExprList> from xxx
...
...
@@ -326,18 +308,20 @@ void tSQLSetColumnInfo(TAOS_FIELD *pField, SSQLToken *pName, TAOS_FIELD *pType);
void
tSQLSetColumnType
(
TAOS_FIELD
*
pField
,
SSQLToken
*
pToken
);
void
*
ParseAlloc
(
void
*
(
*
mallocProc
)(
size_t
));
// convert the sql filter expression into binary data
int32_t
tSQLExprToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
sb
);
enum
{
TSQL_NODE_TYPE_EXPR
=
0x1
,
TSQL_NODE_TYPE_ID
=
0x2
,
TSQL_NODE_TYPE_EXPR
=
0x1
,
TSQL_NODE_TYPE_ID
=
0x2
,
TSQL_NODE_TYPE_VALUE
=
0x4
,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
#define NORMAL_ARITHMETIC
1
#define AGG_ARIGHTMEIC
2
int32_t
tSQLParse
(
SSqlInfo
*
pSQLInfo
,
const
char
*
pSql
);
...
...
@@ -345,4 +329,4 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql);
}
#endif
#endif // TDENGINE_Q
ASTDEF
_H
#endif // TDENGINE_Q
SQLPARSER
_H
src/query/src/qast.c
浏览文件 @
7f43265b
此差异已折叠。
点击以展开。
src/query/src/qparserImpl.c
浏览文件 @
7f43265b
...
...
@@ -13,17 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <qsqltype.h>
#include "os.h"
#include "qsqlparser.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tstoken.h"
#include "ttime.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "qsqltype.h"
#include "tstrbuild.h"
int32_t
tSQLParse
(
SSqlInfo
*
pSQLInfo
,
const
char
*
pStr
)
{
void
*
pParser
=
ParseAlloc
(
malloc
);
...
...
@@ -900,3 +901,173 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
memset
(
&
pDBInfo
->
precision
,
0
,
sizeof
(
SSQLToken
));
}
static
bool
isExprLeafNode
(
tSQLExpr
*
pExpr
)
{
return
(
pExpr
->
pRight
==
NULL
&&
pExpr
->
pLeft
==
NULL
)
&&
(
pExpr
->
nSQLOptr
==
TK_ID
||
(
pExpr
->
nSQLOptr
>=
TK_BOOL
&&
pExpr
->
nSQLOptr
<=
TK_NCHAR
)
||
pExpr
->
nSQLOptr
==
TK_SET
);
}
static
bool
isExprParentOfLeafNode
(
tSQLExpr
*
pExpr
)
{
return
(
pExpr
->
pLeft
!=
NULL
&&
pExpr
->
pRight
!=
NULL
)
&&
(
isExprLeafNode
(
pExpr
->
pLeft
)
&&
isExprLeafNode
(
pExpr
->
pRight
));
}
static
int32_t
tSQLExprNodeToString
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
if
(
pExpr
->
nSQLOptr
==
TK_ID
)
{
// column name
// strncpy(*str, pExpr->colInfo.z, pExpr->colInfo.n);
// *str += pExpr->colInfo.n;
}
else
if
(
pExpr
->
nSQLOptr
>=
TK_BOOL
&&
pExpr
->
nSQLOptr
<=
TK_STRING
)
{
// value
// *str += tVariantToString(&pExpr->val, *str);
// taosStringBuilderAppendStringLen()
}
else
if
(
pExpr
->
nSQLOptr
>=
TK_COUNT
&&
pExpr
->
nSQLOptr
<=
TK_AVG_IRATE
)
{
taosStringBuilderAppendStringLen
(
pBuilder
,
pExpr
->
operand
.
z
,
pExpr
->
operand
.
n
);
}
else
{
// not supported operation
assert
(
false
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
optrToString
(
tSQLExpr
*
pExpr
,
char
**
exprString
)
{
const
char
*
le
=
"<="
;
const
char
*
ge
=
">="
;
const
char
*
ne
=
"<>"
;
const
char
*
likeOptr
=
"LIKE"
;
switch
(
pExpr
->
nSQLOptr
)
{
case
TK_LE
:
{
*
(
int16_t
*
)(
*
exprString
)
=
*
(
int16_t
*
)
le
;
*
exprString
+=
1
;
break
;
}
case
TK_GE
:
{
*
(
int16_t
*
)(
*
exprString
)
=
*
(
int16_t
*
)
ge
;
*
exprString
+=
1
;
break
;
}
case
TK_NE
:
{
*
(
int16_t
*
)(
*
exprString
)
=
*
(
int16_t
*
)
ne
;
*
exprString
+=
1
;
break
;
}
case
TK_LT
:
*
(
*
exprString
)
=
'<'
;
break
;
case
TK_GT
:
*
(
*
exprString
)
=
'>'
;
break
;
case
TK_EQ
:
*
(
*
exprString
)
=
'='
;
break
;
case
TK_PLUS
:
*
(
*
exprString
)
=
'+'
;
break
;
case
TK_MINUS
:
*
(
*
exprString
)
=
'-'
;
break
;
case
TK_STAR
:
*
(
*
exprString
)
=
'*'
;
break
;
case
TK_DIVIDE
:
*
(
*
exprString
)
=
'/'
;
break
;
case
TK_REM
:
*
(
*
exprString
)
=
'%'
;
break
;
case
TK_LIKE
:
{
int32_t
len
=
sprintf
(
*
exprString
,
" %s "
,
likeOptr
);
*
exprString
+=
(
len
-
1
);
break
;
}
default:
return
TSDB_CODE_INVALID_SQL
;
}
*
exprString
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tSQLExprLeafToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
if
(
!
isExprParentOfLeafNode
(
pExpr
))
{
return
TSDB_CODE_INVALID_SQL
;
}
tSQLExpr
*
pLeft
=
pExpr
->
pLeft
;
tSQLExpr
*
pRight
=
pExpr
->
pRight
;
// if (addParentheses) {
// *(*output) = '(';
// *output += 1;
// }
tSQLExprNodeToString
(
pLeft
,
pBuilder
);
tSQLExprNodeToString
(
pRight
,
pBuilder
);
if
(
optrToString
(
pExpr
,
pBuilder
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_INVALID_SQL
;
}
// if (addParentheses) {
// *(*output) = ')';
// *output += 1;
// }
return
TSDB_CODE_SUCCESS
;
}
static
void
relToString
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
assert
(
pExpr
->
nSQLOptr
==
TK_AND
||
pExpr
->
nSQLOptr
==
TK_OR
);
const
char
*
or
=
"OR"
;
const
char
*
and
=
"AND"
;
// if (pQueryInfo->tagCond.relType == TSQL_STABLE_QTYPE_COND) {
// if (pExpr->nSQLOptr == TK_AND) {
// strcpy(*str, and);
// *str += strlen(and);
// } else {
// strcpy(*str, or);
// *str += strlen(or);
// }
}
static
int32_t
doSQLExprToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
if
(
pExpr
==
NULL
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
!
isExprParentOfLeafNode
(
pExpr
))
{
// *(*str) = '(';
// *str += 1;
int32_t
ret
=
doSQLExprToBinary
(
pExpr
->
pLeft
,
pBuilder
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
ret
=
doSQLExprToBinary
(
pExpr
->
pRight
,
pBuilder
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
relToString
(
pExpr
,
pBuilder
);
// *(*str) = ')';
// *str += 1;
return
ret
;
}
return
tSQLExprLeafToBinary
(
pExpr
,
pBuilder
);
}
// post order seralize to binary data
int32_t
tSQLExprToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
assert
(
pExpr
!=
NULL
&&
pBuilder
!=
NULL
);
}
\ No newline at end of file
src/query/src/qtokenizer.c
浏览文件 @
7f43265b
...
...
@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "hash.h"
#include "hashfunc.h"
#include "os.h"
#include "shash.h"
#include "taosdef.h"
#include "tstoken.h"
#include "ttokendef.h"
...
...
src/query/src/queryExecutor.c
浏览文件 @
7f43265b
...
...
@@ -2603,13 +2603,12 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
static
int64_t
doScanAllDataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int64_t
cnt
=
0
;
dTrace
(
"QInfo:%p query start, qrange:%"
PRId64
"-%"
PRId64
", lastkey:%"
PRId64
", order:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
lastKey
,
pQuery
->
order
.
order
);
tsdb_query_handle_t
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
// check if query is killed or not set the status of query to pass the status check
if
(
isQueryKilled
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
...
...
@@ -3595,8 +3594,8 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery
->
window
.
ekey
=
ekey
;
STimeWindow
win
=
{.
skey
=
pQuery
->
window
.
skey
,
.
ekey
=
pQuery
->
window
.
ekey
};
tsdbResetQuery
(
pRuntimeEnv
->
pQueryHandle
,
&
win
,
current
,
pQuery
->
order
.
order
);
tsdbNextDataBlock
(
pRuntimeEnv
->
pQueryHandle
);
//
tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order);
//
tsdbNextDataBlock(pRuntimeEnv->pQueryHandle);
}
void
doFinalizeResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
...
...
@@ -5149,7 +5148,7 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
int64_t
st
=
taosGetTimestampUs
();
// group by normal column, sliding window query, interval query are handled by interval query processor
if
(
pQuery
->
intervalTime
!=
0
||
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
// interval (down sampling operation)
if
(
isIntervalQuery
(
pQuery
)
||
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
// interval (down sampling operation)
tableIntervalProcessor
(
pQInfo
);
}
else
{
if
(
isFixedOutputQuery
(
pQuery
))
{
...
...
@@ -5461,7 +5460,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
SSqlBinaryExprInfo
*
pBinaryExprInfo
=
&
pExpr
->
binExprInfo
;
SColumnInfo
*
pColMsg
=
pQueryMsg
->
colList
;
#if 0
t
SQLBinaryExpr
* pBinExpr = NULL;
t
ExprNode
* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
...
...
@@ -5962,7 +5961,7 @@ static void freeQInfo(SQInfo *pQInfo) {
if
(
pBinExprInfo
->
numOfCols
>
0
)
{
tfree
(
pBinExprInfo
->
pReqColumns
);
t
SQLBinaryExpr
Destroy
(
&
pBinExprInfo
->
pBinExpr
,
NULL
);
t
ExprTree
Destroy
(
&
pBinExprInfo
->
pBinExpr
,
NULL
);
}
}
...
...
src/query/src/queryUtil.c
浏览文件 @
7f43265b
...
...
@@ -21,10 +21,10 @@
#include "ttime.h"
#include "qinterpolation.h"
//#include "tscJoinProcess.h"
#include "ttime.h"
#include "queryExecutor.h"
#include "queryUtil.h"
int32_t
initWindowResInfo
(
SWindowResInfo
*
pWindowResInfo
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
size
,
int32_t
threshold
,
int16_t
type
)
{
...
...
src/query/tests/astTest.cpp
0 → 100644
浏览文件 @
7f43265b
此差异已折叠。
点击以展开。
src/util/inc/tbuffer.h
浏览文件 @
7f43265b
...
...
@@ -13,14 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <setjmp.h>
#ifndef TDENGINE_TBUFFER_H
#define TDENGINE_TBUFFER_H
#include "setjmp.h"
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
/*
SBuffer can be used to read or write a buffer, but cannot be used for both
...
...
@@ -80,37 +81,33 @@ int main(int argc, char** argv) {
*/
typedef
struct
{
jmp_buf
jb
;
char
*
data
;
size_t
pos
;
size_t
size
;
char
*
data
;
size_t
pos
;
size_t
size
;
}
SBuffer
;
// common functions can be used in both read & write
#define tbufThrowError(buf, code) longjmp((buf)->jb, (code))
size_t
tbufTell
(
SBuffer
*
buf
);
size_t
tbufSeekTo
(
SBuffer
*
buf
,
size_t
pos
);
size_t
tbufSkip
(
SBuffer
*
buf
,
size_t
size
);
void
tbufClose
(
SBuffer
*
buf
,
bool
keepData
);
void
tbufClose
(
SBuffer
*
buf
,
bool
keepData
);
// basic read functions
#define tbufBeginRead(buf,
data, len) (((buf)->data = (char*)data), ((buf)->pos = 0), ((buf)->size = ((
data) == NULL) ? 0 : (len)), setjmp((buf)->jb))
char
*
tbufRead
(
SBuffer
*
buf
,
size_t
size
);
void
tbufReadToBuffer
(
SBuffer
*
buf
,
void
*
dst
,
size_t
size
);
#define tbufBeginRead(buf,
_data, len) ((buf)->data = (char*)(_data), ((buf)->pos = 0), ((buf)->size = ((_
data) == NULL) ? 0 : (len)), setjmp((buf)->jb))
char
*
tbufRead
(
SBuffer
*
buf
,
size_t
size
);
void
tbufReadToBuffer
(
SBuffer
*
buf
,
void
*
dst
,
size_t
size
);
const
char
*
tbufReadString
(
SBuffer
*
buf
,
size_t
*
len
);
size_t
tbufReadToString
(
SBuffer
*
buf
,
char
*
dst
,
size_t
size
);
size_t
tbufReadToString
(
SBuffer
*
buf
,
char
*
dst
,
size_t
size
);
// basic write functions
#define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb))
void
tbufEnsureCapacity
(
SBuffer
*
buf
,
size_t
size
);
void
tbufEnsureCapacity
(
SBuffer
*
buf
,
size_t
size
);
char
*
tbufGetData
(
SBuffer
*
buf
,
bool
takeOver
);
void
tbufWrite
(
SBuffer
*
buf
,
const
void
*
data
,
size_t
size
);
void
tbufWriteAt
(
SBuffer
*
buf
,
size_t
pos
,
const
void
*
data
,
size_t
size
);
void
tbufWriteStringLen
(
SBuffer
*
buf
,
const
char
*
str
,
size_t
len
);
void
tbufWriteString
(
SBuffer
*
buf
,
const
char
*
str
);
void
tbufWrite
(
SBuffer
*
buf
,
const
void
*
data
,
size_t
size
);
void
tbufWriteAt
(
SBuffer
*
buf
,
size_t
pos
,
const
void
*
data
,
size_t
size
);
void
tbufWriteStringLen
(
SBuffer
*
buf
,
const
char
*
str
,
size_t
len
);
void
tbufWriteString
(
SBuffer
*
buf
,
const
char
*
str
);
// read & write function for primitive types
#ifndef TBUFFER_DEFINE_FUNCTION
...
...
@@ -120,17 +117,21 @@ void tbufWriteString(SBuffer* buf, const char* str);
void tbufWrite##name##At(SBuffer* buf, size_t pos, type data);
#endif
TBUFFER_DEFINE_FUNCTION
(
bool
,
Bool
)
TBUFFER_DEFINE_FUNCTION
(
char
,
Char
)
TBUFFER_DEFINE_FUNCTION
(
int8_t
,
Int8
)
TBUFFER_DEFINE_FUNCTION
(
uint8_t
,
Unt8
)
TBUFFER_DEFINE_FUNCTION
(
int16_t
,
Int16
)
TBUFFER_DEFINE_FUNCTION
(
uint16_t
,
Uint16
)
TBUFFER_DEFINE_FUNCTION
(
int32_t
,
Int32
)
TBUFFER_DEFINE_FUNCTION
(
uint32_t
,
Uint32
)
TBUFFER_DEFINE_FUNCTION
(
int64_t
,
Int64
)
TBUFFER_DEFINE_FUNCTION
(
uint64_t
,
Uint64
)
TBUFFER_DEFINE_FUNCTION
(
float
,
Float
)
TBUFFER_DEFINE_FUNCTION
(
double
,
Double
)
TBUFFER_DEFINE_FUNCTION
(
bool
,
Bool
)
TBUFFER_DEFINE_FUNCTION
(
char
,
Char
)
TBUFFER_DEFINE_FUNCTION
(
int8_t
,
Int8
)
TBUFFER_DEFINE_FUNCTION
(
uint8_t
,
Unt8
)
TBUFFER_DEFINE_FUNCTION
(
int16_t
,
Int16
)
TBUFFER_DEFINE_FUNCTION
(
uint16_t
,
Uint16
)
TBUFFER_DEFINE_FUNCTION
(
int32_t
,
Int32
)
TBUFFER_DEFINE_FUNCTION
(
uint32_t
,
Uint32
)
TBUFFER_DEFINE_FUNCTION
(
int64_t
,
Int64
)
TBUFFER_DEFINE_FUNCTION
(
uint64_t
,
Uint64
)
TBUFFER_DEFINE_FUNCTION
(
float
,
Float
)
TBUFFER_DEFINE_FUNCTION
(
double
,
Double
)
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
src/util/inc/tstatus.h
浏览文件 @
7f43265b
...
...
@@ -62,6 +62,19 @@ enum _TSDB_VN_DROP_STATUS {
TSDB_VN_DROP_STATUS_DROPPING
};
enum
_TSDB_MN_STATUS
{
TSDB_MN_STATUS_OFFLINE
,
TSDB_MN_STATUS_UNSYNCED
,
TSDB_MN_STATUS_SYNCING
,
TSDB_MN_STATUS_SERVING
};
enum
_TSDB_MN_ROLE
{
TSDB_MN_ROLE_UNDECIDED
,
TSDB_MN_ROLE_SLAVE
,
TSDB_MN_ROLE_MASTER
};
enum
_TSDB_DN_STATUS
{
TSDB_DN_STATUS_OFFLINE
,
TSDB_DN_STATUS_READY
...
...
@@ -104,6 +117,8 @@ char* taosGetVgroupLbStatusStr(int32_t vglbStatus);
char
*
taosGetVnodeStreamStatusStr
(
int32_t
vnodeStreamStatus
);
char
*
taosGetTableStatusStr
(
int32_t
tableStatus
);
char
*
taosGetShowTypeStr
(
int32_t
showType
);
char
*
taosGetMnodeStatusStr
(
int32_t
mnodeStatus
);
char
*
taosGetMnodeRoleStr
(
int32_t
mnodeRole
);
#ifdef __cplusplus
}
...
...
src/util/src/.tqueue.c.swp
已删除
100644 → 0
浏览文件 @
d2effbae
文件已删除
src/util/src/tbuffer.c
浏览文件 @
7f43265b
...
...
@@ -30,7 +30,7 @@
tbufWriteAt(buf, pos, &data, sizeof(data));\
}
#include "
../inc/
tbuffer.h"
#include "tbuffer.h"
////////////////////////////////////////////////////////////////////////////////
...
...
@@ -119,13 +119,14 @@ void tbufEnsureCapacity(SBuffer* buf, size_t size) {
}
char
*
tbufGetData
(
SBuffer
*
buf
,
bool
takeOver
)
{
char
*
ret
=
buf
->
data
;
if
(
takeOver
)
{
buf
->
pos
=
0
;
buf
->
size
=
0
;
buf
->
data
=
NULL
;
}
return
ret
;
char
*
ret
=
buf
->
data
;
if
(
takeOver
)
{
buf
->
pos
=
0
;
buf
->
size
=
0
;
buf
->
data
=
NULL
;
}
return
ret
;
}
void
tbufEndWrite
(
SBuffer
*
buf
)
{
...
...
src/util/src/tstatus.c
浏览文件 @
7f43265b
...
...
@@ -136,3 +136,22 @@ char *taosGetShowTypeStr(int32_t showType) {
default:
return
"undefined"
;
}
}
char
*
taosGetMnodeStatusStr
(
int32_t
mnodeStatus
)
{
switch
(
mnodeStatus
)
{
case
TSDB_MN_STATUS_OFFLINE
:
return
"offline"
;
case
TSDB_MN_STATUS_UNSYNCED
:
return
"unsynced"
;
case
TSDB_MN_STATUS_SYNCING
:
return
"syncing"
;
case
TSDB_MN_STATUS_SERVING
:
return
"serving"
;
default:
return
"undefined"
;
}
}
char
*
taosGetMnodeRoleStr
(
int32_t
mnodeRole
)
{
switch
(
mnodeRole
)
{
case
TSDB_MN_ROLE_UNDECIDED
:
return
"undicided"
;
case
TSDB_MN_ROLE_SLAVE
:
return
"slave"
;
case
TSDB_MN_ROLE_MASTER
:
return
"master"
;
default:
return
"undefined"
;
}
}
src/vnode/tsdb/src/tsdbFile.c
浏览文件 @
7f43265b
...
...
@@ -147,6 +147,11 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct
}
void
tsdbSeekFileGroupIter
(
SFileGroupIter
*
pIter
,
int
fid
)
{
if
(
pIter
->
numOfFGroups
==
0
)
{
assert
(
pIter
->
pFileGroup
==
NULL
);
return
;
}
int
flags
=
(
pIter
->
direction
==
TSDB_FGROUP_ITER_FORWARD
)
?
TD_GE
:
TD_LE
;
void
*
ptr
=
taosbsearch
(
&
fid
,
pIter
->
base
,
sizeof
(
SFileGroup
),
pIter
->
numOfFGroups
,
compFGroupKey
,
flags
);
if
(
ptr
==
NULL
)
{
...
...
src/vnode/tsdb/src/tsdbRead.c
浏览文件 @
7f43265b
此差异已折叠。
点击以展开。
tests/script/general/show/dnodes.sim
0 → 100644
浏览文件 @
7f43265b
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== unsupport
sql_error create dnode 192.168.0.2
sql_error drop dnode 192.168.0.2
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录