Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
90371894
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
90371894
编写于
12月 01, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into szhou/table-count-scan
上级
45ecc2ba
de87d901
变更
40
展开全部
隐藏空白更改
内联
并排
Showing
40 changed file
with
2072 addition
and
1876 deletion
+2072
-1876
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
include/common/tglobal.h
include/common/tglobal.h
+1
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+0
-7
include/libs/sync/sync.h
include/libs/sync/sync.h
+1
-0
include/os/osSocket.h
include/os/osSocket.h
+3
-0
include/util/tarray.h
include/util/tarray.h
+2
-13
source/client/src/clientSml.c
source/client/src/clientSml.c
+38
-25
source/common/src/systable.c
source/common/src/systable.c
+8
-9
source/common/src/tglobal.c
source/common/src/tglobal.c
+5
-0
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+2
-2
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+0
-2
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+3
-2
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+12
-3
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+1
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+118
-79
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+1
-0
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+10
-0
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-22
source/libs/executor/inc/tfill.h
source/libs/executor/inc/tfill.h
+10
-6
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+0
-11
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+10
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+14
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+0
-349
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+1494
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+10
-25
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+2
-1111
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+1
-11
source/libs/function/src/detail/tavgfunction.c
source/libs/function/src/detail/tavgfunction.c
+80
-31
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+3
-1
source/libs/stream/src/streamUpdate.c
source/libs/stream/src/streamUpdate.c
+6
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+14
-1
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+4
-0
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+1
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+10
-0
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+20
-5
source/os/src/osSocket.c
source/os/src/osSocket.c
+27
-0
source/util/src/tarray.c
source/util/src/tarray.c
+1
-8
tests/system-test/0-others/compatibility.py
tests/system-test/0-others/compatibility.py
+154
-149
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
90371894
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
cf30c86
GIT_TAG
d5df76d
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
include/common/tglobal.h
浏览文件 @
90371894
...
...
@@ -129,6 +129,7 @@ extern char tsUdfdLdLibPath[];
extern
char
tsSmlChildTableName
[];
extern
char
tsSmlTagName
[];
extern
bool
tsSmlDataFormat
;
extern
int32_t
tsSmlBatchSize
;
// wal
extern
int64_t
tsWalFsyncDataSizeLimit
;
...
...
include/libs/executor/executor.h
浏览文件 @
90371894
...
...
@@ -160,13 +160,6 @@ int32_t qAsyncKillTask(qTaskInfo_t tinfo);
*/
void
qDestroyTask
(
qTaskInfo_t
tinfo
);
/**
* Get the queried table uid
* @param qHandle
* @return
*/
int64_t
qGetQueriedTableUid
(
qTaskInfo_t
tinfo
);
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
*
...
...
include/libs/sync/sync.h
浏览文件 @
90371894
...
...
@@ -67,6 +67,7 @@ typedef struct SWal SWal;
typedef
struct
SSyncRaftEntry
SSyncRaftEntry
;
typedef
enum
{
TAOS_SYNC_STATE_OFFLINE
=
0
,
TAOS_SYNC_STATE_FOLLOWER
=
100
,
TAOS_SYNC_STATE_CANDIDATE
=
101
,
TAOS_SYNC_STATE_LEADER
=
102
,
...
...
include/os/osSocket.h
浏览文件 @
90371894
...
...
@@ -169,6 +169,9 @@ void taosSetMaskSIGPIPE();
uint32_t
taosInetAddr
(
const
char
*
ipAddr
);
const
char
*
taosInetNtoa
(
struct
in_addr
ipInt
,
char
*
dstStr
,
int32_t
len
);
uint64_t
taosHton64
(
uint64_t
val
);
uint64_t
taosNtoh64
(
uint64_t
val
);
#ifdef __cplusplus
}
#endif
...
...
include/util/tarray.h
浏览文件 @
90371894
...
...
@@ -220,15 +220,10 @@ void taosArrayClear(SArray* pArray);
*/
void
taosArrayClearEx
(
SArray
*
pArray
,
void
(
*
fp
)(
void
*
));
/**
* clear the array (remove all element)
* @param pArray
* @param fp
*/
void
taosArrayClearP
(
SArray
*
pArray
,
FDelete
fp
);
void
*
taosArrayDestroy
(
SArray
*
pArray
);
void
taosArrayDestroyP
(
SArray
*
pArray
,
FDelete
fp
);
void
taosArrayDestroyEx
(
SArray
*
pArray
,
FDelete
fp
);
/**
...
...
@@ -238,12 +233,6 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp);
*/
void
taosArraySort
(
SArray
*
pArray
,
__compar_fn_t
comparFn
);
/**
* sort string array
* @param pArray
*/
void
taosArraySortString
(
SArray
*
pArray
,
__compar_fn_t
comparFn
);
/**
* search the array
* @param pArray
...
...
source/client/src/clientSml.c
浏览文件 @
90371894
...
...
@@ -79,7 +79,6 @@
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
#define MAX_RETRY_TIMES 5
#define LINE_BATCH 2000
//=================================================================================================
typedef
TSDB_SML_PROTOCOL_TYPE
SMLProtocolType
;
...
...
@@ -467,6 +466,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto
end
;
}
info
->
cost
.
numOfCreateSTables
++
;
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
else
if
(
code
==
TSDB_CODE_SUCCESS
)
{
hashTmp
=
taosHashInit
(
pTableMeta
->
tableInfo
.
numOfTags
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
...
...
@@ -505,16 +511,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uError
(
"SML:0x%"
PRIx64
" smlSendMetaMsg failed. can not create %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
}
taosHashClear
(
hashTmp
);
...
...
@@ -552,12 +558,18 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uError
(
"SML:0x%"
PRIx64
" smlSendMetaMsg failed. can not create %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
needCheckMeta
=
true
;
taosHashCleanup
(
hashTmp
);
hashTmp
=
NULL
;
...
...
@@ -565,13 +577,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uError
(
"SML:0x%"
PRIx64
" load table meta error: %s"
,
info
->
id
,
tstrerror
(
code
));
goto
end
;
}
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
if
(
needCheckMeta
)
{
code
=
smlCheckMeta
(
&
(
pTableMeta
->
schema
[
pTableMeta
->
tableInfo
.
numOfColumns
]),
pTableMeta
->
tableInfo
.
numOfTags
,
...
...
@@ -596,7 +601,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
end:
taosHashCleanup
(
hashTmp
);
taosMemoryFreeClear
(
pTableMeta
);
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
1
);
//
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
return
code
;
}
...
...
@@ -815,6 +820,11 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) {
}
static
int64_t
smlParseInfluxTime
(
SSmlHandle
*
info
,
const
char
*
data
,
int32_t
len
)
{
void
*
tmp
=
taosMemoryCalloc
(
1
,
len
+
1
);
memcpy
(
tmp
,
data
,
len
);
uDebug
(
"SML:0x%"
PRIx64
" smlParseInfluxTime tslen:%d, ts:%s"
,
info
->
id
,
len
,
(
char
*
)
tmp
);
taosMemoryFree
(
tmp
);
if
(
len
==
0
||
(
len
==
1
&&
data
[
0
]
==
'0'
))
{
return
taosGetTimestampNs
();
}
...
...
@@ -2066,7 +2076,10 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
static
int32_t
smlParseInfluxLine
(
SSmlHandle
*
info
,
const
char
*
sql
,
const
int
len
)
{
SSmlLineInfo
elements
=
{
0
};
uDebug
(
"SML:0x%"
PRIx64
" smlParseInfluxLine sql:%s"
,
info
->
id
,
(
info
->
isRawLine
?
"rawdata"
:
sql
));
void
*
tmp
=
taosMemoryCalloc
(
1
,
len
+
1
);
memcpy
(
tmp
,
sql
,
len
);
uDebug
(
"SML:0x%"
PRIx64
" smlParseInfluxLine raw:%d, len:%d, sql:%s"
,
info
->
id
,
info
->
isRawLine
,
len
,
(
info
->
isRawLine
?
(
char
*
)
tmp
:
sql
));
taosMemoryFree
(
tmp
);
int
ret
=
smlParseInfluxString
(
sql
,
sql
+
len
,
&
elements
,
&
info
->
msgBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2562,7 +2575,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
goto
end
;
}
batchs
=
ceil
(((
double
)
numLines
)
/
LINE_BATCH
);
batchs
=
ceil
(((
double
)
numLines
)
/
tsSmlBatchSize
);
params
.
total
=
batchs
;
for
(
int
i
=
0
;
i
<
batchs
;
++
i
)
{
SRequestObj
*
req
=
(
SRequestObj
*
)
createRequest
(
pTscObj
->
id
,
TSDB_SQL_INSERT
,
0
);
...
...
@@ -2581,7 +2594,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
info
->
isRawLine
=
(
rawLine
==
NULL
);
info
->
ttl
=
ttl
;
int32_t
perBatch
=
LINE_BATCH
;
int32_t
perBatch
=
tsSmlBatchSize
;
if
(
numLines
>
perBatch
)
{
numLines
-=
perBatch
;
...
...
source/common/src/systable.c
浏览文件 @
90371894
...
...
@@ -206,16 +206,15 @@ static const SSysDbTableSchema vgroupsSchema[] = {
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"db_name"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"tables"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"v1_dnode"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"v1_status"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"v2_dnode"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"v2_status"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"v3_dnode"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"v3_status"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"status"
,
.
bytes
=
12
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"v1_dnode"
,
.
bytes
=
2
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
true
},
{.
name
=
"v1_status"
,
.
bytes
=
9
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"v2_dnode"
,
.
bytes
=
2
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
true
},
{.
name
=
"v2_status"
,
.
bytes
=
9
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"v3_dnode"
,
.
bytes
=
2
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
true
},
{.
name
=
"v3_status"
,
.
bytes
=
9
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"v4_dnode"
,
.
bytes
=
2
,
.
type
=
TSDB_DATA_TYPE_SMALLINT
,
.
sysInfo
=
true
},
{.
name
=
"v4_status"
,
.
bytes
=
9
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"cacheload"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"nfiles"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"file_size"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"tsma"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
,
.
sysInfo
=
true
},
};
...
...
source/common/src/tglobal.c
浏览文件 @
90371894
...
...
@@ -75,6 +75,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
// If set to empty system will generate table name using MD5 hash.
// true means that the name and order of cols in each line are the same(only for influx protocol)
bool
tsSmlDataFormat
=
false
;
int32_t
tsSmlBatchSize
=
10000
;
// query
int32_t
tsQueryPolicy
=
1
;
...
...
@@ -306,6 +307,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddString
(
pCfg
,
"smlChildTableName"
,
""
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlTagName"
,
tsSmlTagName
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"smlDataFormat"
,
tsSmlDataFormat
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"smlBatchSize"
,
tsSmlBatchSize
,
1
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxMemUsedByInsert"
,
tsMaxMemUsedByInsert
,
1
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"rpcRetryLimit"
,
tsRpcRetryLimit
,
1
,
100000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"rpcRetryInterval"
,
tsRpcRetryInterval
,
1
,
100000
,
0
)
!=
0
)
return
-
1
;
...
...
@@ -648,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy
(
tsSmlTagName
,
cfgGetItem
(
pCfg
,
"smlTagName"
)
->
str
,
TSDB_COL_NAME_LEN
);
tsSmlDataFormat
=
cfgGetItem
(
pCfg
,
"smlDataFormat"
)
->
bval
;
tsSmlBatchSize
=
cfgGetItem
(
pCfg
,
"smlBatchSize"
)
->
i32
;
tsMaxMemUsedByInsert
=
cfgGetItem
(
pCfg
,
"maxMemUsedByInsert"
)
->
i32
;
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
...
...
@@ -1021,6 +1024,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tstrncpy
(
tsSmlTagName
,
cfgGetItem
(
pCfg
,
"smlTagName"
)
->
str
,
TSDB_COL_NAME_LEN
);
}
else
if
(
strcasecmp
(
"smlDataFormat"
,
name
)
==
0
)
{
tsSmlDataFormat
=
cfgGetItem
(
pCfg
,
"smlDataFormat"
)
->
bval
;
}
else
if
(
strcasecmp
(
"smlBatchSize"
,
name
)
==
0
)
{
tsSmlBatchSize
=
cfgGetItem
(
pCfg
,
"smlBatchSize"
)
->
i32
;
}
else
if
(
strcasecmp
(
"shellActivityTimer"
,
name
)
==
0
)
{
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
}
else
if
(
strcasecmp
(
"supportVnodes"
,
name
)
==
0
)
{
...
...
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
浏览文件 @
90371894
...
...
@@ -150,7 +150,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
SServerStatusRsp
statusRsp
=
{
0
};
SMonMloadInfo
minfo
=
{
0
};
(
*
pMgmt
->
getMnodeLoadsFp
)(
&
minfo
);
if
(
minfo
.
isMnode
&&
minfo
.
load
.
syncState
==
TAOS_SYNC_STATE_ERROR
)
{
if
(
minfo
.
isMnode
&&
(
minfo
.
load
.
syncState
==
TAOS_SYNC_STATE_ERROR
||
minfo
.
load
.
syncState
==
TAOS_SYNC_STATE_OFFLINE
)
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_DEGRADED
;
snprintf
(
pStatus
->
details
,
sizeof
(
pStatus
->
details
),
"mnode sync state is %s"
,
syncStr
(
minfo
.
load
.
syncState
));
return
;
...
...
@@ -160,7 +160,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
(
*
pMgmt
->
getVnodeLoadsFp
)(
&
vinfo
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
vinfo
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
vinfo
.
pVloads
,
i
);
if
(
pLoad
->
syncState
==
TAOS_SYNC_STATE_ERROR
)
{
if
(
pLoad
->
syncState
==
TAOS_SYNC_STATE_ERROR
||
pLoad
->
syncState
==
TAOS_SYNC_STATE_OFFLINE
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_DEGRADED
;
snprintf
(
pStatus
->
details
,
sizeof
(
pStatus
->
details
),
"vnode:%d sync state is %s"
,
pLoad
->
vgId
,
syncStr
(
pLoad
->
syncState
));
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
90371894
...
...
@@ -36,8 +36,6 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray
*
mndBuildDnodesArray
(
SMnode
*
,
int32_t
exceptDnodeId
);
int32_t
mndAllocSmaVgroup
(
SMnode
*
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
int32_t
mndAllocVgroup
(
SMnode
*
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
int32_t
mndAddVnodeToVgroup
(
SMnode
*
,
SVgObj
*
pVgroup
,
SArray
*
pArray
);
int32_t
mndRemoveVnodeFromVgroup
(
SMnode
*
,
SVgObj
*
pVgroup
,
SArray
*
pArray
,
SVnodeGid
*
pDelVgid
);
int32_t
mndAddCreateVnodeAction
(
SMnode
*
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVgid
);
int32_t
mndAddAlterVnodeConfirmAction
(
SMnode
*
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
int32_t
mndAddAlterVnodeAction
(
SMnode
*
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
tmsg_t
msgType
);
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
90371894
...
...
@@ -538,7 +538,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t
code
=
-
1
;
SArray
*
newSub
=
subscribe
.
topicNames
;
taosArraySort
String
(
newSub
,
taosArrayCompareString
);
taosArraySort
(
newSub
,
taosArrayCompareString
);
taosArrayRemoveDuplicateP
(
newSub
,
taosArrayCompareString
,
taosMemoryFree
);
int32_t
newTopicNum
=
taosArrayGetSize
(
newSub
);
...
...
@@ -850,7 +850,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
// add to current topic
taosArrayPush
(
pOldConsumer
->
currentTopics
,
&
addedTopic
);
taosArraySortString
(
pOldConsumer
->
currentTopics
,
taosArrayCompareString
);
taosArraySort
(
pOldConsumer
->
currentTopics
,
taosArrayCompareString
);
// set status
if
(
taosArrayGetSize
(
pOldConsumer
->
rebNewTopics
)
==
0
&&
taosArrayGetSize
(
pOldConsumer
->
rebRemovedTopics
)
==
0
)
{
if
(
pOldConsumer
->
status
==
MQ_CONSUMER_STATUS__MODIFY
||
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
90371894
...
...
@@ -151,10 +151,10 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
bool
roleChanged
=
false
;
for
(
int32_t
vg
=
0
;
vg
<
pVgroup
->
replica
;
++
vg
)
{
if
(
pVgroup
->
vnodeGid
[
vg
].
dnodeId
==
dnodeId
)
{
if
(
pVgroup
->
vnodeGid
[
vg
].
syncState
!=
TAOS_SYNC_STATE_
ERROR
)
{
if
(
pVgroup
->
vnodeGid
[
vg
].
syncState
!=
TAOS_SYNC_STATE_
OFFLINE
)
{
mInfo
(
"vgId:%d, state changed by offline check, old state:%s restored:%d new state:error restored:0"
,
pVgroup
->
vgId
,
syncStr
(
pVgroup
->
vnodeGid
[
vg
].
syncState
),
pVgroup
->
vnodeGid
[
vg
].
syncRestore
);
pVgroup
->
vnodeGid
[
vg
].
syncState
=
TAOS_SYNC_STATE_
ERROR
;
pVgroup
->
vnodeGid
[
vg
].
syncState
=
TAOS_SYNC_STATE_
OFFLINE
;
pVgroup
->
vnodeGid
[
vg
].
syncRestore
=
0
;
roleChanged
=
true
;
}
...
...
@@ -491,6 +491,15 @@ void mndPreClose(SMnode *pMnode) {
if
(
pMnode
!=
NULL
)
{
syncLeaderTransfer
(
pMnode
->
syncMgmt
.
sync
);
syncPreStop
(
pMnode
->
syncMgmt
.
sync
);
while
(
syncSnapshotRecving
(
pMnode
->
syncMgmt
.
sync
))
{
mInfo
(
"vgId:1, snapshot is recving"
);
taosMsleep
(
300
);
}
while
(
syncSnapshotSending
(
pMnode
->
syncMgmt
.
sync
))
{
mInfo
(
"vgId:1, snapshot is sending"
);
taosMsleep
(
300
);
}
}
}
...
...
@@ -747,7 +756,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
tstrncpy
(
desc
.
status
,
"ready"
,
sizeof
(
desc
.
status
));
pClusterInfo
->
vgroups_alive
++
;
}
if
(
pVgid
->
syncState
!=
TAOS_SYNC_STATE_ERROR
)
{
if
(
pVgid
->
syncState
!=
TAOS_SYNC_STATE_ERROR
&&
pVgid
->
syncState
!=
TAOS_SYNC_STATE_OFFLINE
)
{
pClusterInfo
->
vnodes_alive
++
;
}
pClusterInfo
->
vnodes_total
++
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
90371894
...
...
@@ -185,7 +185,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
return
-
1
;
}
pObj
->
syncState
=
TAOS_SYNC_STATE_
ERROR
;
pObj
->
syncState
=
TAOS_SYNC_STATE_
OFFLINE
;
mndReloadSyncConfig
(
pSdb
->
pMnode
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
90371894
...
...
@@ -179,6 +179,22 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
pOld
->
hashEnd
=
pNew
->
hashEnd
;
pOld
->
replica
=
pNew
->
replica
;
pOld
->
isTsma
=
pNew
->
isTsma
;
for
(
int32_t
i
=
0
;
i
<
pNew
->
replica
;
++
i
)
{
SVnodeGid
*
pNewGid
=
&
pNew
->
vnodeGid
[
i
];
for
(
int32_t
j
=
0
;
j
<
pOld
->
replica
;
++
j
)
{
SVnodeGid
*
pOldGid
=
&
pOld
->
vnodeGid
[
j
];
if
(
pNewGid
->
dnodeId
==
pOldGid
->
dnodeId
)
{
pNewGid
->
syncState
=
pOldGid
->
syncState
;
pNewGid
->
syncRestore
=
pOldGid
->
syncRestore
;
}
}
}
pNew
->
numOfTables
=
pOld
->
numOfTables
;
pNew
->
numOfTimeSeries
=
pOld
->
numOfTimeSeries
;
pNew
->
totalStorage
=
pOld
->
totalStorage
;
pNew
->
compStorage
=
pOld
->
compStorage
;
pNew
->
pointsWritten
=
pOld
->
pointsWritten
;
pNew
->
compact
=
pOld
->
compact
;
memcpy
(
pOld
->
vnodeGid
,
pNew
->
vnodeGid
,
TSDB_MAX_REPLICA
*
sizeof
(
SVnodeGid
));
return
0
;
}
...
...
@@ -659,11 +675,12 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
numOfTables
,
false
);
// default 3 replica
for
(
int32_t
i
=
0
;
i
<
3
;
++
i
)
{
// default 3 replica
, add 1 replica if move vnode
for
(
int32_t
i
=
0
;
i
<
4
;
++
i
)
{
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
if
(
i
<
pVgroup
->
replica
)
{
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
vnodeGid
[
i
].
dnodeId
,
false
);
int16_t
dnodeId
=
(
int16_t
)
pVgroup
->
vnodeGid
[
i
].
dnodeId
;
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
dnodeId
,
false
);
bool
exist
=
false
;
bool
online
=
false
;
...
...
@@ -695,16 +712,8 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
}
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppendNULL
(
pColInfo
,
numOfRows
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
cacheUsage
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppendNULL
(
pColInfo
,
numOfRows
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppendNULL
(
pColInfo
,
numOfRows
);
int32_t
cacheUsage
=
(
int32_t
)
pVgroup
->
cacheUsage
;
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
cacheUsage
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
isTsma
,
false
);
...
...
@@ -851,7 +860,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
sdbCancelFetch
(
pSdb
,
pIter
);
}
int32_t
mndAddVnodeToVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
)
{
static
int32_t
mndAddVnodeToVgroup
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVgroup
,
SArray
*
pArray
)
{
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
...
...
@@ -887,12 +896,21 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
}
pVgid
->
dnodeId
=
pDnode
->
id
;
pVgid
->
syncState
=
TAOS_SYNC_STATE_
ERROR
;
pVgid
->
syncState
=
TAOS_SYNC_STATE_
OFFLINE
;
mInfo
(
"db:%s, vgId:%d, vn:%d is added, memory:%"
PRId64
", dnode:%d avail:%"
PRId64
" used:%"
PRId64
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
pVgroup
->
replica
,
vgMem
,
pVgid
->
dnodeId
,
pDnode
->
memAvail
,
pDnode
->
memUsed
);
pVgroup
->
replica
++
;
pDnode
->
numOfVnodes
++
;
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pVgRaw
)
!=
0
)
{
sdbFreeRaw
(
pVgRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_READY
);
return
0
;
}
...
...
@@ -901,7 +919,8 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
return
-
1
;
}
int32_t
mndRemoveVnodeFromVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
,
SVnodeGid
*
pDelVgid
)
{
static
int32_t
mndRemoveVnodeFromVgroup
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVgroup
,
SArray
*
pArray
,
SVnodeGid
*
pDelVgid
)
{
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
...
...
@@ -941,6 +960,15 @@ _OVER:
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
vn
];
mInfo
(
"db:%s, vgId:%d, vn:%d dnode:%d is reserved"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
vn
,
pVgid
->
dnodeId
);
}
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pVgRaw
)
!=
0
)
{
sdbFreeRaw
(
pVgRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_READY
);
return
0
;
}
...
...
@@ -1088,7 +1116,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
if
(
!
force
)
{
mInfo
(
"vgId:%d, will add 1 vnode"
,
pVgroup
->
vgId
);
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
-
1
;
++
i
)
{
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
newVg
.
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
}
...
...
@@ -1100,6 +1128,16 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
SVnodeGid
del
=
newVg
.
vnodeGid
[
vnIndex
];
newVg
.
vnodeGid
[
vnIndex
]
=
newVg
.
vnodeGid
[
newVg
.
replica
];
memset
(
&
newVg
.
vnodeGid
[
newVg
.
replica
],
0
,
sizeof
(
SVnodeGid
));
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
}
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
&
del
,
true
)
!=
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
;
++
i
)
{
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
newVg
.
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
...
...
@@ -1107,11 +1145,20 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
)
!=
0
)
return
-
1
;
}
else
{
mInfo
(
"vgId:%d, will add 1 vnode and force remove 1 vnode"
,
pVgroup
->
vgId
);
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
newVg
.
replica
--
;
SVnodeGid
del
=
newVg
.
vnodeGid
[
vnIndex
];
newVg
.
vnodeGid
[
vnIndex
]
=
newVg
.
vnodeGid
[
newVg
.
replica
];
memset
(
&
newVg
.
vnodeGid
[
newVg
.
replica
],
0
,
sizeof
(
SVnodeGid
));
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
}
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
;
++
i
)
{
if
(
i
!=
vnIndex
)
{
...
...
@@ -1128,16 +1175,12 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
return
-
1
;
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
pRaw
=
NULL
;
}
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
return
-
1
;
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
pRaw
=
NULL
;
}
mInfo
(
"vgId:%d, vgroup info after move, replica:%d"
,
newVg
.
vgId
,
newVg
.
replica
);
...
...
@@ -1193,7 +1236,15 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
SVnodeGid
*
pGid
=
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
];
pVgroup
->
replica
++
;
pGid
->
dnodeId
=
newDnodeId
;
pGid
->
syncState
=
TAOS_SYNC_STATE_ERROR
;
pGid
->
syncState
=
TAOS_SYNC_STATE_OFFLINE
;
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pVgRaw
)
!=
0
)
{
sdbFreeRaw
(
pVgRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_READY
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
replica
-
1
;
++
i
)
{
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
,
pVgroup
->
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
...
...
@@ -1224,6 +1275,14 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
memcpy
(
pGid
,
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
],
sizeof
(
SVnodeGid
));
memset
(
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
],
0
,
sizeof
(
SVnodeGid
));
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pVgRaw
)
!=
0
)
{
sdbFreeRaw
(
pVgRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_READY
);
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
,
&
delGid
,
true
)
!=
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
,
pVgroup
->
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
...
...
@@ -1236,9 +1295,8 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
static
int32_t
mndRedistributeVgroup
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SDnodeObj
*
pNew1
,
SDnodeObj
*
pOld1
,
SDnodeObj
*
pNew2
,
SDnodeObj
*
pOld2
,
SDnodeObj
*
pNew3
,
SDnodeObj
*
pOld3
)
{
int32_t
code
=
-
1
;
SSdbRaw
*
pRaw
=
NULL
;
STrans
*
pTrans
=
NULL
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
NULL
;
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
pReq
,
"red-vgroup"
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
...
...
@@ -1319,17 +1377,13 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
}
{
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
goto
_OVER
;
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
pRaw
=
NULL
;
}
{
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
goto
_OVER
;
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
pRaw
=
NULL
;
}
mInfo
(
"vgId:%d, vgroup info after redistribute, replica:%d"
,
newVg
.
vgId
,
newVg
.
replica
);
...
...
@@ -1342,7 +1396,6 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
_OVER:
mndTransDrop
(
pTrans
);
sdbFreeRaw
(
pRaw
);
mndReleaseDb
(
pMnode
,
pDb
);
return
code
;
}
...
...
@@ -1593,13 +1646,13 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
mInfo
(
"db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
pVgroup
->
vnodeGid
[
0
].
dnodeId
);
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVgroup
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVgroup
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
newVgroup
.
vnodeGid
[
0
].
dnodeId
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
1
])
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVgroup
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVgroup
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
newVgroup
.
vnodeGid
[
0
].
dnodeId
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
newVgroup
.
vnodeGid
[
1
].
dnodeId
)
!=
0
)
...
...
@@ -1612,7 +1665,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
SVnodeGid
del1
=
{
0
};
SVnodeGid
del2
=
{
0
};
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
&
newVgroup
,
pArray
,
&
del1
)
!=
0
)
return
-
1
;
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
pTrans
,
&
newVgroup
,
pArray
,
&
del1
)
!=
0
)
return
-
1
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
&
del1
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
newVgroup
.
vnodeGid
[
0
].
dnodeId
)
!=
0
)
return
-
1
;
...
...
@@ -1620,7 +1673,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
&
newVgroup
,
pArray
,
&
del2
)
!=
0
)
return
-
1
;
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
pTrans
,
&
newVgroup
,
pArray
,
&
del2
)
!=
0
)
return
-
1
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
&
del2
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pNewDb
,
&
newVgroup
,
newVgroup
.
vnodeGid
[
0
].
dnodeId
)
!=
0
)
return
-
1
;
...
...
@@ -1629,16 +1682,6 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
return
-
1
;
}
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
&
newVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pVgRaw
)
!=
0
)
{
sdbFreeRaw
(
pVgRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_READY
);
}
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
&
newVgroup
);
if
(
pVgRaw
==
NULL
)
return
-
1
;
...
...
@@ -1657,10 +1700,9 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
}
static
int32_t
mndSplitVgroup
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
int32_t
code
=
-
1
;
SSdbRaw
*
pRaw
=
NULL
;
STrans
*
pTrans
=
NULL
;
SArray
*
pArray
=
mndBuildDnodesArray
(
pMnode
,
0
);
int32_t
code
=
-
1
;
STrans
*
pTrans
=
NULL
;
SArray
*
pArray
=
mndBuildDnodesArray
(
pMnode
,
0
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
pReq
,
"split-vgroup"
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
...
...
@@ -1676,13 +1718,13 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
}
if
(
newVg1
.
replica
==
1
)
{
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVg1
,
pArray
)
!=
0
)
goto
_OVER
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg1
,
pArray
)
!=
0
)
goto
_OVER
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg1
,
newVg1
.
vnodeGid
[
0
].
dnodeId
)
!=
0
)
goto
_OVER
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg1
,
&
newVg1
.
vnodeGid
[
1
])
!=
0
)
goto
_OVER
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg1
)
!=
0
)
goto
_OVER
;
}
else
if
(
newVg1
.
replica
==
3
)
{
SVnodeGid
del1
=
{
0
};
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
&
newVg1
,
pArray
,
&
del1
)
!=
0
)
goto
_OVER
;
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
pTrans
,
&
newVg1
,
pArray
,
&
del1
)
!=
0
)
goto
_OVER
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg1
,
&
del1
,
true
)
!=
0
)
goto
_OVER
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg1
,
newVg1
.
vnodeGid
[
0
].
dnodeId
)
!=
0
)
goto
_OVER
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg1
,
newVg1
.
vnodeGid
[
1
].
dnodeId
)
!=
0
)
goto
_OVER
;
...
...
@@ -1727,17 +1769,23 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
#endif
{
pRaw
=
mndVgroupActionEncode
(
&
newVg1
);
if
(
pRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
goto
_OVER
;
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg1
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
pRaw
=
NULL
;
}
{
pRaw
=
mndVgroupActionEncode
(
&
newVg2
);
if
(
pRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
goto
_OVER
;
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg2
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
pRaw
=
NULL
;
}
mInfo
(
"vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d"
,
newVg1
.
vgId
,
...
...
@@ -1757,7 +1805,6 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
_OVER:
taosArrayDestroy
(
pArray
);
mndTransDrop
(
pTrans
);
sdbFreeRaw
(
pRaw
);
return
code
;
}
...
...
@@ -1802,16 +1849,8 @@ static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SD
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
}
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
90371894
...
...
@@ -2543,6 +2543,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
goto
_err
;
}
// TODO: opt the perf of read del index
code
=
tsdbReadDelIdx
(
pDelFReader
,
aDelIdx
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
aDelIdx
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
90371894
...
...
@@ -515,6 +515,16 @@ void vnodeSyncPreClose(SVnode *pVnode) {
vInfo
(
"vgId:%d, pre close sync"
,
pVnode
->
config
.
vgId
);
syncLeaderTransfer
(
pVnode
->
sync
);
syncPreStop
(
pVnode
->
sync
);
while
(
syncSnapshotRecving
(
pVnode
->
sync
))
{
vInfo
(
"vgId:%d, snapshot is recving"
,
pVnode
->
config
.
vgId
);
taosMsleep
(
300
);
}
while
(
syncSnapshotSending
(
pVnode
->
sync
))
{
vInfo
(
"vgId:%d, snapshot is sending"
,
pVnode
->
config
.
vgId
);
taosMsleep
(
300
);
}
taosThreadMutexLock
(
&
pVnode
->
lock
);
if
(
pVnode
->
blocked
)
{
vInfo
(
"vgId:%d, post block after close sync"
,
pVnode
->
config
.
vgId
);
...
...
source/libs/executor/inc/executil.h
浏览文件 @
90371894
...
...
@@ -161,4 +161,6 @@ int32_t convertFillType(int32_t mode);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
90371894
...
...
@@ -566,23 +566,6 @@ typedef struct SStreamIntervalOperatorInfo {
SWinKey
delKey
;
}
SStreamIntervalOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
struct
SFillInfo
*
pFillInfo
;
SSDataBlock
*
pRes
;
SSDataBlock
*
pFinalRes
;
int64_t
totalInputRows
;
void
**
p
;
SSDataBlock
*
existNewGroupBlock
;
STimeWindow
win
;
SColMatchInfo
matchInfo
;
int32_t
primaryTsCol
;
int32_t
primarySrcSlotId
;
uint64_t
curGroupId
;
// current handled group id
SExprInfo
*
pExprInfo
;
int32_t
numOfExpr
;
SExprSupp
noFillExprSupp
;
}
SFillOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
...
...
@@ -834,8 +817,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
getMaximumIdleDurationSec
();
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
...
...
@@ -853,9 +834,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SStreamState
*
pState
,
STimeWindowAggSupp
*
pTwSup
);
void
appendOneRowToStreamSpecialBlock
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
,
uint64_t
*
pGp
,
void
*
pTbName
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/inc/tfill.h
浏览文件 @
90371894
...
...
@@ -25,6 +25,8 @@ extern "C" {
#include "tcommon.h"
#include "tsimplehash.h"
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
struct
SSDataBlock
;
typedef
struct
SFillColInfo
{
...
...
@@ -113,12 +115,12 @@ typedef struct SStreamFillInfo {
int64_t
getNumOfResultsAfterFillGap
(
SFillInfo
*
pFillInfo
,
int64_t
ekey
,
int32_t
maxNumOfRows
);
void
taosFillSetStartInfo
(
struct
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
);
void
taosResetFillInfo
(
struct
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
);
void
taosFillSetInputDataBlock
(
struct
SFillInfo
*
pFillInfo
,
const
struct
SSDataBlock
*
pInput
);
struct
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfFillExpr
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
const
struct
SNodeListNode
*
val
);
bool
taosFillHasMoreResults
(
struct
SFillInfo
*
pFillInfo
);
void
taosFillSetStartInfo
(
struct
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
);
void
taosResetFillInfo
(
struct
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
);
void
taosFillSetInputDataBlock
(
struct
SFillInfo
*
pFillInfo
,
const
struct
SSDataBlock
*
pInput
);
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfFillExpr
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
const
struct
SNodeListNode
*
val
);
bool
taosFillHasMoreResults
(
struct
SFillInfo
*
pFillInfo
);
SFillInfo
*
taosCreateFillInfo
(
TSKEY
skey
,
int32_t
numOfFillCols
,
int32_t
numOfNotFillCols
,
int32_t
capacity
,
SInterval
*
pInterval
,
int32_t
fillType
,
struct
SFillColInfo
*
pCol
,
int32_t
slotId
,
...
...
@@ -128,6 +130,8 @@ void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
int64_t
taosFillResultDataBlock
(
struct
SFillInfo
*
pFillInfo
,
SSDataBlock
*
p
,
int32_t
capacity
);
int64_t
getFillInfoStart
(
struct
SFillInfo
*
pFillInfo
);
bool
fillIfWindowPseudoColumn
(
SFillInfo
*
pFillInfo
,
SFillColInfo
*
pCol
,
SColumnInfoData
*
pDstColInfoData
,
int32_t
rowIndex
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
90371894
...
...
@@ -15,26 +15,15 @@
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tref.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tsort.h"
#include "ttime.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
typedef
struct
SFetchRspHandleWrapper
{
uint32_t
exchangeId
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
90371894
...
...
@@ -2003,3 +2003,13 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return
TSDB_CODE_SUCCESS
;
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"===stream===printDataBlock: Block is Null or Empty"
);
return
;
}
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
taosMemoryFree
(
pBuf
);
}
\ No newline at end of file
source/libs/executor/src/executor.c
浏览文件 @
90371894
...
...
@@ -704,6 +704,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
return
TSDB_CODE_SUCCESS
;
}
static
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
if
(
pTaskInfo
==
NULL
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
90371894
...
...
@@ -91,7 +91,6 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
void
destroyFillOperatorInfo
(
void
*
param
);
static
void
destroyAggOperatorInfo
(
void
*
param
);
static
void
initCtxOutputBuffer
(
SqlFunctionCtx
*
pCtx
,
int32_t
size
);
static
void
doSetTableGroupOutputBuf
(
SOperatorInfo
*
pOperator
,
int32_t
numOfOutput
,
uint64_t
groupId
);
...
...
@@ -1157,20 +1156,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
}
}
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
}
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
...
...
@@ -1603,179 +1588,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
static
void
doHandleRemainBlockForNewGroupImpl
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
pInfo
->
totalInputRows
=
pInfo
->
existNewGroupBlock
->
info
.
rows
;
SSDataBlock
*
pResBlock
=
pInfo
->
pFinalRes
;
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
int64_t
ekey
=
pInfo
->
existNewGroupBlock
->
info
.
window
.
ekey
;
taosResetFillInfo
(
pInfo
->
pFillInfo
,
getFillInfoStart
(
pInfo
->
pFillInfo
));
blockDataCleanup
(
pInfo
->
pRes
);
doApplyScalarCalculation
(
pOperator
,
pInfo
->
existNewGroupBlock
,
order
,
scanFlag
);
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
ekey
);
taosFillSetInputDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
);
int32_t
numOfResultRows
=
pResultInfo
->
capacity
-
pResBlock
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pResBlock
,
numOfResultRows
);
pInfo
->
curGroupId
=
pInfo
->
existNewGroupBlock
->
info
.
id
.
groupId
;
pInfo
->
existNewGroupBlock
=
NULL
;
}
static
void
doHandleRemainBlockFromNewGroup
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
if
(
taosFillHasMoreResults
(
pInfo
->
pFillInfo
))
{
int32_t
numOfResultRows
=
pResultInfo
->
capacity
-
pInfo
->
pFinalRes
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pFinalRes
,
numOfResultRows
);
pInfo
->
pRes
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
;
}
// handle the cached new group data block
if
(
pInfo
->
existNewGroupBlock
)
{
doHandleRemainBlockForNewGroupImpl
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
}
}
static
void
doApplyScalarCalculation
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
setInputDataBlock
(
pSup
,
pBlock
,
order
,
scanFlag
,
false
);
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
NULL
);
// reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
pInfo
->
pRes
->
info
.
rows
=
0
;
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
setInputDataBlock
(
pNoFillSupp
,
pBlock
,
order
,
scanFlag
,
false
);
projectApplyFunctions
(
pNoFillSupp
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pNoFillSupp
->
pCtx
,
pNoFillSupp
->
numOfExprs
,
NULL
);
pInfo
->
pRes
->
info
.
id
.
groupId
=
pBlock
->
info
.
id
.
groupId
;
}
static
SSDataBlock
*
doFillImpl
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
SSDataBlock
*
pResBlock
=
pInfo
->
pFinalRes
;
blockDataCleanup
(
pResBlock
);
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
doHandleRemainBlockFromNewGroup
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>
0
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
SOperatorInfo
*
pDownstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
SSDataBlock
*
pBlock
=
pDownstream
->
fpSet
.
getNextFn
(
pDownstream
);
if
(
pBlock
==
NULL
)
{
if
(
pInfo
->
totalInputRows
==
0
)
{
setOperatorCompleted
(
pOperator
);
return
NULL
;
}
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
pInfo
->
win
.
ekey
);
}
else
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primarySrcSlotId
);
blockDataCleanup
(
pInfo
->
pRes
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pBlock
->
info
.
rows
);
doApplyScalarCalculation
(
pOperator
,
pBlock
,
order
,
scanFlag
);
if
(
pInfo
->
curGroupId
==
0
||
pInfo
->
curGroupId
==
pInfo
->
pRes
->
info
.
id
.
groupId
)
{
pInfo
->
curGroupId
=
pInfo
->
pRes
->
info
.
id
.
groupId
;
// the first data block
pInfo
->
totalInputRows
+=
pInfo
->
pRes
->
info
.
rows
;
if
(
order
==
pInfo
->
pFillInfo
->
order
)
{
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
pBlock
->
info
.
window
.
ekey
);
}
else
{
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
pBlock
->
info
.
window
.
skey
);
}
taosFillSetInputDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
);
}
else
if
(
pInfo
->
curGroupId
!=
pBlock
->
info
.
id
.
groupId
)
{
// the new group data block
pInfo
->
existNewGroupBlock
=
pBlock
;
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
pInfo
->
win
.
ekey
);
}
}
int32_t
numOfResultRows
=
pOperator
->
resultInfo
.
capacity
-
pResBlock
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pResBlock
,
numOfResultRows
);
// current group has no more result to return
if
(
pResBlock
->
info
.
rows
>
0
)
{
// 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if
(
pResBlock
->
info
.
rows
>
pResultInfo
->
threshold
||
pBlock
==
NULL
||
pInfo
->
existNewGroupBlock
!=
NULL
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
doHandleRemainBlockFromNewGroup
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
||
pBlock
==
NULL
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
}
else
if
(
pInfo
->
existNewGroupBlock
)
{
// try next group
assert
(
pBlock
!=
NULL
);
blockDataCleanup
(
pResBlock
);
doHandleRemainBlockForNewGroupImpl
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>
pResultInfo
->
threshold
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
}
else
{
return
NULL
;
}
}
}
static
SSDataBlock
*
doFill
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SSDataBlock
*
fillResult
=
NULL
;
while
(
true
)
{
fillResult
=
doFillImpl
(
pOperator
);
if
(
fillResult
==
NULL
)
{
setOperatorCompleted
(
pOperator
);
break
;
}
doFilter
(
fillResult
,
pOperator
->
exprSupp
.
pFilterInfo
,
&
pInfo
->
matchInfo
);
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
}
if
(
fillResult
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
fillResult
->
info
.
rows
;
}
return
fillResult
;
}
void
destroyExprInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfExprs
)
{
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pExpr
[
i
];
...
...
@@ -2045,167 +1857,6 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear
(
param
);
}
void
destroyFillOperatorInfo
(
void
*
param
)
{
SFillOperatorInfo
*
pInfo
=
(
SFillOperatorInfo
*
)
param
;
pInfo
->
pFillInfo
=
taosDestroyFillInfo
(
pInfo
->
pFillInfo
);
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
pInfo
->
pFinalRes
=
blockDataDestroy
(
pInfo
->
pFinalRes
);
cleanupExprSupp
(
&
pInfo
->
noFillExprSupp
);
taosMemoryFreeClear
(
pInfo
->
p
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
static
int32_t
initFillInfo
(
SFillOperatorInfo
*
pInfo
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
SNodeListNode
*
pValNode
,
STimeWindow
win
,
int32_t
capacity
,
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfCols
,
pNotFillExpr
,
numOfNotFillCols
,
pValNode
);
int64_t
startKey
=
(
order
==
TSDB_ORDER_ASC
)
?
win
.
skey
:
win
.
ekey
;
STimeWindow
w
=
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
startKey
);
w
=
getFirstQualifiedTimeWindow
(
startKey
,
&
w
,
pInterval
,
order
);
pInfo
->
pFillInfo
=
taosCreateFillInfo
(
w
.
skey
,
numOfCols
,
numOfNotFillCols
,
capacity
,
pInterval
,
fillType
,
pColInfo
,
pInfo
->
primaryTsCol
,
order
,
id
);
if
(
order
==
TSDB_ORDER_ASC
)
{
pInfo
->
win
.
skey
=
win
.
skey
;
pInfo
->
win
.
ekey
=
win
.
ekey
;
}
else
{
pInfo
->
win
.
skey
=
win
.
ekey
;
pInfo
->
win
.
ekey
=
win
.
skey
;
}
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
if
(
pInfo
->
pFillInfo
==
NULL
||
pInfo
->
p
==
NULL
)
{
taosMemoryFree
(
pInfo
->
pFillInfo
);
taosMemoryFree
(
pInfo
->
p
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
return
TSDB_CODE_SUCCESS
;
}
}
static
bool
isWstartColumnExist
(
SFillOperatorInfo
*
pInfo
)
{
if
(
pInfo
->
noFillExprSupp
.
numOfExprs
==
0
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
pInfo
->
noFillExprSupp
.
numOfExprs
;
++
i
)
{
SExprInfo
*
exprInfo
=
pInfo
->
noFillExprSupp
.
pExprInfo
+
i
;
if
(
exprInfo
->
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
&&
exprInfo
->
base
.
numOfParams
==
1
&&
exprInfo
->
base
.
pParam
[
0
].
pCol
->
colType
==
COLUMN_TYPE_WINDOW_START
)
{
return
true
;
}
}
return
false
;
}
static
int32_t
createPrimaryTsExprIfNeeded
(
SFillOperatorInfo
*
pInfo
,
SFillPhysiNode
*
pPhyFillNode
,
SExprSupp
*
pExprSupp
,
const
char
*
idStr
)
{
bool
wstartExist
=
isWstartColumnExist
(
pInfo
);
if
(
wstartExist
==
false
)
{
if
(
pPhyFillNode
->
pWStartTs
->
type
!=
QUERY_NODE_TARGET
)
{
qError
(
"pWStartTs of fill physical node is not a target node, %s"
,
idStr
);
return
TSDB_CODE_QRY_SYS_ERROR
;
}
SExprInfo
*
pExpr
=
taosMemoryRealloc
(
pExprSupp
->
pExprInfo
,
(
pExprSupp
->
numOfExprs
+
1
)
*
sizeof
(
SExprInfo
));
if
(
pExpr
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
createExprFromTargetNode
(
&
pExpr
[
pExprSupp
->
numOfExprs
],
(
STargetNode
*
)
pPhyFillNode
->
pWStartTs
);
pExprSupp
->
numOfExprs
+=
1
;
pExprSupp
->
pExprInfo
=
pExpr
;
}
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SFillOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SFillOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pRes
=
createDataBlockFromDescNode
(
pPhyFillNode
->
node
.
pOutputDataBlockDesc
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pFillExprs
,
NULL
,
&
pInfo
->
numOfExpr
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
pNoFillSupp
->
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pNotFillExprs
,
NULL
,
&
pNoFillSupp
->
numOfExprs
);
int32_t
code
=
createPrimaryTsExprIfNeeded
(
pInfo
,
pPhyFillNode
,
pNoFillSupp
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
initExprSupp
(
pNoFillSupp
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SInterval
*
pInterval
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
downstream
->
operatorType
?
&
((
SMergeAlignedIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
intervalAggOperatorInfo
->
interval
:
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
order
=
(
pPhyFillNode
->
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
pInfo
->
numOfExpr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
primaryTsCol
=
((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
slotId
;
pInfo
->
primarySrcSlotId
=
((
SColumnNode
*
)((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
pExpr
)
->
slotId
;
int32_t
numOfOutputCols
=
0
;
code
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
pInfo
->
numOfExpr
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
,
(
SNodeListNode
*
)
pPhyFillNode
->
pValues
,
pPhyFillNode
->
timeRange
,
pResultInfo
->
capacity
,
pTaskInfo
->
id
.
str
,
pInterval
,
type
,
order
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
pFinalRes
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pOperator
->
resultInfo
.
capacity
);
code
=
filterInitFromNode
((
SNode
*
)
pPhyFillNode
->
node
.
pConditions
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
setOperatorInfo
(
pOperator
,
"FillOperator"
,
QUERY_NODE_PHYSICAL_PLAN_FILL
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
pInfo
->
numOfExpr
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doFill
,
NULL
,
destroyFillOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
if
(
pInfo
!=
NULL
)
{
destroyFillOperatorInfo
(
pInfo
);
}
pTaskInfo
->
code
=
code
;
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
static
SExecTaskInfo
*
createExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
)
{
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
if
(
pTaskInfo
==
NULL
)
{
...
...
source/libs/executor/src/filloperator.c
0 → 100644
浏览文件 @
90371894
此差异已折叠。
点击以展开。
source/libs/executor/src/scanoperator.c
浏览文件 @
90371894
...
...
@@ -1051,6 +1051,9 @@ static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
}
static
bool
prepareRangeScan
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
*
pRowIndex
)
{
if
(
pBlock
->
info
.
rows
==
0
)
{
return
false
;
}
if
((
*
pRowIndex
)
==
pBlock
->
info
.
rows
)
{
return
false
;
}
...
...
@@ -1183,10 +1186,10 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
}
static
int32_t
generateSessionScanRange
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pSrcBlock
,
SSDataBlock
*
pDestBlock
)
{
blockDataCleanup
(
pDestBlock
);
if
(
pSrcBlock
->
info
.
rows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
blockDataCleanup
(
pDestBlock
);
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
pSrcBlock
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -1335,30 +1338,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return
code
;
}
static
void
calBlockTag
(
SExprSupp
*
pTagCalSup
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pResBlock
)
{
if
(
pTagCalSup
==
NULL
||
pTagCalSup
->
numOfExprs
==
0
)
return
;
if
(
pBlock
==
NULL
||
pBlock
->
info
.
rows
==
0
)
return
;
SSDataBlock
*
pSrcBlock
=
blockCopyOneRow
(
pBlock
,
0
);
ASSERT
(
pSrcBlock
->
info
.
rows
==
1
);
blockDataEnsureCapacity
(
pResBlock
,
1
);
projectApplyFunctions
(
pTagCalSup
->
pExprInfo
,
pResBlock
,
pSrcBlock
,
pTagCalSup
->
pCtx
,
1
,
NULL
);
ASSERT
(
pResBlock
->
info
.
rows
==
1
);
// build tagArray
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
/*STagVal tagVal = {*/
/*.cid = 0,*/
/*.type = 0,*/
/*};*/
// build STag
// set STag
blockDataDestroy
(
pSrcBlock
);
}
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
SExprSupp
*
pTbNameCalSup
=
&
pInfo
->
tbnameCalSup
;
SStreamState
*
pState
=
pInfo
->
pStreamScanOp
->
pTaskInfo
->
streamInfo
.
pState
;
...
...
@@ -1836,6 +1815,12 @@ FETCH_NEXT_BLOCK:
}
setBlockGroupIdByUid
(
pInfo
,
pDelBlock
);
printDataBlock
(
pDelBlock
,
"stream scan delete recv filtered"
);
if
(
pDelBlock
->
info
.
rows
==
0
)
{
if
(
pInfo
->
tqReader
)
{
blockDataDestroy
(
pDelBlock
);
}
goto
FETCH_NEXT_BLOCK
;
}
if
(
!
isIntervalWindow
(
pInfo
)
&&
!
isSessionWindow
(
pInfo
)
&&
!
isStateWindow
(
pInfo
))
{
generateDeleteResultBlock
(
pInfo
,
pDelBlock
,
pInfo
->
pDeleteDataRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
...
...
source/libs/executor/src/tfill.c
浏览文件 @
90371894
此差异已折叠。
点击以展开。
source/libs/executor/src/timesliceoperator.c
浏览文件 @
90371894
...
...
@@ -467,11 +467,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// add current row if timestamp match
if
(
ts
==
pSliceInfo
->
current
&&
pSliceInfo
->
current
<=
pSliceInfo
->
win
.
ekey
)
{
addCurrentRowToResult
(
pSliceInfo
,
&
pOperator
->
exprSupp
,
pResBlock
,
pBlock
,
i
);
doKeepPrevRows
(
pSliceInfo
,
pBlock
,
i
);
pSliceInfo
->
current
=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
}
doKeepPrevRows
(
pSliceInfo
,
pBlock
,
i
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
setOperatorCompleted
(
pOperator
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
90371894
...
...
@@ -677,16 +677,6 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"===stream===printDataBlock: Block is Null or Empty"
);
return
;
}
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
taosMemoryFree
(
pBuf
);
}
typedef
int32_t
(
*
__compare_fn_t
)(
void
*
pKey
,
void
*
data
,
int32_t
index
);
int32_t
binarySearchCom
(
void
*
keyList
,
int
num
,
void
*
pKey
,
int
order
,
__compare_fn_t
comparefn
)
{
...
...
@@ -3854,7 +3844,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
int32_t
numOfOutput
=
pOperator
->
exprSupp
.
numOfExprs
;
int64_t
groupId
=
pSDataBlock
->
info
.
id
.
groupId
;
uint64_t
groupId
=
pSDataBlock
->
info
.
id
.
groupId
;
int64_t
code
=
TSDB_CODE_SUCCESS
;
TSKEY
*
tsCols
=
NULL
;
SResultRow
*
pResult
=
NULL
;
...
...
source/libs/function/src/detail/tavgfunction.c
浏览文件 @
90371894
...
...
@@ -133,6 +133,14 @@ static void i8VectorSumAVX2(const int8_t* plist, int32_t numOfRows, int32_t type
sum
=
_mm256_add_epi64
(
sum
,
extVal
);
p
+=
width
;
}
// let sum up the final results
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
isum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
rounds
*
width
];
}
}
else
{
const
uint8_t
*
p
=
(
const
uint8_t
*
)
plist
;
...
...
@@ -142,16 +150,16 @@ static void i8VectorSumAVX2(const int8_t* plist, int32_t numOfRows, int32_t type
sum
=
_mm256_add_epi64
(
sum
,
extVal
);
p
+=
width
;
}
}
// let sum up the final results
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
i
sum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
// let sum up the final results
const
uint64_t
*
q
=
(
const
u
int64_t
*
)
&
sum
;
pRes
->
sum
.
u
sum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
int32_t
startIndex
=
rounds
*
width
;
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
startIndex
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
usum
+=
(
uint8_t
)
plist
[
j
+
rounds
*
width
];
}
}
#endif
}
...
...
@@ -176,8 +184,16 @@ static void i16VectorSumAVX2(const int16_t* plist, int32_t numOfRows, int32_t ty
sum
=
_mm256_add_epi64
(
sum
,
extVal
);
p
+=
width
;
}
// let sum up the final results
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
isum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
rounds
*
width
];
}
}
else
{
const
uint
8_t
*
p
=
(
const
uint8
_t
*
)
plist
;
const
uint
16_t
*
p
=
(
const
uint16
_t
*
)
plist
;
for
(
int32_t
i
=
0
;
i
<
rounds
;
++
i
)
{
__m128i
val
=
_mm_lddqu_si128
((
__m128i
*
)
p
);
...
...
@@ -185,16 +201,16 @@ static void i16VectorSumAVX2(const int16_t* plist, int32_t numOfRows, int32_t ty
sum
=
_mm256_add_epi64
(
sum
,
extVal
);
p
+=
width
;
}
}
// let sum up the final results
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
i
sum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
// let sum up the final results
const
uint64_t
*
q
=
(
const
u
int64_t
*
)
&
sum
;
pRes
->
sum
.
u
sum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
int32_t
startIndex
=
rounds
*
width
;
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
startIndex
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
usum
+=
(
uint16_t
)
plist
[
j
+
rounds
*
width
];
}
}
#endif
}
...
...
@@ -219,6 +235,14 @@ static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, int32_t ty
sum
=
_mm256_add_epi64
(
sum
,
extVal
);
p
+=
width
;
}
// let sum up the final results
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
isum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
rounds
*
width
];
}
}
else
{
const
uint32_t
*
p
=
(
const
uint32_t
*
)
plist
;
...
...
@@ -228,16 +252,16 @@ static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, int32_t ty
sum
=
_mm256_add_epi64
(
sum
,
extVal
);
p
+=
width
;
}
}
// let sum up the final results
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
i
sum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
// let sum up the final results
const
uint64_t
*
q
=
(
const
u
int64_t
*
)
&
sum
;
pRes
->
sum
.
u
sum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
int32_t
startIndex
=
rounds
*
width
;
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
startIndex
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
usum
+=
(
uint32_t
)
plist
[
j
+
rounds
*
width
];
}
}
#endif
}
...
...
@@ -262,13 +286,22 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
}
// let sum up the final results
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
isum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
if
(
type
==
TSDB_DATA_TYPE_BIGINT
)
{
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
isum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
int32_t
startIndex
=
rounds
*
width
;
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
startIndex
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
rounds
*
width
];
}
}
else
{
const
uint64_t
*
q
=
(
const
uint64_t
*
)
&
sum
;
pRes
->
sum
.
usum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
usum
+=
(
uint64_t
)
plist
[
j
+
rounds
*
width
];
}
}
#endif
}
...
...
@@ -502,7 +535,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i8VectorSumAVX2
(
plist
,
numOfRows
,
type
,
pAvgRes
);
}
else
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
pAvgRes
->
sum
.
usum
+=
plist
[
i
];
if
(
type
==
TSDB_DATA_TYPE_TINYINT
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
else
{
pAvgRes
->
sum
.
usum
+=
(
uint8_t
)
plist
[
i
];
}
}
}
break
;
...
...
@@ -517,7 +554,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i16VectorSumAVX2
(
plist
,
numOfRows
,
type
,
pAvgRes
);
}
else
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
if
(
type
==
TSDB_DATA_TYPE_SMALLINT
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
else
{
pAvgRes
->
sum
.
usum
+=
(
uint16_t
)
plist
[
i
];
}
}
}
break
;
...
...
@@ -532,7 +573,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i32VectorSumAVX2
(
plist
,
numOfRows
,
type
,
pAvgRes
);
}
else
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
if
(
type
==
TSDB_DATA_TYPE_INT
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
else
{
pAvgRes
->
sum
.
usum
+=
(
uint32_t
)
plist
[
i
];
}
}
}
break
;
...
...
@@ -547,7 +592,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i64VectorSumAVX2
(
plist
,
numOfRows
,
pAvgRes
);
}
else
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
if
(
type
==
TSDB_DATA_TYPE_BIGINT
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
else
{
pAvgRes
->
sum
.
isum
+=
(
uint64_t
)
plist
[
i
];
}
}
}
break
;
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
90371894
...
...
@@ -231,8 +231,10 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SIF_ERR_RET
(
sifGetValueFromNode
(
node
,
&
param
->
condValue
));
param
->
colId
=
-
1
;
param
->
colValType
=
(
uint8_t
)(
vn
->
node
.
resType
.
type
);
if
(
strlen
(
vn
->
literal
)
<=
sizeof
(
param
->
colName
))
{
if
(
vn
->
literal
!=
NULL
&&
strlen
(
vn
->
literal
)
<=
sizeof
(
param
->
colName
))
{
memcpy
(
param
->
colName
,
vn
->
literal
,
strlen
(
vn
->
literal
));
}
else
{
param
->
status
=
SFLT_NOT_INDEX
;
}
break
;
}
...
...
source/libs/stream/src/streamUpdate.c
浏览文件 @
90371894
...
...
@@ -44,6 +44,11 @@ static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
}
}
static
void
clearItemHelper
(
void
*
p
)
{
SScalableBf
**
pBf
=
p
;
tScalableBfDestroy
(
*
pBf
);
}
static
void
windowSBfDelete
(
SUpdateInfo
*
pInfo
,
uint64_t
count
)
{
if
(
count
<
pInfo
->
numSBFs
)
{
for
(
uint64_t
i
=
0
;
i
<
count
;
++
i
)
{
...
...
@@ -52,7 +57,7 @@ static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
taosArrayRemove
(
pInfo
->
pTsSBFs
,
0
);
}
}
else
{
taosArrayClear
P
(
pInfo
->
pTsSBFs
,
(
FDelete
)
tScalableBfDestroy
);
taosArrayClear
Ex
(
pInfo
->
pTsSBFs
,
clearItemHelper
);
}
pInfo
->
minTS
+=
pInfo
->
interval
*
count
;
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
90371894
...
...
@@ -1142,12 +1142,21 @@ void syncNodeClose(SSyncNode* pSyncNode) {
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
((
pSyncNode
->
senders
)[
i
]
!=
NULL
)
{
sSTrace
((
pSyncNode
->
senders
)[
i
],
"snapshot sender destroy while close, data:%p"
,
(
pSyncNode
->
senders
)[
i
]);
if
(
snapshotSenderIsStart
((
pSyncNode
->
senders
)[
i
]))
{
snapshotSenderStop
((
pSyncNode
->
senders
)[
i
],
false
);
}
snapshotSenderDestroy
((
pSyncNode
->
senders
)[
i
]);
(
pSyncNode
->
senders
)[
i
]
=
NULL
;
}
}
if
(
pSyncNode
->
pNewNodeReceiver
!=
NULL
)
{
if
(
snapshotReceiverIsStart
(
pSyncNode
->
pNewNodeReceiver
))
{
snapshotReceiverForceStop
(
pSyncNode
->
pNewNodeReceiver
);
}
snapshotReceiverDestroy
(
pSyncNode
->
pNewNodeReceiver
);
pSyncNode
->
pNewNodeReceiver
=
NULL
;
}
...
...
@@ -2458,8 +2467,12 @@ const char* syncStr(ESyncState state) {
return
"candidate"
;
case
TAOS_SYNC_STATE_LEADER
:
return
"leader"
;
default
:
case
TAOS_SYNC_STATE_ERROR
:
return
"error"
;
case
TAOS_SYNC_STATE_OFFLINE
:
return
"offline"
;
default:
return
"unknown"
;
}
}
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
90371894
...
...
@@ -541,6 +541,10 @@ _START_RECEIVER:
taosMsleep
(
10
);
}
if
(
snapshotReceiverIsStart
(
pReceiver
))
{
snapshotReceiverForceStop
(
pReceiver
);
}
snapshotReceiverStart
(
pReceiver
,
pMsg
);
// set start-time same with sender
}
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
90371894
...
...
@@ -169,6 +169,7 @@ typedef struct {
char
spi
:
2
;
char
hasEpSet
:
2
;
// contain epset or not, 0(default): no epset, 1: contain epset
uint64_t
timestamp
;
char
user
[
TSDB_UNI_LEN
];
uint32_t
magicNum
;
STraceId
traceId
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
90371894
...
...
@@ -758,6 +758,14 @@ static void cliSendCb(uv_write_t* req, int status) {
SCliConn
*
pConn
=
transReqQueueRemove
(
req
);
if
(
pConn
==
NULL
)
return
;
SCliMsg
*
pMsg
=
!
transQueueEmpty
(
&
pConn
->
cliMsgs
)
?
transQueueGet
(
&
pConn
->
cliMsgs
,
0
)
:
NULL
;
if
(
pMsg
!=
NULL
)
{
int64_t
cost
=
taosGetTimestampUs
()
-
pMsg
->
st
;
if
(
cost
>
1000
)
{
tWarn
(
"%s conn %p send cost:%dus, send exception"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
(
int
)
cost
);
}
}
if
(
status
==
0
)
{
tTrace
(
"%s conn %p data already was written out"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
}
else
{
...
...
@@ -806,6 +814,7 @@ void cliSend(SCliConn* pConn) {
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
pHead
->
magicNum
=
htonl
(
TRANS_MAGIC_NUM
);
}
pHead
->
timestamp
=
taosHton64
(
taosGetTimestampUs
());
if
(
pHead
->
persist
==
1
)
{
CONN_SET_PERSIST_BY_APP
(
pConn
);
...
...
@@ -1662,6 +1671,7 @@ int transReleaseCliHandle(void* handle) {
SCliMsg
*
cmsg
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliMsg
));
cmsg
->
msg
=
tmsg
;
cmsg
->
st
=
taosGetTimestampUs
();
cmsg
->
type
=
Release
;
cmsg
->
ctx
=
pCtx
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
90371894
...
...
@@ -231,14 +231,29 @@ static bool uvHandleReq(SSvrConn* pConn) {
}
}
STraceId
*
trace
=
&
pHead
->
traceId
;
int64_t
cost
=
taosGetTimestampUs
()
-
taosNtoh64
(
pHead
->
timestamp
);
static
int64_t
EXCEPTION_LIMIT_US
=
100
*
1000
;
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
);
if
(
cost
>=
EXCEPTION_LIMIT_US
)
{
tGWarn
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
(
int
)
cost
);
}
else
{
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
(
int
)
cost
);
}
}
else
{
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
);
if
(
cost
>=
EXCEPTION_LIMIT_US
)
{
tGWarn
(
"%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
}
else
{
tGWarn
(
"%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
}
}
// pHead->noResp = 1,
...
...
source/os/src/osSocket.c
浏览文件 @
90371894
...
...
@@ -1103,3 +1103,30 @@ void taosWinSocketInit() {
#else
#endif
}
uint64_t
taosHton64
(
uint64_t
val
)
{
#if defined(WINDOWS) || defined(DARWIN)
return
((
val
&
0x00000000000000ff
)
<<
7
*
8
)
|
((
val
&
0x000000000000ff00
)
<<
5
*
8
)
|
((
val
&
0x0000000000ff0000
)
<<
3
*
8
)
|
((
val
&
0x00000000ff000000
)
<<
1
*
8
)
|
((
val
&
0x000000ff00000000
)
>>
1
*
8
)
|
((
val
&
0x0000ff0000000000
)
>>
3
*
8
)
|
((
val
&
0x00ff000000000000
)
>>
5
*
8
)
|
((
val
&
0xff00000000000000
)
>>
7
*
8
);
#else
if
(
__BYTE_ORDER
==
__LITTLE_ENDIAN
)
{
return
(((
uint64_t
)
htonl
((
int
)((
val
<<
32
)
>>
32
)))
<<
32
)
|
(
unsigned
int
)
htonl
((
int
)(
val
>>
32
));
}
else
if
(
__BYTE_ORDER
==
__BIG_ENDIAN
)
{
return
val
;
}
#endif
}
uint64_t
taosNtoh64
(
uint64_t
val
)
{
#if defined(WINDOWS) || defined(DARWIN)
return
taosHton64
(
val
);
#else
if
(
__BYTE_ORDER
==
__LITTLE_ENDIAN
)
{
return
(((
uint64_t
)
htonl
((
int
)((
val
<<
32
)
>>
32
)))
<<
32
)
|
(
unsigned
int
)
htonl
((
int
)(
val
>>
32
));
}
else
if
(
__BYTE_ORDER
==
__BIG_ENDIAN
)
{
return
val
;
}
#endif
}
source/util/src/tarray.c
浏览文件 @
90371894
...
...
@@ -399,9 +399,7 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
}
void
taosArraySort
(
SArray
*
pArray
,
__compar_fn_t
compar
)
{
assert
(
pArray
!=
NULL
);
assert
(
compar
!=
NULL
);
ASSERT
(
pArray
!=
NULL
&&
compar
!=
NULL
);
taosSort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
compar
);
}
...
...
@@ -417,11 +415,6 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t
return
item
==
NULL
?
-
1
:
(
int32_t
)((
char
*
)
item
-
(
char
*
)
pArray
->
pData
)
/
pArray
->
elemSize
;
}
void
taosArraySortString
(
SArray
*
pArray
,
__compar_fn_t
comparFn
)
{
assert
(
pArray
!=
NULL
);
taosSort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
comparFn
);
}
static
int32_t
taosArrayPartition
(
SArray
*
pArray
,
int32_t
i
,
int32_t
j
,
__ext_compar_fn_t
fn
,
const
void
*
userData
)
{
void
*
key
=
taosArrayGetP
(
pArray
,
i
);
while
(
i
<
j
)
{
...
...
tests/system-test/0-others/compatibility.py
浏览文件 @
90371894
...
...
@@ -15,155 +15,160 @@ from util.cluster import *
class
TDTestCase
:
def
caseDescription
(
self
):
'''
3.0 data compatibility test
case1: basedata version is 3.0.1.0
'''
return
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
or
"taosd.exe"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
getCfgPath
(
self
):
buildPath
=
self
.
getBuildPath
()
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
else
:
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
return
cfgPath
def
installTaosd
(
self
,
bPath
,
cPath
):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath
=
"/usr/local/src/"
packageName
=
"TDengine-server-3.0.1.0-Linux-x64.tar.gz"
os
.
system
(
f
"cd
{
packagePath
}
&& tar xvf TDengine-server-3.0.1.0-Linux-x64.tar.gz && cd TDengine-server-3.0.1.0 && ./install.sh -e no "
)
tdDnodes
.
stop
(
1
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
1
)
def
buildTaosd
(
self
,
bPath
):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os
.
system
(
f
" cd
{
bPath
}
&& make install "
)
def
run
(
self
):
bPath
=
self
.
getBuildPath
()
cPath
=
self
.
getCfgPath
()
dbname
=
"test"
stb
=
f
"
{
dbname
}
.meters"
self
.
installTaosd
(
bPath
,
cPath
)
os
.
system
(
"echo 'debugFlag 143' > /etc/taos/taos.cfg "
)
tableNumbers
=
100
recordNumbers1
=
100
recordNumbers2
=
1000
tdsqlF
=
tdCom
.
newTdSql
()
print
(
tdsqlF
)
tdsqlF
.
query
(
f
"SELECT SERVER_VERSION();"
)
print
(
tdsqlF
.
query
(
f
"SELECT SERVER_VERSION();"
))
oldServerVersion
=
tdsqlF
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"Base server version is
{
oldServerVersion
}
"
)
tdsqlF
.
query
(
f
"SELECT CLIENT_VERSION();"
)
# the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
oldClientVersion
=
tdsqlF
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"Base client version is
{
oldClientVersion
}
"
)
tdLog
.
printNoPrefix
(
f
"==========step1:prepare and check data in old version-
{
oldServerVersion
}
"
)
tdLog
.
info
(
f
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
os
.
system
(
f
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
sleep
(
3
)
# tdsqlF.query(f"select count(*) from {stb}")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
os
.
system
(
"pkill taosd"
)
sleep
(
2
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
10
)
tdLog
.
info
(
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y "
)
os
.
system
(
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y"
)
os
.
system
(
"pkill -9 taosd"
)
tdLog
.
printNoPrefix
(
"==========step2:update new version "
)
self
.
buildTaosd
(
bPath
)
tdDnodes
.
start
(
1
)
sleep
(
1
)
tdsql
=
tdCom
.
newTdSql
()
print
(
tdsql
)
tdsql
.
query
(
f
"SELECT SERVER_VERSION();"
)
nowServerVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New server version is
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"SELECT CLIENT_VERSION();"
)
nowClientVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New client version is
{
nowClientVersion
}
"
)
tdLog
.
printNoPrefix
(
f
"==========step3:prepare and check data in new version-
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers1
)
os
.
system
(
f
"taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers2
}
-y "
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers2
)
tdsql
=
tdCom
.
newTdSql
()
tdLog
.
printNoPrefix
(
f
"==========step4:verify backticks in taos Sql-TD18542"
)
tdsql
.
execute
(
"drop database if exists db"
)
tdsql
.
execute
(
"create database db"
)
tdsql
.
execute
(
"use db"
)
tdsql
.
execute
(
"create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);"
)
tdsql
.
execute
(
"insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);"
)
tdsql
.
error
(
" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
error
(
" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
execute
(
"insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);"
)
tdsql
.
query
(
"select * from db.ct3"
)
tdsql
.
checkData
(
0
,
1
,
13
)
tdsql
.
execute
(
"insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);"
)
tdsql
.
query
(
"select * from db.ct4"
)
tdsql
.
checkData
(
0
,
1
,
14
)
tdsql
.
query
(
"describe information_schema.ins_databases;"
)
qRows
=
tdsql
.
queryRows
for
i
in
range
(
qRows
)
:
if
tdsql
.
queryResult
[
i
][
0
]
==
"retentions"
:
return
True
else
:
return
False
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
def
caseDescription
(
self
):
'''
3.0 data compatibility test
case1: basedata version is 3.0.1.0
'''
return
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
or
"taosd.exe"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
getCfgPath
(
self
):
buildPath
=
self
.
getBuildPath
()
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
else
:
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
return
cfgPath
def
installTaosd
(
self
,
bPath
,
cPath
):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath
=
"/usr/local/src/"
packageName
=
"TDengine-server-3.0.1.0-Linux-x64.tar.gz"
os
.
system
(
f
"cd
{
packagePath
}
&& tar xvf TDengine-server-3.0.1.0-Linux-x64.tar.gz && cd TDengine-server-3.0.1.0 && ./install.sh -e no "
)
tdDnodes
.
stop
(
1
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
1
)
def
buildTaosd
(
self
,
bPath
):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os
.
system
(
f
" cd
{
bPath
}
&& make install "
)
def
run
(
self
):
print
(
f
"start taosd run"
)
bPath
=
self
.
getBuildPath
()
cPath
=
self
.
getCfgPath
()
dbname
=
"test"
stb
=
f
"
{
dbname
}
.meters"
self
.
installTaosd
(
bPath
,
cPath
)
os
.
system
(
"echo 'debugFlag 143' > /etc/taos/taos.cfg "
)
os
.
system
(
"echo ' supportVnodes 256' > /etc/taos/taos.cfg "
)
tableNumbers
=
100
recordNumbers1
=
100
recordNumbers2
=
1000
#tdsqlF=tdCom.newTdSql()
#print(tdsqlF)
oldServerVersion
=
'3.0.1.0'
#tdsqlF.query(f"SELECT SERVER_VERSION();")
#print(tdsqlF.query(f"SELECT SERVER_VERSION();"))
#oldServerVersion=tdsqlF.queryResult[0][0]
#tdLog.info(f"Base server version is {oldServerVersion}")
#tdsqlF.query(f"SELECT CLIENT_VERSION();")
#
## the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
#oldClientVersion=tdsqlF.queryResult[0][0]
#tdLog.info(f"Base client version is {oldClientVersion}")
tdLog
.
printNoPrefix
(
f
"==========step1:prepare and check data in old version-
{
oldServerVersion
}
"
)
tdLog
.
info
(
f
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
os
.
system
(
f
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
sleep
(
3
)
# tdsqlF.query(f"select count(*) from {stb}")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
os
.
system
(
"pkill taosd"
)
sleep
(
2
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
10
)
tdLog
.
info
(
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y "
)
os
.
system
(
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y"
)
os
.
system
(
"pkill -9 taosd"
)
tdLog
.
printNoPrefix
(
"==========step2:update new version "
)
self
.
buildTaosd
(
bPath
)
tdDnodes
.
start
(
1
)
sleep
(
1
)
tdsql
=
tdCom
.
newTdSql
()
print
(
tdsql
)
tdsql
.
query
(
f
"SELECT SERVER_VERSION();"
)
nowServerVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New server version is
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"SELECT CLIENT_VERSION();"
)
nowClientVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New client version is
{
nowClientVersion
}
"
)
tdLog
.
printNoPrefix
(
f
"==========step3:prepare and check data in new version-
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers1
)
os
.
system
(
f
"taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers2
}
-y "
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers2
)
tdsql
=
tdCom
.
newTdSql
()
tdLog
.
printNoPrefix
(
f
"==========step4:verify backticks in taos Sql-TD18542"
)
tdsql
.
execute
(
"drop database if exists db"
)
tdsql
.
execute
(
"create database db"
)
tdsql
.
execute
(
"use db"
)
tdsql
.
execute
(
"create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);"
)
tdsql
.
execute
(
"insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);"
)
tdsql
.
error
(
" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
error
(
" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
execute
(
"insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);"
)
tdsql
.
query
(
"select * from db.ct3"
)
tdsql
.
checkData
(
0
,
1
,
13
)
tdsql
.
execute
(
"insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);"
)
tdsql
.
query
(
"select * from db.ct4"
)
tdsql
.
checkData
(
0
,
1
,
14
)
tdsql
.
query
(
"describe information_schema.ins_databases;"
)
qRows
=
tdsql
.
queryRows
for
i
in
range
(
qRows
)
:
if
tdsql
.
queryResult
[
i
][
0
]
==
"retentions"
:
return
True
else
:
return
False
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录