Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a82b728b
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a82b728b
编写于
5月 10, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-14481-3.0
上级
20ce1b79
56b25298
变更
54
展开全部
隐藏空白更改
内联
并排
Showing
54 changed file
with
974 addition
and
599 deletion
+974
-599
include/common/tmsgcb.h
include/common/tmsgcb.h
+1
-0
include/common/tname.h
include/common/tname.h
+13
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+7
-5
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+2
-1
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+2
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+3
-1
source/client/src/clientSml.c
source/client/src/clientSml.c
+44
-61
source/common/src/tname.c
source/common/src/tname.c
+41
-0
source/dnode/mgmt/implement/inc/dmImp.h
source/dnode/mgmt/implement/inc/dmImp.h
+4
-2
source/dnode/mgmt/implement/src/dmExec.c
source/dnode/mgmt/implement/src/dmExec.c
+26
-15
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+3
-2
source/dnode/mgmt/implement/src/dmObj.c
source/dnode/mgmt/implement/src/dmObj.c
+0
-3
source/dnode/mgmt/implement/src/dmTransport.c
source/dnode/mgmt/implement/src/dmTransport.c
+58
-75
source/dnode/mgmt/interface/inc/dmInt.h
source/dnode/mgmt/interface/inc/dmInt.h
+1
-1
source/dnode/mgmt/interface/src/dmInt.c
source/dnode/mgmt/interface/src/dmInt.c
+6
-4
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+0
-1
source/dnode/mnode/impl/src/mndQuery.c
source/dnode/mnode/impl/src/mndQuery.c
+1
-1
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+1
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-0
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+2
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+3
-3
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-6
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+105
-103
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+6
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+3
-24
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+4
-1
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+5
-2
source/libs/function/inc/functionMgtInt.h
source/libs/function/inc/functionMgtInt.h
+1
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+20
-18
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+504
-213
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+10
-9
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+5
-0
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+1
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+3
-0
source/libs/parser/src/parAstParser.c
source/libs/parser/src/parAstParser.c
+1
-1
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+2
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+6
-2
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+4
-1
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+1
-4
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+2
-1
source/libs/planner/test/planBasicTest.cpp
source/libs/planner/test/planBasicTest.cpp
+2
-0
source/libs/planner/test/planOptimizeTest.cpp
source/libs/planner/test/planOptimizeTest.cpp
+2
-0
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+2
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+2
-0
source/util/test/encodeTest.cpp
source/util/test/encodeTest.cpp
+1
-0
tests/script/sh/exec.sh
tests/script/sh/exec.sh
+2
-2
tests/script/test.sh
tests/script/test.sh
+2
-2
tests/script/tsim/insert/basic0.sim
tests/script/tsim/insert/basic0.sim
+2
-1
tests/system-test/2-query/diff.py
tests/system-test/2-query/diff.py
+47
-26
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+2
-2
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
include/common/tmsgcb.h
浏览文件 @
a82b728b
...
...
@@ -57,6 +57,7 @@ typedef struct {
RegisterBrokenLinkArgFp
registerBrokenLinkArgFp
;
ReleaseHandleFp
releaseHandleFp
;
ReportStartup
reportStartupFp
;
void
*
clientRpc
;
}
SMsgCb
;
void
tmsgSetDefaultMsgCb
(
const
SMsgCb
*
pMsgCb
);
...
...
include/common/tname.h
浏览文件 @
a82b728b
...
...
@@ -17,6 +17,7 @@
#define _TD_COMMON_NAME_H_
#include "tdef.h"
#include "tarray.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -62,6 +63,18 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId);
bool
tNameDBNameEqual
(
SName
*
left
,
SName
*
right
);
typedef
struct
{
// input
SArray
*
tags
;
// element is SSmlKV
const
char
*
sTableName
;
// super table name
uint8_t
sTableNameLen
;
// the length of super table name
// output
char
*
childTableName
;
// must have size of TSDB_TABLE_NAME_LEN;
uint64_t
uid
;
// child table uid, may be useful
}
RandTableName
;
void
buildChildTableName
(
RandTableName
*
rName
);
#ifdef __cplusplus
}
...
...
include/libs/executor/executor.h
浏览文件 @
a82b728b
...
...
@@ -22,6 +22,7 @@ extern "C" {
#include "query.h"
#include "tcommon.h"
#include "tmsgcb.h"
typedef
void
*
qTaskInfo_t
;
typedef
void
*
DataSinkHandle
;
...
...
@@ -29,11 +30,12 @@ struct SRpcMsg;
struct
SSubplan
;
typedef
struct
SReadHandle
{
void
*
reader
;
void
*
meta
;
void
*
config
;
void
*
vnode
;
void
*
mnd
;
void
*
reader
;
void
*
meta
;
void
*
config
;
void
*
vnode
;
void
*
mnd
;
SMsgCb
*
pMsgCb
;
}
SReadHandle
;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
...
...
include/libs/function/functionMgt.h
浏览文件 @
a82b728b
...
...
@@ -154,6 +154,7 @@ bool fmIsWindowClauseFunc(int32_t funcId);
bool
fmIsSpecialDataRequiredFunc
(
int32_t
funcId
);
bool
fmIsDynamicScanOptimizedFunc
(
int32_t
funcId
);
bool
fmIsMultiResFunc
(
int32_t
funcId
);
bool
fmIsRepeatScanFunc
(
int32_t
funcId
);
bool
fmIsUserDefinedFunc
(
int32_t
funcId
);
typedef
enum
EFuncDataRequired
{
...
...
@@ -170,7 +171,7 @@ int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t
fmGetUdafExecFuncs
(
int32_t
funcId
,
SFuncExecFuncs
*
pFpSet
);
int32_t
fmSetInvertFunc
(
int32_t
funcId
,
SFuncExecFuncs
*
pFpSet
);
int32_t
fmSetNormalFunc
(
int32_t
funcId
,
SFuncExecFuncs
*
pFpSet
);
bool
fmIsInvertible
(
int32_t
funcId
);
bool
fmIsInvertible
(
int32_t
funcId
);
#ifdef __cplusplus
}
...
...
include/libs/nodes/nodes.h
浏览文件 @
a82b728b
...
...
@@ -206,6 +206,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_FILL
,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
a82b728b
...
...
@@ -274,6 +274,8 @@ typedef struct SIntervalPhysiNode {
int8_t
slidingUnit
;
}
SIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamIntervalPhysiNode
;
typedef
struct
SFillPhysiNode
{
SPhysiNode
node
;
EFillMode
mode
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
a82b728b
...
...
@@ -233,6 +233,7 @@ typedef struct SSelectStmt {
uint8_t
precision
;
bool
isEmptyResult
;
bool
hasAggFuncs
;
bool
hasRepeatScanFuncs
;
bool
isTimeOrderQuery
;
}
SSelectStmt
;
...
...
include/libs/qcom/query.h
浏览文件 @
a82b728b
...
...
@@ -24,6 +24,7 @@ extern "C" {
#include "thash.h"
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
typedef
enum
{
JOB_TASK_STATUS_NULL
=
0
,
...
...
@@ -149,7 +150,8 @@ int32_t cleanupTaskQueue();
*/
int32_t
taosAsyncExec
(
__async_exec_fn_t
execFn
,
void
*
execParam
,
int32_t
*
code
);
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
ctx
);
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
ctx
);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
...
...
source/client/src/clientSml.c
浏览文件 @
a82b728b
...
...
@@ -15,6 +15,7 @@
#include "tcommon.h"
#include "catalog.h"
#include "clientInt.h"
#include "tname.h"
//=================================================================================================
#define SPACE ' '
...
...
@@ -97,6 +98,21 @@ typedef struct {
char
*
buf
;
}
SSmlMsgBuf
;
typedef
struct
{
int32_t
code
;
int32_t
lineNum
;
int32_t
numOfSTables
;
int32_t
numOfCTables
;
int32_t
numOfCreateSTables
;
int64_t
parseTime
;
int64_t
schemaTime
;
int64_t
insertBindTime
;
int64_t
insertRpcTime
;
int64_t
endTime
;
}
SSmlCostInfo
;
typedef
struct
{
uint64_t
id
;
...
...
@@ -114,6 +130,7 @@ typedef struct {
SRequestObj
*
pRequest
;
SQuery
*
pQuery
;
SSmlCostInfo
cost
;
int32_t
affectedRows
;
SSmlMsgBuf
msgBuf
;
SHashObj
*
dumplicateKey
;
// for dumplicate key
...
...
@@ -147,45 +164,6 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const
return
TSDB_CODE_SML_INVALID_DATA
;
}
static
int
smlCompareKv
(
const
void
*
p1
,
const
void
*
p2
)
{
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
p1
;
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
p2
;
int32_t
kvLen1
=
kv1
->
keyLen
;
int32_t
kvLen2
=
kv2
->
keyLen
;
int32_t
res
=
strncasecmp
(
kv1
->
key
,
kv2
->
key
,
TMIN
(
kvLen1
,
kvLen2
));
if
(
res
!=
0
)
{
return
res
;
}
else
{
return
kvLen1
-
kvLen2
;
}
}
static
void
smlBuildChildTableName
(
SSmlTableInfo
*
tags
)
{
int32_t
size
=
taosArrayGetSize
(
tags
->
tags
);
ASSERT
(
size
>
0
);
taosArraySort
(
tags
->
tags
,
smlCompareKv
);
SStringBuilder
sb
=
{
0
};
taosStringBuilderAppendStringLen
(
&
sb
,
tags
->
sTableName
,
tags
->
sTableNameLen
);
for
(
int
j
=
0
;
j
<
size
;
++
j
)
{
SSmlKv
*
tagKv
=
taosArrayGetP
(
tags
->
tags
,
j
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
key
,
tagKv
->
keyLen
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
value
,
tagKv
->
valueLen
);
}
size_t
len
=
0
;
char
*
keyJoined
=
taosStringBuilderGetResult
(
&
sb
,
&
len
);
T_MD5_CTX
context
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
keyJoined
,
(
uint32_t
)
len
);
tMD5Final
(
&
context
);
uint64_t
digest1
=
*
(
uint64_t
*
)(
context
.
digest
);
//uint64_t digest2 = *(uint64_t*)(context.digest + 8);
//snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf
(
tags
->
childTableName
,
TSDB_TABLE_NAME_LEN
,
"t_%016"
PRIx64
,
digest1
);
taosStringBuilderDestroy
(
&
sb
);
tags
->
uid
=
digest1
;
}
static
int32_t
smlGenerateSchemaAction
(
SSchema
*
pointColField
,
SHashObj
*
dbAttrHash
,
SArray
*
dbAttrArray
,
bool
isTag
,
char
sTableName
[],
SSchemaAction
*
action
,
bool
*
actionNeeded
,
SSmlHandle
*
info
)
{
// char fieldName[TSDB_COL_NAME_LEN] = {0};
...
...
@@ -444,6 +422,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
schemaAction
.
createSTable
.
sTableName
);
return
code
;
}
info
->
cost
.
numOfCreateSTables
++
;
}
else
if
(
code
==
TSDB_CODE_SUCCESS
)
{
}
else
{
uError
(
"SML:0x%"
PRIx64
" load table meta error: %s"
,
info
->
id
,
tstrerror
(
code
));
...
...
@@ -926,20 +905,6 @@ static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return
false
;
}
static
bool
checkDuplicateKey
(
char
*
key
,
SHashObj
*
pHash
,
SSmlHandle
*
info
)
{
char
*
val
=
NULL
;
val
=
taosHashGet
(
pHash
,
key
,
strlen
(
key
));
if
(
val
)
{
uError
(
"SML:0x%"
PRIx64
" Duplicate key detected:%s"
,
info
->
id
,
key
);
return
true
;
}
uint8_t
dummy_val
=
0
;
taosHashPut
(
pHash
,
key
,
strlen
(
key
),
&
dummy_val
,
sizeof
(
uint8_t
));
return
false
;
}
static
int32_t
smlParseString
(
const
char
*
sql
,
SSmlLineInfo
*
elements
,
SSmlMsgBuf
*
msg
){
if
(
!
sql
)
return
TSDB_CODE_SML_INVALID_DATA
;
while
(
*
sql
!=
'\0'
)
{
// jump the space at the begining
...
...
@@ -1546,8 +1511,10 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
tinfo
->
sTableName
=
elements
.
measure
;
tinfo
->
sTableNameLen
=
elements
.
measureLen
;
smlBuildChildTableName
(
tinfo
);
uDebug
(
"SML:0x%"
PRIx64
" child table name: %s"
,
info
->
id
,
tinfo
->
childTableName
);
RandTableName
rName
=
{.
tags
=
tinfo
->
tags
,
.
sTableName
=
tinfo
->
sTableName
,
.
sTableNameLen
=
tinfo
->
sTableNameLen
,
.
childTableName
=
tinfo
->
childTableName
};
buildChildTableName
(
&
rName
);
tinfo
->
uid
=
rName
.
uid
;
SSmlSTableMeta
**
tableMeta
=
taosHashGet
(
info
->
superTables
,
elements
.
measure
,
elements
.
measureLen
);
if
(
tableMeta
){
// update meta
...
...
@@ -1604,7 +1571,7 @@ static void smlDestroyInfo(SSmlHandle* info){
static
SSmlHandle
*
smlBuildSmlInfo
(
TAOS
*
taos
,
SRequestObj
*
request
,
SMLProtocolType
protocol
,
int8_t
precision
,
bool
dataFormat
){
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSmlHandle
*
info
=
taosMemory
Malloc
(
sizeof
(
SSmlHandle
));
SSmlHandle
*
info
=
taosMemory
Calloc
(
1
,
sizeof
(
SSmlHandle
));
if
(
NULL
==
info
)
{
return
NULL
;
}
...
...
@@ -1699,12 +1666,23 @@ static int32_t smlInsertData(SSmlHandle* info) {
}
smlBuildOutput
(
info
->
exec
,
info
->
pVgHash
);
info
->
cost
.
insertRpcTime
=
taosGetTimestampUs
();
launchQueryImpl
(
info
->
pRequest
,
info
->
pQuery
,
TSDB_CODE_SUCCESS
,
true
);
info
->
affectedRows
=
taos_affected_rows
(
info
->
pRequest
);
return
info
->
pRequest
->
code
;
}
static
void
smlPrintStatisticInfo
(
SSmlHandle
*
info
){
uError
(
"SML:0x%"
PRIx64
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
parse cost:%"
PRId64
",schema cost:%"
PRId64
",bind cost:%"
PRId64
",rpc cost:%"
PRId64
",total cost:%"
PRId64
""
,
info
->
id
,
info
->
cost
.
code
,
info
->
cost
.
lineNum
,
info
->
cost
.
numOfSTables
,
info
->
cost
.
numOfCTables
,
info
->
cost
.
numOfCreateSTables
,
info
->
cost
.
schemaTime
-
info
->
cost
.
parseTime
,
info
->
cost
.
insertBindTime
-
info
->
cost
.
schemaTime
,
info
->
cost
.
insertRpcTime
-
info
->
cost
.
insertBindTime
,
info
->
cost
.
endTime
-
info
->
cost
.
insertRpcTime
,
info
->
cost
.
endTime
-
info
->
cost
.
parseTime
);
}
static
int
smlInsertLines
(
SSmlHandle
*
info
,
char
*
lines
[],
int
numLines
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1714,6 +1692,7 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
goto
cleanup
;
}
info
->
cost
.
parseTime
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
code
=
smlParseLine
(
info
,
lines
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1721,24 +1700,29 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
goto
cleanup
;
}
}
uDebug
(
"SML:0x%"
PRIx64
" smlInsertLines parse success. tables %d"
,
info
->
id
,
taosHashGetSize
(
info
->
childTables
));
uDebug
(
"SML:0x%"
PRIx64
" smlInsertLines parse success. super tables %d"
,
info
->
id
,
taosHashGetSize
(
info
->
superTables
));
info
->
cost
.
lineNum
=
numLines
;
info
->
cost
.
numOfSTables
=
taosHashGetSize
(
info
->
superTables
);
info
->
cost
.
numOfCTables
=
taosHashGetSize
(
info
->
childTables
);
info
->
cost
.
schemaTime
=
taosGetTimestampUs
();
code
=
smlModifyDBSchemas
(
info
);
if
(
code
!=
0
)
{
uError
(
"SML:0x%"
PRIx64
" smlModifyDBSchemas error : %s"
,
info
->
id
,
tstrerror
(
code
));
goto
cleanup
;
}
info
->
cost
.
insertBindTime
=
taosGetTimestampUs
();
code
=
smlInsertData
(
info
);
if
(
code
!=
0
)
{
uError
(
"SML:0x%"
PRIx64
" smlInsertData error : %s"
,
info
->
id
,
tstrerror
(
code
));
goto
cleanup
;
}
uDebug
(
"SML:0x%"
PRIx64
" smlInsertLines finish inserting %d lines."
,
info
->
id
,
numLines
);
info
->
cost
.
endTime
=
taosGetTimestampUs
();
cleanup:
info
->
cost
.
code
=
code
;
smlPrintStatisticInfo
(
info
);
return
code
;
}
...
...
@@ -1790,7 +1774,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
}
smlDestroyInfo
(
info
);
end:
return
(
TAOS_RES
*
)
request
;
}
source/common/src/tname.c
浏览文件 @
a82b728b
...
...
@@ -15,6 +15,8 @@
#define _DEFAULT_SOURCE
#include "tname.h"
#include "tcommon.h"
#include "tstrbuild.h"
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
...
...
@@ -294,4 +296,43 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return
0
;
}
static
int
compareKv
(
const
void
*
p1
,
const
void
*
p2
)
{
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
p1
;
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
p2
;
int32_t
kvLen1
=
kv1
->
keyLen
;
int32_t
kvLen2
=
kv2
->
keyLen
;
int32_t
res
=
strncasecmp
(
kv1
->
key
,
kv2
->
key
,
TMIN
(
kvLen1
,
kvLen2
));
if
(
res
!=
0
)
{
return
res
;
}
else
{
return
kvLen1
-
kvLen2
;
}
}
/*
* use stable name and tags to grearate child table name
*/
void
buildChildTableName
(
RandTableName
*
rName
)
{
int32_t
size
=
taosArrayGetSize
(
rName
->
tags
);
ASSERT
(
size
>
0
);
taosArraySort
(
rName
->
tags
,
compareKv
);
SStringBuilder
sb
=
{
0
};
taosStringBuilderAppendStringLen
(
&
sb
,
rName
->
sTableName
,
rName
->
sTableNameLen
);
for
(
int
j
=
0
;
j
<
size
;
++
j
)
{
SSmlKv
*
tagKv
=
taosArrayGetP
(
rName
->
tags
,
j
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
key
,
tagKv
->
keyLen
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
value
,
tagKv
->
valueLen
);
}
size_t
len
=
0
;
char
*
keyJoined
=
taosStringBuilderGetResult
(
&
sb
,
&
len
);
T_MD5_CTX
context
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
keyJoined
,
(
uint32_t
)
len
);
tMD5Final
(
&
context
);
uint64_t
digest1
=
*
(
uint64_t
*
)(
context
.
digest
);
uint64_t
digest2
=
*
(
uint64_t
*
)(
context
.
digest
+
8
);
snprintf
(
rName
->
childTableName
,
TSDB_TABLE_NAME_LEN
,
"t_%016"
PRIx64
"%016"
PRIx64
,
digest1
,
digest2
);
taosStringBuilderDestroy
(
&
sb
);
rName
->
uid
=
digest1
;
}
source/dnode/mgmt/implement/inc/dmImp.h
浏览文件 @
a82b728b
...
...
@@ -26,8 +26,10 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper);
void
dmCloseNode
(
SMgmtWrapper
*
pWrapper
);
// dmTransport.c
int32_t
dmInitTrans
(
SDnode
*
pDnode
);
void
dmCleanupTrans
(
SDnode
*
pDnode
);
int32_t
dmInitServer
(
SDnode
*
pDnode
);
void
dmCleanupServer
(
SDnode
*
pDnode
);
int32_t
dmInitClient
(
SDnode
*
pDnode
);
void
dmCleanupClient
(
SDnode
*
pDnode
);
SProcCfg
dmGenProcCfg
(
SMgmtWrapper
*
pWrapper
);
SMsgCb
dmGetMsgcb
(
SMgmtWrapper
*
pWrapper
);
int32_t
dmInitMsgHandle
(
SDnode
*
pDnode
);
...
...
source/dnode/mgmt/implement/src/dmExec.c
浏览文件 @
a82b728b
...
...
@@ -213,10 +213,12 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
}
pWrapper
->
procType
=
DND_PROC_CHILD
;
if
(
dmInitClient
(
pDnode
)
!=
0
)
{
return
-
1
;
}
SMsgCb
msgCb
=
pDnode
->
data
.
msgCb
;
msgCb
.
pWrapper
=
pWrapper
;
tmsgSetDefaultMsgCb
(
&
msgCb
);
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
pWrapper
);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
if
(
dmOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to open since %s"
,
pWrapper
->
name
,
terrstr
());
...
...
@@ -234,6 +236,15 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
pWrapper
->
procType
=
DND_PROC_SINGLE
;
}
if
(
n
==
DNODE
)
{
if
(
dmInitClient
(
pDnode
)
!=
0
)
{
return
-
1
;
}
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
pWrapper
);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
}
if
(
dmOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to open since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -281,21 +292,21 @@ static void dmProcessProcHandle(void *handle) {
}
static
void
dmWatchNodes
(
SDnode
*
pDnode
)
{
if
(
pDnode
->
ptype
!=
DND_PROC_PARENT
)
return
;
if
(
pDnode
->
ntype
==
NODE_END
)
return
;
taosThreadMutexLock
(
&
pDnode
->
mutex
);
if
(
pDnode
->
ptype
==
DND_PROC_PARENT
)
{
for
(
EDndNodeType
n
=
DNODE
+
1
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pWrapper
->
procType
!=
DND_PROC_PARENT
)
continue
;
if
(
pDnode
->
ntype
==
NODE_END
)
continue
;
for
(
EDndNodeType
n
=
DNODE
+
1
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pWrapper
->
procType
!=
DND_PROC_PARENT
)
continue
;
if
(
pWrapper
->
procId
<=
0
||
!
taosProcExist
(
pWrapper
->
procId
))
{
dWarn
(
"node:%s, process:%d is killed and needs to be restarted"
,
pWrapper
->
name
,
pWrapper
->
procId
);
if
(
pWrapper
->
procObj
)
{
taosProcCloseHandles
(
pWrapper
->
procObj
,
dmProcessProcHandle
);
}
dmNewNodeProc
(
pWrapper
,
n
);
if
(
pWrapper
->
procId
<=
0
||
!
taosProcExist
(
pWrapper
->
procId
))
{
dWarn
(
"node:%s, process:%d is killed and needs to be restarted"
,
pWrapper
->
name
,
pWrapper
->
procId
);
if
(
pWrapper
->
procObj
)
{
taosProcCloseHandles
(
pWrapper
->
procObj
,
dmProcessProcHandle
);
}
dmNewNodeProc
(
pWrapper
,
n
);
}
}
taosThreadMutexUnlock
(
&
pDnode
->
mutex
);
...
...
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
a82b728b
...
...
@@ -242,7 +242,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
return
-
1
;
}
if
(
dmInit
Trans
(
pDnode
)
!=
0
)
{
if
(
dmInit
Server
(
pDnode
)
!=
0
)
{
dError
(
"failed to init transport since %s"
,
terrstr
());
return
-
1
;
}
...
...
@@ -275,7 +275,8 @@ static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
}
taosWUnLockLatch
(
&
pDnode
->
data
.
latch
);
dmCleanupTrans
(
pDnode
);
dmCleanupClient
(
pDnode
);
dmCleanupServer
(
pDnode
);
dInfo
(
"dnode-mgmt is cleaned up"
);
}
...
...
source/dnode/mgmt/implement/src/dmObj.c
浏览文件 @
a82b728b
...
...
@@ -124,9 +124,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto
_OVER
;
}
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
&
pDnode
->
wrappers
[
DNODE
]);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
dInfo
(
"dnode is created, data:%p"
,
pDnode
);
code
=
0
;
...
...
source/dnode/mgmt/implement/src/dmTransport.c
浏览文件 @
a82b728b
...
...
@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
#include "qworker.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
...
...
@@ -85,17 +87,14 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
if
((
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)))
==
NULL
)
goto
_OVER
;
if
(
dmBuildMsg
(
pMsg
,
pRpc
)
!=
0
)
goto
_OVER
;
if
(
pWrapper
->
procType
==
DND_PROC_SINGLE
)
{
if
(
pWrapper
->
procType
!=
DND_PROC_PARENT
)
{
dTrace
(
"msg:%p, created, type:%s handle:%p user:%s"
,
pMsg
,
TMSG_INFO
(
msgType
),
pRpc
->
handle
,
pMsg
->
user
);
code
=
(
*
msgFp
)(
pWrapper
,
pMsg
);
}
else
if
(
pWrapper
->
procType
==
DND_PROC_PARENT
)
{
}
else
{
dTrace
(
"msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d"
,
pMsg
,
TMSG_INFO
(
msgType
),
pRpc
->
handle
,
pMsg
->
rpcMsg
.
code
&
0XFFFF
,
pMsg
->
user
,
pRpc
->
contLen
);
code
=
taosProcPutToChildQ
(
pWrapper
->
procObj
,
pMsg
,
sizeof
(
SNodeMsg
),
pRpc
->
pCont
,
pRpc
->
contLen
,
(
isReq
&&
(
pMsg
->
rpcMsg
.
code
==
0
))
?
pRpc
->
handle
:
NULL
,
pRpc
->
refId
,
PROC_FUNC_REQ
);
}
else
{
dTrace
(
"msg:%p, should not processed in child process, handle:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pMsg
->
user
);
ASSERT
(
1
);
}
_OVER:
...
...
@@ -108,7 +107,7 @@ _OVER:
}
else
{
dError
(
"msg:%p, type:%s handle:%p failed to process since 0x%04x:%s"
,
pMsg
,
TMSG_INFO
(
msgType
),
pRpc
->
handle
,
code
&
0XFFFF
,
terrstr
());
if
(
msgType
&
1U
)
{
if
(
isReq
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
if
(
code
==
TSDB_CODE_NODE_NOT_DEPLOYED
||
code
==
TSDB_CODE_NODE_OFFLINE
)
{
if
(
msgType
>
TDMT_MND_MSG
&&
msgType
<
TDMT_VND_MSG
)
{
...
...
@@ -130,22 +129,27 @@ _OVER:
}
static
void
dmProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isReq
=
msgType
&
1u
;
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMgmtWrapper
*
pWrapper
=
pHandle
->
pNdWrapper
;
if
(
msgType
==
TDMT_DND_SERVER_STATUS
)
{
dTrace
(
"server status req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
dmProcessServerStatusReq
(
pDnode
,
pMsg
);
return
;
}
if
(
msgType
==
TDMT_DND_NET_TEST
)
{
dTrace
(
"net test req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
dmProcessServerStatusReq
(
pDnode
,
pMsg
);
return
;
switch
(
msgType
)
{
case
TDMT_DND_SERVER_STATUS
:
dTrace
(
"server status req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
dmProcessServerStatusReq
(
pDnode
,
pMsg
);
return
;
case
TDMT_DND_NET_TEST
:
dTrace
(
"net test req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
dmProcessNetTestReq
(
pDnode
,
pMsg
);
return
;
case
TDMT_MND_SYSTABLE_RETRIEVE_RSP
:
case
TDMT_VND_FETCH_RSP
:
dTrace
(
"retrieve rsp is received"
);
qWorkerProcessFetchRsp
(
NULL
,
NULL
,
pMsg
);
pMsg
->
pCont
=
NULL
;
// already freed in qworker
return
;
}
if
(
pDnode
->
status
!=
DND_STAT_RUNNING
)
{
...
...
@@ -233,16 +237,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return
0
;
}
static
inline
int32_t
dmSendRpcReq
(
SDnode
*
pDnode
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
if
(
pDnode
->
trans
.
clientRpc
==
NULL
)
{
terrno
=
TSDB_CODE_NODE_OFFLINE
;
return
-
1
;
}
rpcSendRequest
(
pDnode
->
trans
.
clientRpc
,
pEpSet
,
pReq
,
NULL
);
return
0
;
}
static
void
dmSendRpcRedirectRsp
(
SDnode
*
pDnode
,
const
SRpcMsg
*
pReq
)
{
SEpSet
epSet
=
{
0
};
dmGetMnodeEpSet
(
pDnode
,
&
epSet
);
...
...
@@ -288,28 +282,20 @@ void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
}
static
inline
int32_t
dmSendReq
(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
if
(
pWrapper
->
pDnode
->
status
!=
DND_STAT_RUNNING
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
if
(
pDnode
->
status
!=
DND_STAT_RUNNING
)
{
terrno
=
TSDB_CODE_NODE_OFFLINE
;
dError
(
"failed to send rpc msg since %s, handle:%p"
,
terrstr
(),
pReq
->
handle
);
return
-
1
;
}
if
(
pWrapper
->
procType
!=
DND_PROC_CHILD
)
{
return
dmSendRpcReq
(
pWrapper
->
pDnode
,
pEpSet
,
pReq
);
}
else
{
char
*
pHead
=
taosMemoryMalloc
(
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
));
if
(
pHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
pHead
,
pReq
,
sizeof
(
SRpcMsg
));
memcpy
(
pHead
+
sizeof
(
SRpcMsg
),
pEpSet
,
sizeof
(
SEpSet
));
taosProcPutToParentQ
(
pWrapper
->
procObj
,
pHead
,
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
),
pReq
->
pCont
,
pReq
->
contLen
,
PROC_FUNC_REQ
);
taosMemoryFree
(
pHead
);
return
0
;
if
(
pDnode
->
trans
.
clientRpc
==
NULL
)
{
terrno
=
TSDB_CODE_NODE_OFFLINE
;
return
-
1
;
}
rpcSendRequest
(
pDnode
->
trans
.
clientRpc
,
pEpSet
,
pReq
,
NULL
);
return
0
;
}
static
inline
void
dmSendRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
)
{
...
...
@@ -396,9 +382,10 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
pMsg
->
pCont
=
pCont
;
if
(
ftype
==
PROC_FUNC_REQ
)
{
ASSERT
(
1
);
dTrace
(
"msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
code
,
pMsg
->
ahandle
);
dmSendR
pcReq
(
pWrapper
->
pDnode
,
(
SEpSet
*
)((
char
*
)
pMsg
+
sizeof
(
SRpcMsg
)),
pMsg
);
dmSendR
eq
(
pWrapper
,
(
SEpSet
*
)((
char
*
)
pMsg
+
sizeof
(
SRpcMsg
)),
pMsg
);
}
else
if
(
ftype
==
PROC_FUNC_RSP
)
{
dTrace
(
"msg:%p, get from parent queue, rsp handle:%p code:0x%04x, app:%p"
,
pMsg
,
pMsg
->
handle
,
code
,
pMsg
->
ahandle
);
pMsg
->
refId
=
taosProcRemoveHandle
(
pWrapper
->
procObj
,
pMsg
->
handle
);
...
...
@@ -421,23 +408,25 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
}
SProcCfg
dmGenProcCfg
(
SMgmtWrapper
*
pWrapper
)
{
SProcCfg
cfg
=
{.
childConsumeFp
=
(
ProcConsumeFp
)
dmConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dmConsumeParentQueue
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
procShm
,
.
parent
=
pWrapper
,
.
name
=
pWrapper
->
name
};
SProcCfg
cfg
=
{
.
childConsumeFp
=
(
ProcConsumeFp
)
dmConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dmConsumeParentQueue
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
procShm
,
.
parent
=
pWrapper
,
.
name
=
pWrapper
->
name
,
};
return
cfg
;
}
bool
rpcRfp
(
int32_t
code
)
{
static
bool
rpcRfp
(
int32_t
code
)
{
if
(
code
==
TSDB_CODE_RPC_REDIRECT
)
{
return
true
;
}
else
{
...
...
@@ -445,7 +434,7 @@ bool rpcRfp(int32_t code) {
}
}
static
int32_t
dmInitClient
(
SDnode
*
pDnode
)
{
int32_t
dmInitClient
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
=
{
0
};
...
...
@@ -471,11 +460,15 @@ static int32_t dmInitClient(SDnode *pDnode) {
return
-
1
;
}
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
&
pDnode
->
wrappers
[
DNODE
]);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
dDebug
(
"dnode rpc client is initialized"
);
return
0
;
}
static
void
dmCleanupClient
(
SDnode
*
pDnode
)
{
void
dmCleanupClient
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
if
(
pTrans
->
clientRpc
)
{
rpcClose
(
pTrans
->
clientRpc
);
...
...
@@ -517,7 +510,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq
authReq
=
{
0
};
tstrncpy
(
authReq
.
user
,
user
,
TSDB_USER_LEN
);
int32_t
contLen
=
tSerializeSAuthReq
(
NULL
,
0
,
&
authReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSAuthReq
(
pReq
,
contLen
,
&
authReq
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_AUTH
,
.
ahandle
=
(
void
*
)
9528
};
...
...
@@ -543,7 +536,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
return
rpcRsp
.
code
;
}
static
int32_t
dmInitServer
(
SDnode
*
pDnode
)
{
int32_t
dmInitServer
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
=
{
0
};
...
...
@@ -569,7 +562,7 @@ static int32_t dmInitServer(SDnode *pDnode) {
return
0
;
}
static
void
dmCleanupServer
(
SDnode
*
pDnode
)
{
void
dmCleanupServer
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
if
(
pTrans
->
serverRpc
)
{
rpcClose
(
pTrans
->
serverRpc
);
...
...
@@ -578,17 +571,6 @@ static void dmCleanupServer(SDnode *pDnode) {
}
}
int32_t
dmInitTrans
(
SDnode
*
pDnode
)
{
if
(
dmInitServer
(
pDnode
)
!=
0
)
return
-
1
;
if
(
dmInitClient
(
pDnode
)
!=
0
)
return
-
1
;
return
0
;
}
void
dmCleanupTrans
(
SDnode
*
pDnode
)
{
dmCleanupServer
(
pDnode
);
dmCleanupClient
(
pDnode
);
}
SMsgCb
dmGetMsgcb
(
SMgmtWrapper
*
pWrapper
)
{
SMsgCb
msgCb
=
{
.
sendReqFp
=
dmSendReq
,
...
...
@@ -598,6 +580,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.
releaseHandleFp
=
dmReleaseHandle
,
.
reportStartupFp
=
dmReportStartupByWrapper
,
.
pWrapper
=
pWrapper
,
.
clientRpc
=
pWrapper
->
pDnode
->
trans
.
clientRpc
,
};
return
msgCb
;
}
source/dnode/mgmt/interface/inc/dmInt.h
浏览文件 @
a82b728b
...
...
@@ -37,7 +37,7 @@ void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgF
void
dmReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
);
void
dmReportStartupByWrapper
(
SMgmtWrapper
*
pWrapper
,
const
char
*
pName
,
const
char
*
pDesc
);
void
dmProcessServerStatusReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmProcessNet
t
estReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmProcessNet
T
estReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmGetMonitorSysInfo
(
SMonSysInfo
*
pInfo
);
// dmFile.c
...
...
source/dnode/mgmt/interface/src/dmInt.c
浏览文件 @
a82b728b
...
...
@@ -171,16 +171,17 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
}
}
void
dmProcessNet
testReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
)
{
void
dmProcessNet
TestReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
dDebug
(
"net test req is received"
);
SRpcMsg
rsp
=
{.
handle
=
pR
pc
->
handle
,
.
refId
=
pRpc
->
refId
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
pR
pc
->
contLen
);
SRpcMsg
rsp
=
{.
handle
=
pR
eq
->
handle
,
.
refId
=
pReq
->
refId
,
.
ahandle
=
pReq
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
pR
eq
->
contLen
);
if
(
rsp
.
pCont
==
NULL
)
{
rsp
.
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
rsp
.
contLen
=
pR
pc
->
contLen
;
rsp
.
contLen
=
pR
eq
->
contLen
;
}
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pReq
->
pCont
);
}
void
dmProcessServerStatusReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
...
...
@@ -208,6 +209,7 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
_OVER:
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pReq
->
pCont
);
}
void
dmGetMonitorSysInfo
(
SMonSysInfo
*
pInfo
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
a82b728b
...
...
@@ -289,7 +289,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY
,
vmProcessQueryMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_CONTINUE
,
vmProcessQueryMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_FETCH
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_FETCH_RSP
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_ALTER_TABLE
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_UPDATE_TAG_VAL
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TABLE_META
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mnode/impl/src/mndQuery.c
浏览文件 @
a82b728b
...
...
@@ -20,7 +20,7 @@
int32_t
mndProcessQueryMsg
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SReadHandle
handle
=
{.
mnd
=
pMnode
};
SReadHandle
handle
=
{.
mnd
=
pMnode
,
.
pMsgCb
=
&
pMnode
->
msgCb
};
mTrace
(
"msg:%p, in query queue is processing"
,
pReq
);
switch
(
pReq
->
rpcMsg
.
msgType
)
{
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
a82b728b
...
...
@@ -51,7 +51,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
int32_t
qndProcessQueryMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
)
{
qTrace
(
"message in qnode query queue is processing"
);
SReadHandle
handle
=
{
0
};
SReadHandle
handle
=
{
.
pMsgCb
=
&
pQnode
->
msgCb
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
{
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
a82b728b
...
...
@@ -34,6 +34,7 @@
#include "tlockfree.h"
#include "tlosertree.h"
#include "tmallocator.h"
#include "tmsgcb.h"
#include "tskiplist.h"
#include "tstream.h"
#include "ttime.h"
...
...
@@ -121,7 +122,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
// sma
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
);
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
,
SMsgCb
*
pMsgCb
);
int32_t
tsdbFetchTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
**
ppStore
,
tb_uid_t
suid
,
tb_uid_t
uid
);
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pUidStore
);
void
tsdbUidStoreDestory
(
STbUidStore
*
pStore
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a82b728b
...
...
@@ -378,6 +378,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pTopic
->
buffer
.
output
[
j
].
pReadHandle
=
pReadHandle
;
pTopic
->
buffer
.
output
[
j
].
task
=
qCreateStreamExecTaskInfo
(
pTopic
->
qmsg
,
&
handle
);
...
...
@@ -857,6 +858,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SReadHandle
handle
=
{
.
reader
=
pExec
->
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pExec
->
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pExec
->
qmsg
,
&
handle
);
ASSERT
(
pExec
->
task
[
i
]);
...
...
@@ -914,6 +916,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
SReadHandle
handle
=
{
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pTask
->
exec
.
runners
[
i
].
inputHandle
=
pStreamReader
;
pTask
->
exec
.
runners
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
a82b728b
...
...
@@ -1698,7 +1698,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
* @param pReq
* @return int32_t
*/
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
)
{
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
,
SMsgCb
*
pMsgCb
)
{
if
(
!
pReq
->
rollup
)
{
tsdbDebug
(
"vgId:%d return directly since no rollup for stable %s %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1742,6 +1742,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) {
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pMeta
,
.
pMsgCb
=
pMsgCb
,
};
if
(
param
->
qmsg1
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
a82b728b
...
...
@@ -142,9 +142,9 @@ _err:
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in vnode query queue is processing"
);
#if 0
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode};
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode
, .pMsgCb = &pVnode->msgCb
};
#endif
SReadHandle
handle
=
{.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
,
.
vnode
=
pVnode
};
SReadHandle
handle
=
{.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
,
.
vnode
=
pVnode
,
.
pMsgCb
=
&
pVnode
->
msgCb
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
return
qWorkerProcessQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
);
...
...
@@ -305,7 +305,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto
_err
;
}
tsdbRegisterRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
&
req
);
tsdbRegisterRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
&
req
,
&
pVnode
->
msgCb
);
tDecoderClear
(
&
coder
);
return
0
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
a82b728b
...
...
@@ -392,10 +392,7 @@ typedef struct SStreamBlockScanInfo {
}
SStreamBlockScanInfo
;
typedef
struct
SSysTableScanInfo
{
union
{
void
*
pTransporter
;
SReadHandle
readHandle
;
};
SReadHandle
readHandle
;
SRetrieveMetaTableRsp
*
pRsp
;
SRetrieveTableReq
req
;
...
...
@@ -663,8 +660,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
groupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pDataReader
,
SQueryTableDataCond
*
pCond
,
int32_t
numOfOutput
,
int32_t
dataLoadFlag
,
const
uint8_t
*
scanInfo
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
sampleRatio
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
a82b728b
此差异已折叠。
点击以展开。
source/libs/executor/src/groupoperator.c
浏览文件 @
a82b728b
...
...
@@ -262,6 +262,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
...
...
@@ -289,7 +291,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
pScalarExprInfo
!=
NULL
)
{
projectApplyFunctions
(
pInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pInfo
->
pScalarFuncCtx
,
pInfo
->
numOfScalarExpr
,
NULL
);
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pInfo
->
pScalarFuncCtx
,
pInfo
->
numOfScalarExpr
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
a82b728b
...
...
@@ -1057,7 +1057,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pMsgSendInfo
->
fp
=
loadSysTableCallback
;
int64_t
transporterId
=
0
;
int32_t
code
=
asyncSendMsgToServer
(
pInfo
->
pTransporter
,
&
pInfo
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
int32_t
code
=
asyncSendMsgToServer
(
pInfo
->
readHandle
.
pMsgCb
->
clientRpc
,
&
pInfo
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
tsem_wait
(
&
pInfo
->
ready
);
if
(
pTaskInfo
->
code
)
{
...
...
@@ -1182,29 +1183,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
}
else
{
tsem_init
(
&
pInfo
->
ready
,
0
,
0
);
pInfo
->
epSet
=
epset
;
#if 1
{
// todo refactor
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"DB-META"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
qProcessFetchRsp
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
"root"
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
"dcc5bed04851fec854c035b2e40263b6"
;
pInfo
->
pTransporter
=
rpcOpen
(
&
rpcInit
);
if
(
pInfo
->
pTransporter
==
NULL
)
{
return
NULL
;
// todo
}
}
#endif
pInfo
->
readHandle
=
*
(
SReadHandle
*
)
readHandle
;
}
pOperator
->
name
=
"SysTableScanOperator"
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
a82b728b
...
...
@@ -114,7 +114,10 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo
*
pOperator
=
param
;
SSortOperatorInfo
*
pSort
=
pOperator
->
info
;
if
(
pOperator
->
pExpr
!=
NULL
)
{
projectApplyFunctions
(
pOperator
->
pExpr
,
pBlock
,
pBlock
,
pSort
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
NULL
);
int32_t
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pBlock
,
pBlock
,
pSort
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
code
);
}
}
}
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
a82b728b
...
...
@@ -37,11 +37,11 @@ bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sumInvertFunction
(
SqlFunctionCtx
*
pCtx
);
bool
minFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
maxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
minmaxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
getMinmaxFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
minFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
minmaxFunctionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getAvgFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
...
...
@@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
bool
getTopBotFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
bottomFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
topBotFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getSpreadFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
...
...
@@ -82,6 +83,8 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn
int32_t
histogramFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
histogramFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getSelectivityFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/function/inc/functionMgtInt.h
浏览文件 @
a82b728b
...
...
@@ -40,6 +40,7 @@ extern "C" {
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
#define FUNC_MGT_SCAN_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(12)
#define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13)
#define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
...
...
source/libs/function/src/builtins.c
浏览文件 @
a82b728b
...
...
@@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
static
int32_t
translateBottom
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// todo
SDataType
*
pType
=
&
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
;
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
pType
->
bytes
,
.
type
=
pType
->
type
};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -241,7 +242,7 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
pFunc
->
node
.
resType
=
(
SDataType
)
{
.
bytes
=
512
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
pFunc
->
node
.
resType
=
(
SDataType
)
{.
bytes
=
512
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -273,7 +274,8 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
SExprNode
*
p1
=
(
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
!
IS_NUMERIC_TYPE
(
p1
->
resType
.
type
))
{
if
(
!
IS_SIGNED_NUMERIC_TYPE
(
p1
->
resType
.
type
)
&&
!
IS_FLOAT_TYPE
(
p1
->
resType
.
type
)
&&
TSDB_DATA_TYPE_BOOL
!=
p1
->
resType
.
type
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
pFunc
->
node
.
resType
=
p1
->
resType
;
...
...
@@ -509,9 +511,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
translateFunc
=
translateInOutNum
,
.
dataRequiredFunc
=
statisDataRequired
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
minFunctionSetup
,
.
initFunc
=
min
max
FunctionSetup
,
.
processFunc
=
minFunction
,
.
finalizeFunc
=
f
unctionFinalize
.
finalizeFunc
=
minmaxF
unctionFinalize
},
{
.
name
=
"max"
,
...
...
@@ -520,9 +522,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
translateFunc
=
translateInOutNum
,
.
dataRequiredFunc
=
statisDataRequired
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
maxFunctionSetup
,
.
initFunc
=
m
inm
axFunctionSetup
,
.
processFunc
=
maxFunction
,
.
finalizeFunc
=
f
unctionFinalize
.
finalizeFunc
=
minmaxF
unctionFinalize
},
{
.
name
=
"stddev"
,
...
...
@@ -549,7 +551,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"percentile"
,
.
type
=
FUNCTION_TYPE_PERCENTILE
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_REPEAT_SCAN_FUNC
,
.
translateFunc
=
translatePercentile
,
.
getEnvFunc
=
getPercentileFuncEnv
,
.
initFunc
=
percentileFunctionSetup
,
...
...
@@ -562,14 +564,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
translateFunc
=
translateApercentile
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
maxFunctionSetup
,
.
initFunc
=
m
inm
axFunctionSetup
,
.
processFunc
=
maxFunction
,
.
finalizeFunc
=
functionFinalize
},
{
.
name
=
"top"
,
.
type
=
FUNCTION_TYPE_TOP
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
,
.
translateFunc
=
translateTop
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
functionSetup
,
...
...
@@ -579,12 +581,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"bottom"
,
.
type
=
FUNCTION_TYPE_BOTTOM
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
,
.
translateFunc
=
translateBottom
,
.
getEnvFunc
=
get
Minmax
FuncEnv
,
.
initFunc
=
maxF
unctionSetup
,
.
processFunc
=
max
Function
,
.
finalizeFunc
=
function
Finalize
.
getEnvFunc
=
get
TopBot
FuncEnv
,
.
initFunc
=
f
unctionSetup
,
.
processFunc
=
bottom
Function
,
.
finalizeFunc
=
topBot
Finalize
},
{
.
name
=
"spread"
,
...
...
@@ -603,7 +605,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_MULTI_RES_FUNC
,
.
translateFunc
=
translateLastRow
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
maxFunctionSetup
,
.
initFunc
=
m
inm
axFunctionSetup
,
.
processFunc
=
maxFunction
,
.
finalizeFunc
=
functionFinalize
},
...
...
@@ -1032,8 +1034,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
type
=
FUNCTION_TYPE_SELECT_VALUE
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
,
.
translateFunc
=
translateSelectValue
,
.
getEnvFunc
=
NULL
,
.
initFunc
=
NULL
,
.
getEnvFunc
=
getSelectivityFuncEnv
,
// todo remove this function later.
.
initFunc
=
functionSetup
,
.
sprocessFunc
=
NULL
,
.
finalizeFunc
=
NULL
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
a82b728b
此差异已折叠。
点击以展开。
source/libs/function/src/functionMgt.c
浏览文件 @
a82b728b
...
...
@@ -169,6 +169,8 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId) {
bool
fmIsMultiResFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_MULTI_RES_FUNC
);
}
bool
fmIsRepeatScanFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_REPEAT_SCAN_FUNC
);
}
bool
fmIsUserDefinedFunc
(
int32_t
funcId
)
{
return
funcId
>
FUNC_UDF_ID_START
;
}
void
fmFuncMgtDestroy
()
{
...
...
@@ -197,15 +199,14 @@ int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
bool
fmIsInvertible
(
int32_t
funcId
)
{
bool
res
=
false
;
switch
(
funcMgtBuiltins
[
funcId
].
type
)
{
case
FUNCTION_TYPE_COUNT
:
case
FUNCTION_TYPE_SUM
:
case
FUNCTION_TYPE_STDDEV
:
case
FUNCTION_TYPE_AVG
:
res
=
true
;
break
;
default:
break
;
case
FUNCTION_TYPE_COUNT
:
case
FUNCTION_TYPE_SUM
:
case
FUNCTION_TYPE_STDDEV
:
case
FUNCTION_TYPE_AVG
:
res
=
true
;
break
;
default:
break
;
}
return
res
;
}
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
a82b728b
...
...
@@ -222,6 +222,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiSort"
;
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
"PhysiInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
"PhysiStreamInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
"PhysiFill"
;
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
...
...
@@ -2893,6 +2895,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
physiSortNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
physiIntervalNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
physiFillNodeToJson
(
pObj
,
pJson
);
...
...
@@ -2983,6 +2986,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
jsonToPhysiSortNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
jsonToPhysiIntervalNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
jsonToPhysiFillNode
(
pJson
,
pObj
);
...
...
@@ -3099,6 +3103,7 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode) {
return
TSDB_CODE_FAILED
;
}
int32_t
code
=
makeNodeByJson
(
pJson
,
pNode
);
tjsonDelete
(
pJson
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
*
pNode
);
*
pNode
=
NULL
;
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
a82b728b
...
...
@@ -513,6 +513,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
res
=
walkWindowPhysi
((
SWinodwPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
a82b728b
...
...
@@ -252,6 +252,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SSortPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SStreamIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_FILL
:
return
makeNode
(
type
,
sizeof
(
SFillPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
...
...
@@ -644,6 +646,7 @@ void nodesDestroyNode(SNodeptr pNode) {
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
destroyWinodwPhysiNode
((
SWinodwPhysiNode
*
)
pNode
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
...
...
source/libs/parser/src/parAstParser.c
浏览文件 @
a82b728b
...
...
@@ -64,8 +64,8 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
goto
abort_parse
;
}
default:
Parse
(
pParser
,
t0
.
type
,
t0
,
&
cxt
);
// ParseTrace(stdout, "");
Parse
(
pParser
,
t0
.
type
,
t0
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
goto
abort_parse
;
}
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
a82b728b
...
...
@@ -590,6 +590,8 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
if
(
seg
==
4
)
{
// ip address
*
tokenId
=
TK_NK_IPTOKEN
;
return
i
;
}
else
if
(
seg
>
2
)
{
break
;
}
if
((
z
[
i
]
==
'e'
||
z
[
i
]
==
'E'
)
&&
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
a82b728b
...
...
@@ -699,6 +699,10 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
if
(
isCountStar
(
pFunc
))
{
pCxt
->
errCode
=
rewriteCountStar
(
pCxt
,
pFunc
);
}
if
(
fmIsRepeatScanFunc
(
pFunc
->
funcId
))
{
pCxt
->
pCurrStmt
->
hasRepeatScanFuncs
=
true
;
}
}
return
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
?
DEAL_RES_CONTINUE
:
DEAL_RES_ERROR
;
}
...
...
@@ -2255,8 +2259,8 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, SN
code
=
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_DUPLICATED_COLUMN
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
((
TSDB_DATA_TYPE_VARCHAR
==
pCol
->
dataType
.
type
&&
pCol
->
dataType
.
bytes
>
TSDB_MAX_BINARY_LEN
)
||
(
TSDB_DATA_TYPE_NCHAR
==
pCol
->
dataType
.
type
&&
pCol
->
dataType
.
bytes
>
TSDB_MAX_NCHAR_LEN
))
{
if
((
TSDB_DATA_TYPE_VARCHAR
==
pCol
->
dataType
.
type
&&
calcTypeBytes
(
pCol
->
dataType
)
>
TSDB_MAX_BINARY_LEN
)
||
(
TSDB_DATA_TYPE_NCHAR
==
pCol
->
dataType
.
type
&&
calcTypeBytes
(
pCol
->
dataType
)
>
TSDB_MAX_NCHAR_LEN
))
{
code
=
code
=
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
);
}
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
a82b728b
...
...
@@ -54,6 +54,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
pCol
->
node
.
resType
=
pToBeRewrittenExpr
->
resType
;
strcpy
(
pCol
->
node
.
aliasName
,
pToBeRewrittenExpr
->
aliasName
);
strcpy
(
pCol
->
colName
,
((
SExprNode
*
)
pExpr
)
->
aliasName
);
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
pExpr
)
&&
FUNCTION_TYPE_WSTARTTS
==
((
SFunctionNode
*
)
pExpr
)
->
funcType
)
{
pCol
->
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
}
nodesDestroyNode
(
*
pNode
);
*
pNode
=
(
SNode
*
)
pCol
;
return
DEAL_RES_IGNORE_CHILD
;
...
...
@@ -253,7 +256,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
TSWAP
(
pScan
->
pMeta
,
pRealTable
->
pMeta
);
TSWAP
(
pScan
->
pVgroupList
,
pRealTable
->
pVgroupList
);
pScan
->
scanSeq
[
0
]
=
1
;
pScan
->
scanSeq
[
0
]
=
pSelect
->
hasRepeatScanFuncs
?
2
:
1
;
pScan
->
scanSeq
[
1
]
=
0
;
pScan
->
scanRange
=
TSWINDOW_INITIALIZER
;
pScan
->
tableName
.
type
=
TSDB_TABLE_NAME_T
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
a82b728b
...
...
@@ -754,10 +754,7 @@ static int32_t opkDoOptimized(SOptimizeContext* pCxt, SSortLogicNode* pSort, SNo
EOrder
order
=
opkGetPrimaryKeyOrder
(
pSort
);
if
(
ORDER_DESC
==
order
)
{
SNode
*
pScan
=
NULL
;
FOREACH
(
pScan
,
pScanNodes
)
{
((
SScanLogicNode
*
)
pScan
)
->
scanSeq
[
0
]
=
0
;
((
SScanLogicNode
*
)
pScan
)
->
scanSeq
[
1
]
=
1
;
}
FOREACH
(
pScan
,
pScanNodes
)
{
TSWAP
(((
SScanLogicNode
*
)
pScan
)
->
scanSeq
[
0
],
((
SScanLogicNode
*
)
pScan
)
->
scanSeq
[
1
]);
}
}
if
(
NULL
==
pSort
->
node
.
pParent
)
{
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
a82b728b
...
...
@@ -869,7 +869,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
static
int32_t
createIntervalPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SIntervalPhysiNode
*
pInterval
=
(
SIntervalPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
);
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
));
if
(
NULL
==
pInterval
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
source/libs/planner/test/planBasicTest.cpp
浏览文件 @
a82b728b
...
...
@@ -48,4 +48,6 @@ TEST_F(PlanBasicTest, func) {
useDb
(
"root"
,
"test"
);
run
(
"SELECT DIFF(c1) FROM t1"
);
run
(
"SELECT PERCENTILE(c1, 60) FROM t1"
);
}
source/libs/planner/test/planOptimizeTest.cpp
浏览文件 @
a82b728b
...
...
@@ -37,4 +37,6 @@ TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
run
(
"SELECT * FROM t1 ORDER BY ts DESC"
);
run
(
"SELECT c1 FROM t1 ORDER BY ts"
);
run
(
"SELECT c1 FROM t1 ORDER BY ts DESC"
);
run
(
"SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTARTTS DESC"
);
}
source/libs/qcom/src/queryUtil.c
浏览文件 @
a82b728b
...
...
@@ -136,7 +136,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return
0
;
}
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
rpcCtx
)
{
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
rpcCtx
)
{
char
*
pMsg
=
rpcMallocCont
(
pInfo
->
msgInfo
.
len
);
if
(
NULL
==
pMsg
)
{
qError
(
"0x%"
PRIx64
" msg:%s malloc failed"
,
pInfo
->
requestId
,
TMSG_INFO
(
pInfo
->
msgType
));
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
a82b728b
...
...
@@ -522,6 +522,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
ret
=
raftStoreClose
(
pSyncNode
->
pRaftStore
);
assert
(
ret
==
0
);
syncRespMgrDestroy
(
pSyncNode
->
pSyncRespMgr
);
voteGrantedDestroy
(
pSyncNode
->
pVotesGranted
);
votesRespondDestory
(
pSyncNode
->
pVotesRespond
);
syncIndexMgrDestroy
(
pSyncNode
->
pNextIndex
);
...
...
@@ -1138,6 +1139,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
syncNodeReplicate
(
ths
);
}
syncEntryDestory
(
pEntry
);
return
ret
;
}
...
...
source/util/test/encodeTest.cpp
浏览文件 @
a82b728b
...
...
@@ -440,6 +440,7 @@ TEST(td_encode_test, compound_struct_encode_test) {
tCoderClear(&decoder);
}
#endif
#pragma GCC diagnostic pop
#endif
tests/script/sh/exec.sh
浏览文件 @
a82b728b
...
...
@@ -101,8 +101,8 @@ if [ "$EXEC_OPTON" = "start" ]; then
if
[
"
$VALGRIND_OPTION
"
=
"true"
]
;
then
TT
=
`
date
+%s
`
#mkdir ${LOG_DIR}/${TT}
echo
"nohup valgrind --log-file=
${
LOG_DIR
}
/valgrind-taosd-
${
NODE_NAME
}
-
${
TT
}
.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes
$EXE_DIR
/taosd -c
$CFG_DIR
> /dev/null 2>&1 &"
nohup
valgrind
--log-file
=
${
LOG_DIR
}
/valgrind-taosd-
${
NODE_NAME
}
-
${
TT
}
.log
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
$EXE_DIR
/taosd
-c
$CFG_DIR
>
/dev/null 2>&1 &
echo
"nohup valgrind --log-file=
${
LOG_DIR
}
/valgrind-taosd-
${
NODE_NAME
}
-
${
TT
}
.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all
--num-callers=20 -v
-v --workaround-gcc296-bugs=yes
$EXE_DIR
/taosd -c
$CFG_DIR
> /dev/null 2>&1 &"
nohup
valgrind
--log-file
=
${
LOG_DIR
}
/valgrind-taosd-
${
NODE_NAME
}
-
${
TT
}
.log
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
--num-callers
=
20
-v
-v
--workaround-gcc296-bugs
=
yes
$EXE_DIR
/taosd
-c
$CFG_DIR
>
/dev/null 2>&1 &
else
echo
"nohup
$EXE_DIR
/taosd -c
$CFG_DIR
> /dev/null 2>&1 &"
nohup
$EXE_DIR
/taosd
-c
$CFG_DIR
>
/dev/null 2>&1 &
...
...
tests/script/test.sh
浏览文件 @
a82b728b
...
...
@@ -131,8 +131,8 @@ if [ -n "$FILE_NAME" ]; then
FLAG
=
"-v"
fi
echo
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--child-silent-after-fork
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--child-silent-after-fork
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
echo
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--child-silent-after-fork
=
yes
--show-leak-kinds
=
all
--num-callers
=
20
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--child-silent-after-fork
=
yes
--show-leak-kinds
=
all
--num-callers
=
20
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
else
if
[[
$MULTIPROCESS
-eq
1
]]
;
then
echo
"ExcuteCmd(multiprocess):"
$PROGRAM
-m
-c
$CFG_DIR
-f
$FILE_NAME
...
...
tests/script/tsim/insert/basic0.sim
浏览文件 @
a82b728b
...
...
@@ -140,7 +140,8 @@ endi
if $data00 != -13 then
return -1
endi
if $data01 != -2.30000 then
if $data01 != -2.30000 then
print expect -2.30000, actual: $data01
return -1
endi
if $data02 != -3.300000000 then
...
...
tests/system-test/2-query/diff.py
浏览文件 @
a82b728b
from
wsgiref.headers
import
tspecials
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
...
...
@@ -27,6 +28,36 @@ class TDTestCase:
def
run
(
self
):
tdSql
.
prepare
()
tdSql
.
execute
(
"create table ntb(ts timestamp,c1 int,c2 double,c3 float)"
)
tdSql
.
execute
(
"insert into ntb values(now,1,1.0,10.5)(now+1s,10,-100.0,5.1)(now+10s,-1,15.1,5.0)"
)
tdSql
.
query
(
"select diff(c1,0) from ntb"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
9
)
tdSql
.
checkData
(
1
,
0
,
-
11
)
tdSql
.
query
(
"select diff(c1,1) from ntb"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
9
)
tdSql
.
checkData
(
1
,
0
,
None
)
tdSql
.
query
(
"select diff(c2,0) from ntb"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
-
101
)
tdSql
.
checkData
(
1
,
0
,
115.1
)
tdSql
.
query
(
"select diff(c2,1) from ntb"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
None
)
tdSql
.
checkData
(
1
,
0
,
115.1
)
tdSql
.
query
(
"select diff(c3,0) from ntb"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
-
5.4
)
tdSql
.
checkData
(
1
,
0
,
-
0.1
)
tdSql
.
query
(
"select diff(c3,1) from ntb"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
None
)
tdSql
.
checkData
(
1
,
0
,
None
)
tdSql
.
execute
(
'''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))'''
)
...
...
@@ -58,12 +89,6 @@ class TDTestCase:
tdSql
.
error
(
"select diff(ts) from stb"
)
tdSql
.
error
(
"select diff(ts) from stb_1"
)
tdSql
.
error
(
"select diff(col1) from stb"
)
tdSql
.
error
(
"select diff(col2) from stb"
)
tdSql
.
error
(
"select diff(col3) from stb"
)
tdSql
.
error
(
"select diff(col4) from stb"
)
tdSql
.
error
(
"select diff(col5) from stb"
)
tdSql
.
error
(
"select diff(col6) from stb"
)
tdSql
.
error
(
"select diff(col7) from stb"
)
tdSql
.
error
(
"select diff(col7) from stb_1"
)
tdSql
.
error
(
"select diff(col8) from stb"
)
...
...
@@ -74,10 +99,6 @@ class TDTestCase:
tdSql
.
error
(
"select diff(col12) from stb_1"
)
tdSql
.
error
(
"select diff(col13) from stb_1"
)
tdSql
.
error
(
"select diff(col14) from stb_1"
)
tdSql
.
error
(
"select diff(col11) from stb"
)
tdSql
.
error
(
"select diff(col12) from stb"
)
tdSql
.
error
(
"select diff(col13) from stb"
)
tdSql
.
error
(
"select diff(col14) from stb"
)
tdSql
.
query
(
"select ts,diff(col1),ts from stb_1"
)
tdSql
.
checkRows
(
10
)
...
...
@@ -88,14 +109,14 @@ class TDTestCase:
tdSql
.
checkData
(
9
,
1
,
"2018-09-17 09:00:00.009"
)
tdSql
.
checkData
(
9
,
3
,
"2018-09-17 09:00:00.009"
)
tdSql
.
query
(
"select ts,diff(col1),ts from stb group by tbname"
)
tdSql
.
checkRows
(
10
)
tdSql
.
checkData
(
0
,
0
,
"2018-09-17 09:00:00.000"
)
tdSql
.
checkData
(
0
,
1
,
"2018-09-17 09:00:00.000"
)
tdSql
.
checkData
(
0
,
3
,
"2018-09-17 09:00:00.000"
)
tdSql
.
checkData
(
9
,
0
,
"2018-09-17 09:00:00.009"
)
tdSql
.
checkData
(
9
,
1
,
"2018-09-17 09:00:00.009"
)
tdSql
.
checkData
(
9
,
3
,
"2018-09-17 09:00:00.009"
)
#
tdSql.query("select ts,diff(col1),ts from stb group by tbname")
#
tdSql.checkRows(10)
#
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
#
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
#
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
#
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
#
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
#
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
tdSql
.
query
(
"select ts,diff(col1),ts from stb_1"
)
tdSql
.
checkRows
(
10
)
...
...
@@ -106,14 +127,14 @@ class TDTestCase:
tdSql
.
checkData
(
9
,
1
,
"2018-09-17 09:00:00.009"
)
tdSql
.
checkData
(
9
,
3
,
"2018-09-17 09:00:00.009"
)
tdSql
.
query
(
"select ts,diff(col1),ts from stb group by tbname"
)
tdSql
.
checkRows
(
10
)
tdSql
.
checkData
(
0
,
0
,
"2018-09-17 09:00:00.000"
)
tdSql
.
checkData
(
0
,
1
,
"2018-09-17 09:00:00.000"
)
tdSql
.
checkData
(
0
,
3
,
"2018-09-17 09:00:00.000"
)
tdSql
.
checkData
(
9
,
0
,
"2018-09-17 09:00:00.009"
)
tdSql
.
checkData
(
9
,
1
,
"2018-09-17 09:00:00.009"
)
tdSql
.
checkData
(
9
,
3
,
"2018-09-17 09:00:00.009"
)
#
tdSql.query("select ts,diff(col1),ts from stb group by tbname")
#
tdSql.checkRows(10)
#
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
#
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
#
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
#
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
#
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
#
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
tdSql
.
query
(
"select diff(col1) from stb_1"
)
tdSql
.
checkRows
(
10
)
...
...
tests/system-test/fulltest.sh
浏览文件 @
a82b728b
...
...
@@ -31,8 +31,8 @@ python3 ./test.py -f 2-query/last.py
#python3 ./test.py -f 2-query/To_iso8601.py
python3 ./test.py
-f
2-query/To_unixtimestamp.py
python3 ./test.py
-f
2-query/timetruncate.py
#
python3 ./test.py -f 2-query/Timediff.py
# python3 ./test.py -f 2-query/diff.py
python3 ./test.py
-f
2-query/Timediff.py
#python3 ./test.py -f 2-query/cast.py
...
...
taos-tools
@
0ae9f872
比较
2f3dfddd
...
0ae9f872
Subproject commit
2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c
Subproject commit
0ae9f872c26d5da8cb61aa9eb00b5c7aeba10ec4
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录