Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c054ed88
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c054ed88
编写于
8月 06, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
other: merge 3.0
上级
0d60d54a
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
166 addition
and
134 deletion
+166
-134
include/common/systable.h
include/common/systable.h
+30
-30
include/common/tdatablock.h
include/common/tdatablock.h
+1
-0
include/common/tgrant.h
include/common/tgrant.h
+4
-4
include/common/tmsg.h
include/common/tmsg.h
+7
-5
include/libs/executor/executor.h
include/libs/executor/executor.h
+7
-0
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+4
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+3
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+51
-68
include/libs/sync/sync.h
include/libs/sync/sync.h
+4
-4
include/os/osEnv.h
include/os/osEnv.h
+2
-0
include/os/osFile.h
include/os/osFile.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
tools/shell/src/shellCommand.c
tools/shell/src/shellCommand.c
+5
-1
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+22
-9
tools/shell/src/shellWebsocket.c
tools/shell/src/shellWebsocket.c
+22
-12
未找到文件。
include/common/systable.h
浏览文件 @
c054ed88
...
@@ -23,38 +23,38 @@ extern "C" {
...
@@ -23,38 +23,38 @@ extern "C" {
#define TDENGINE_SYSTABLE_H
#define TDENGINE_SYSTABLE_H
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_INS_TABLE_DNODES "dnodes"
#define TSDB_INS_TABLE_DNODES "
ins_
dnodes"
#define TSDB_INS_TABLE_MNODES "mnodes"
#define TSDB_INS_TABLE_MNODES "
ins_
mnodes"
#define TSDB_INS_TABLE_MODULES "modules"
#define TSDB_INS_TABLE_MODULES "
ins_
modules"
#define TSDB_INS_TABLE_QNODES "qnodes"
#define TSDB_INS_TABLE_QNODES "
ins_
qnodes"
#define TSDB_INS_TABLE_BNODES "bnodes"
#define TSDB_INS_TABLE_BNODES "
ins_
bnodes"
#define TSDB_INS_TABLE_SNODES "snodes"
#define TSDB_INS_TABLE_SNODES "
ins_
snodes"
#define TSDB_INS_TABLE_CLUSTER "cluster"
#define TSDB_INS_TABLE_CLUSTER "
ins_
cluster"
#define TSDB_INS_TABLE_
USER_DATABASES "user
_databases"
#define TSDB_INS_TABLE_
DATABASES "ins
_databases"
#define TSDB_INS_TABLE_
USER_FUNCTIONS "user
_functions"
#define TSDB_INS_TABLE_
FUNCTIONS "ins
_functions"
#define TSDB_INS_TABLE_
USER_INDEXES "user
_indexes"
#define TSDB_INS_TABLE_
INDEXES "ins
_indexes"
#define TSDB_INS_TABLE_
USER_STABLES "user
_stables"
#define TSDB_INS_TABLE_
STABLES "ins
_stables"
#define TSDB_INS_TABLE_
USER_TABLES "user
_tables"
#define TSDB_INS_TABLE_
TABLES "ins
_tables"
#define TSDB_INS_TABLE_
USER_TAGS "user
_tags"
#define TSDB_INS_TABLE_
TAGS "ins
_tags"
#define TSDB_INS_TABLE_
USER_TABLE_DISTRIBUTED "user
_table_distributed"
#define TSDB_INS_TABLE_
TABLE_DISTRIBUTED "ins
_table_distributed"
#define TSDB_INS_TABLE_USER
_USERS "user
_users"
#define TSDB_INS_TABLE_USER
S "ins
_users"
#define TSDB_INS_TABLE_LICENCES "grants"
#define TSDB_INS_TABLE_LICENCES "
ins_
grants"
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_INS_TABLE_VGROUPS "
ins_
vgroups"
#define TSDB_INS_TABLE_VNODES "vnodes"
#define TSDB_INS_TABLE_VNODES "
ins_
vnodes"
#define TSDB_INS_TABLE_CONFIGS "configs"
#define TSDB_INS_TABLE_CONFIGS "
ins_
configs"
#define TSDB_INS_TABLE_DNODE_VARIABLES "dnode_variables"
#define TSDB_INS_TABLE_DNODE_VARIABLES "
ins_
dnode_variables"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "smas"
#define TSDB_PERFS_TABLE_SMAS "
perf_
smas"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_CONNECTIONS "
perf_
connections"
#define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_QUERIES "
perf_
queries"
#define TSDB_PERFS_TABLE_TOPICS "topics"
#define TSDB_PERFS_TABLE_TOPICS "
perf_
topics"
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_CONSUMERS "
perf_
consumers"
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "
perf_
subscriptions"
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_OFFSETS "
perf_
offsets"
#define TSDB_PERFS_TABLE_TRANS "trans"
#define TSDB_PERFS_TABLE_TRANS "
perf_
trans"
#define TSDB_PERFS_TABLE_STREAMS "streams"
#define TSDB_PERFS_TABLE_STREAMS "
perf_
streams"
#define TSDB_PERFS_TABLE_APPS "apps"
#define TSDB_PERFS_TABLE_APPS "
perf_
apps"
typedef
struct
SSysDbTableSchema
{
typedef
struct
SSysDbTableSchema
{
const
char
*
name
;
const
char
*
name
;
...
...
include/common/tdatablock.h
浏览文件 @
c054ed88
...
@@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
...
@@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
int32_t
getJsonValueLen
(
const
char
*
data
);
int32_t
getJsonValueLen
(
const
char
*
data
);
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
);
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
);
int32_t
colDataAppendNItems
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
uint32_t
numOfRows
);
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRow1
,
int32_t
*
capacity
,
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRow1
,
int32_t
*
capacity
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRow2
);
const
SColumnInfoData
*
pSource
,
int32_t
numOfRow2
);
int32_t
colDataAssign
(
SColumnInfoData
*
pColumnInfoData
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRows
,
int32_t
colDataAssign
(
SColumnInfoData
*
pColumnInfoData
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRows
,
...
...
include/common/tgrant.h
浏览文件 @
c054ed88
...
@@ -49,9 +49,9 @@ int32_t grantCheck(EGrantType grant);
...
@@ -49,9 +49,9 @@ int32_t grantCheck(EGrantType grant);
#ifndef GRANTS_CFG
#ifndef GRANTS_CFG
#define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \
#define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire
time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire
_
time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage
(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
\
{.name = "storage
", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
\
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
...
@@ -59,8 +59,8 @@ int32_t grantCheck(EGrantType grant);
...
@@ -59,8 +59,8 @@ int32_t grantCheck(EGrantType grant);
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu
cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu
_
cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed
(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
\
{.name = "speed
", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
\
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
}
}
#define GRANT_CFG_ADD
#define GRANT_CFG_ADD
...
...
include/common/tmsg.h
浏览文件 @
c054ed88
...
@@ -227,8 +227,7 @@ typedef struct SSubmitBlk {
...
@@ -227,8 +227,7 @@ typedef struct SSubmitBlk {
int32_t
sversion
;
// data schema version
int32_t
sversion
;
// data schema version
int32_t
dataLen
;
// data part length, not including the SSubmitBlk head
int32_t
dataLen
;
// data part length, not including the SSubmitBlk head
int32_t
schemaLen
;
// schema length, if length is 0, no schema exists
int32_t
schemaLen
;
// schema length, if length is 0, no schema exists
int16_t
numOfRows
;
// total number of rows in current submit block
int32_t
numOfRows
;
// total number of rows in current submit block
int16_t
padding
;
// TODO just for padding here
char
data
[];
char
data
[];
}
SSubmitBlk
;
}
SSubmitBlk
;
...
@@ -256,7 +255,7 @@ typedef struct {
...
@@ -256,7 +255,7 @@ typedef struct {
int32_t
sversion
;
// data schema version
int32_t
sversion
;
// data schema version
int32_t
dataLen
;
// data part length, not including the SSubmitBlk head
int32_t
dataLen
;
// data part length, not including the SSubmitBlk head
int32_t
schemaLen
;
// schema length, if length is 0, no schema exists
int32_t
schemaLen
;
// schema length, if length is 0, no schema exists
int
16
_t
numOfRows
;
// total number of rows in current submit block
int
32
_t
numOfRows
;
// total number of rows in current submit block
// head of SSubmitBlk
// head of SSubmitBlk
int32_t
numOfBlocks
;
int32_t
numOfBlocks
;
const
void
*
pMsg
;
const
void
*
pMsg
;
...
@@ -337,8 +336,10 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
...
@@ -337,8 +336,10 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
}
}
static
FORCE_INLINE
void
tDeleteSSchemaWrapper
(
SSchemaWrapper
*
pSchemaWrapper
)
{
static
FORCE_INLINE
void
tDeleteSSchemaWrapper
(
SSchemaWrapper
*
pSchemaWrapper
)
{
taosMemoryFree
(
pSchemaWrapper
->
pSchema
);
if
(
pSchemaWrapper
)
{
taosMemoryFree
(
pSchemaWrapper
);
taosMemoryFree
(
pSchemaWrapper
->
pSchema
);
taosMemoryFree
(
pSchemaWrapper
);
}
}
}
static
FORCE_INLINE
int32_t
taosEncodeSSchema
(
void
**
buf
,
const
SSchema
*
pSchema
)
{
static
FORCE_INLINE
int32_t
taosEncodeSSchema
(
void
**
buf
,
const
SSchema
*
pSchema
)
{
...
@@ -2223,6 +2224,7 @@ typedef struct SAppClusterSummary {
...
@@ -2223,6 +2224,7 @@ typedef struct SAppClusterSummary {
uint64_t
insertBytes
;
// submit to tsdb since launched.
uint64_t
insertBytes
;
// submit to tsdb since launched.
uint64_t
fetchBytes
;
uint64_t
fetchBytes
;
uint64_t
numOfQueryReq
;
uint64_t
queryElapsedTime
;
uint64_t
queryElapsedTime
;
uint64_t
numOfSlowQueries
;
uint64_t
numOfSlowQueries
;
uint64_t
totalRequests
;
uint64_t
totalRequests
;
...
...
include/libs/executor/executor.h
浏览文件 @
c054ed88
...
@@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
...
@@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
*/
*/
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
);
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
);
/**
* @brief Cleanup SSDataBlock for StreamScanInfo
*
* @param tinfo
*/
void
tdCleanupStreamInputDataBlock
(
qTaskInfo_t
tinfo
);
/**
/**
* Update the table id list, add or remove.
* Update the table id list, add or remove.
*
*
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
c054ed88
...
@@ -253,7 +253,8 @@ typedef struct SShowCreateTableStmt {
...
@@ -253,7 +253,8 @@ typedef struct SShowCreateTableStmt {
ENodeType
type
;
ENodeType
type
;
char
dbName
[
TSDB_DB_NAME_LEN
];
char
dbName
[
TSDB_DB_NAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
void
*
pCfg
;
// STableCfg
void
*
pDbCfg
;
// SDbCfgInfo
void
*
pTableCfg
;
// STableCfg
}
SShowCreateTableStmt
;
}
SShowCreateTableStmt
;
typedef
struct
SShowTableDistributedStmt
{
typedef
struct
SShowTableDistributedStmt
{
...
@@ -282,6 +283,7 @@ typedef struct SCreateIndexStmt {
...
@@ -282,6 +283,7 @@ typedef struct SCreateIndexStmt {
ENodeType
type
;
ENodeType
type
;
EIndexType
indexType
;
EIndexType
indexType
;
bool
ignoreExists
;
bool
ignoreExists
;
char
indexDbName
[
TSDB_DB_NAME_LEN
];
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
dbName
[
TSDB_DB_NAME_LEN
];
char
dbName
[
TSDB_DB_NAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
...
@@ -292,6 +294,7 @@ typedef struct SCreateIndexStmt {
...
@@ -292,6 +294,7 @@ typedef struct SCreateIndexStmt {
typedef
struct
SDropIndexStmt
{
typedef
struct
SDropIndexStmt
{
ENodeType
type
;
ENodeType
type
;
bool
ignoreNotExists
;
bool
ignoreNotExists
;
char
indexDbName
[
TSDB_DB_NAME_LEN
];
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
indexName
[
TSDB_INDEX_NAME_LEN
];
}
SDropIndexStmt
;
}
SDropIndexStmt
;
...
...
include/libs/nodes/plannodes.h
浏览文件 @
c054ed88
...
@@ -114,6 +114,7 @@ typedef struct SAggLogicNode {
...
@@ -114,6 +114,7 @@ typedef struct SAggLogicNode {
SNodeList
*
pAggFuncs
;
SNodeList
*
pAggFuncs
;
bool
hasLastRow
;
bool
hasLastRow
;
bool
hasTimeLineFunc
;
bool
hasTimeLineFunc
;
bool
onlyHasKeepOrderFunc
;
}
SAggLogicNode
;
}
SAggLogicNode
;
typedef
struct
SProjectLogicNode
{
typedef
struct
SProjectLogicNode
{
...
@@ -555,6 +556,8 @@ typedef struct SQueryPlan {
...
@@ -555,6 +556,8 @@ typedef struct SQueryPlan {
void
nodesWalkPhysiPlan
(
SNode
*
pNode
,
FNodeWalker
walker
,
void
*
pContext
);
void
nodesWalkPhysiPlan
(
SNode
*
pNode
,
FNodeWalker
walker
,
void
*
pContext
);
const
char
*
dataOrderStr
(
EDataOrderLevel
order
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/libs/nodes/querynodes.h
浏览文件 @
c054ed88
...
@@ -275,6 +275,7 @@ typedef struct SSelectStmt {
...
@@ -275,6 +275,7 @@ typedef struct SSelectStmt {
bool
hasInterpFunc
;
bool
hasInterpFunc
;
bool
hasLastRowFunc
;
bool
hasLastRowFunc
;
bool
hasTimeLineFunc
;
bool
hasTimeLineFunc
;
bool
onlyHasKeepOrderFunc
;
bool
groupSort
;
bool
groupSort
;
}
SSelectStmt
;
}
SSelectStmt
;
...
...
include/libs/qcom/query.h
浏览文件 @
c054ed88
...
@@ -71,6 +71,7 @@ typedef struct SIndexMeta {
...
@@ -71,6 +71,7 @@ typedef struct SIndexMeta {
typedef
struct
SExecResult
{
typedef
struct
SExecResult
{
int32_t
code
;
int32_t
code
;
uint64_t
numOfRows
;
uint64_t
numOfRows
;
uint64_t
numOfBytes
;
int32_t
msgType
;
int32_t
msgType
;
void
*
res
;
void
*
res
;
}
SExecResult
;
}
SExecResult
;
...
...
include/libs/stream/tstream.h
浏览文件 @
c054ed88
...
@@ -46,9 +46,10 @@ enum {
...
@@ -46,9 +46,10 @@ enum {
};
};
enum
{
enum
{
TASK_EXEC_STATUS__IDLE
=
1
,
TASK_SCHED_STATUS__INACTIVE
=
1
,
TASK_EXEC_STATUS__EXECUTING
,
TASK_SCHED_STATUS__WAITING
,
TASK_EXEC_STATUS__CLOSING
,
TASK_SCHED_STATUS__ACTIVE
,
TASK_SCHED_STATUS__FAILED
,
};
};
enum
{
enum
{
...
@@ -65,6 +66,25 @@ enum {
...
@@ -65,6 +66,25 @@ enum {
TASK_OUTPUT_STATUS__BLOCKED
,
TASK_OUTPUT_STATUS__BLOCKED
,
};
};
enum
{
TASK_TRIGGER_STATUS__INACTIVE
=
1
,
TASK_TRIGGER_STATUS__ACTIVE
,
};
enum
{
TASK_LEVEL__SOURCE
=
1
,
TASK_LEVEL__AGG
,
TASK_LEVEL__SINK
,
};
enum
{
TASK_OUTPUT__FIXED_DISPATCH
=
1
,
TASK_OUTPUT__SHUFFLE_DISPATCH
,
TASK_OUTPUT__TABLE
,
TASK_OUTPUT__SMA
,
TASK_OUTPUT__FETCH
,
};
typedef
struct
{
typedef
struct
{
int8_t
type
;
int8_t
type
;
}
SStreamQueueItem
;
}
SStreamQueueItem
;
...
@@ -201,41 +221,6 @@ typedef struct {
...
@@ -201,41 +221,6 @@ typedef struct {
int8_t
reserved
;
int8_t
reserved
;
}
STaskSinkFetch
;
}
STaskSinkFetch
;
enum
{
TASK_SOURCE__SCAN
=
1
,
TASK_SOURCE__PIPE
,
TASK_SOURCE__MERGE
,
};
enum
{
TASK_EXEC__NONE
=
1
,
TASK_EXEC__PIPE
,
TASK_EXEC__MERGE
,
};
enum
{
TASK_DISPATCH__NONE
=
1
,
TASK_DISPATCH__FIXED
,
TASK_DISPATCH__SHUFFLE
,
};
enum
{
TASK_SINK__NONE
=
1
,
TASK_SINK__TABLE
,
TASK_SINK__SMA
,
TASK_SINK__FETCH
,
};
enum
{
TASK_INPUT_TYPE__SUMBIT_BLOCK
=
1
,
TASK_INPUT_TYPE__DATA_BLOCK
,
};
enum
{
TASK_TRIGGER_STATUS__IN_ACTIVE
=
1
,
TASK_TRIGGER_STATUS__ACTIVE
,
};
typedef
struct
{
typedef
struct
{
int32_t
nodeId
;
int32_t
nodeId
;
int32_t
childId
;
int32_t
childId
;
...
@@ -248,28 +233,24 @@ typedef struct {
...
@@ -248,28 +233,24 @@ typedef struct {
typedef
struct
SStreamTask
{
typedef
struct
SStreamTask
{
int64_t
streamId
;
int64_t
streamId
;
int32_t
taskId
;
int32_t
taskId
;
int8_t
isDataScan
;
int8_t
taskLevel
;
int8_t
execType
;
int8_t
outputType
;
int8_t
sinkType
;
int8_t
dispatchType
;
int8_t
isStreamDistributed
;
int16_t
dispatchMsgType
;
int16_t
dispatchMsgType
;
int8_t
taskStatus
;
int8_t
taskStatus
;
int8_t
exec
Status
;
int8_t
sched
Status
;
// node info
// node info
int32_t
selfChildId
;
int32_t
selfChildId
;
int32_t
nodeId
;
int32_t
nodeId
;
SEpSet
epSet
;
SEpSet
epSet
;
// used for
semi or single tas
k,
// used for
task source and sin
k,
// while
final task
should have processedVer for each child
// while
task agg
should have processedVer for each child
int64_t
recoverSnapVer
;
int64_t
recoverSnapVer
;
int64_t
startVer
;
int64_t
startVer
;
int64_t
checkpointVer
;
int64_t
checkpointVer
;
int64_t
processedVer
;
int64_t
processedVer
;
// int32_t numOfVgroups;
// children info
// children info
SArray
*
childEpInfo
;
// SArray<SStreamChildEpInfo*>
SArray
*
childEpInfo
;
// SArray<SStreamChildEpInfo*>
...
@@ -277,19 +258,13 @@ typedef struct SStreamTask {
...
@@ -277,19 +258,13 @@ typedef struct SStreamTask {
// exec
// exec
STaskExec
exec
;
STaskExec
exec
;
// TODO: unify sink and dispatch
// output
// local sink
union
{
STaskSinkTb
tbSink
;
STaskSinkSma
smaSink
;
STaskSinkFetch
fetchSink
;
};
// remote dispatcher
union
{
union
{
STaskDispatcherFixedEp
fixedEpDispatcher
;
STaskDispatcherFixedEp
fixedEpDispatcher
;
STaskDispatcherShuffle
shuffleDispatcher
;
STaskDispatcherShuffle
shuffleDispatcher
;
STaskSinkTb
tbSink
;
STaskSinkSma
smaSink
;
STaskSinkFetch
fetchSink
;
};
};
int8_t
inputStatus
;
int8_t
inputStatus
;
...
@@ -303,9 +278,6 @@ typedef struct SStreamTask {
...
@@ -303,9 +278,6 @@ typedef struct SStreamTask {
int64_t
triggerParam
;
int64_t
triggerParam
;
void
*
timer
;
void
*
timer
;
// application storage
// void* ahandle;
// msg handle
// msg handle
SMsgCb
*
pMsgCb
;
SMsgCb
*
pMsgCb
;
}
SStreamTask
;
}
SStreamTask
;
...
@@ -342,7 +314,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
...
@@ -342,7 +314,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
}
if
(
pItem
->
type
!=
STREAM_INPUT__GET_RES
&&
pItem
->
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
if
(
pItem
->
type
!=
STREAM_INPUT__GET_RES
&&
pItem
->
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
atomic_val_compare_exchange_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN
_
ACTIVE
,
TASK_TRIGGER_STATUS__ACTIVE
);
atomic_val_compare_exchange_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__INACTIVE
,
TASK_TRIGGER_STATUS__ACTIVE
);
}
}
#if 0
#if 0
...
@@ -357,18 +329,15 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
...
@@ -357,18 +329,15 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
}
}
static
FORCE_INLINE
int32_t
streamTaskOutput
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
pBlock
)
{
static
FORCE_INLINE
int32_t
streamTaskOutput
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
pBlock
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
pTask
->
outputType
==
TASK_OUTPUT__TABLE
)
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pBlock
->
blocks
);
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pBlock
->
blocks
);
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
pBlock
);
taosFreeQitem
(
pBlock
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
}
else
if
(
pTask
->
outputType
==
TASK_OUTPUT__SMA
)
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
pTask
->
smaSink
.
smaSink
(
pTask
->
smaSink
.
vnode
,
pTask
->
smaSink
.
smaId
,
pBlock
->
blocks
);
pTask
->
smaSink
.
smaSink
(
pTask
->
smaSink
.
vnode
,
pTask
->
smaSink
.
smaId
,
pBlock
->
blocks
);
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosArrayDestroyEx
(
pBlock
->
blocks
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
pBlock
);
taosFreeQitem
(
pBlock
);
}
else
{
}
else
{
ASSERT
(
pTask
->
dispatchType
!=
TASK_DISPATCH__NONE
);
taosWriteQitem
(
pTask
->
outputQueue
->
queue
,
pBlock
);
taosWriteQitem
(
pTask
->
outputQueue
->
queue
,
pBlock
);
}
}
return
0
;
return
0
;
...
@@ -475,11 +444,10 @@ typedef struct {
...
@@ -475,11 +444,10 @@ typedef struct {
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
);
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
);
int32_t
tDecodeStreamRetrieveReq
(
SDecoder
*
pDecoder
,
SStreamRetrieveReq
*
pReq
);
int32_t
tDecodeStreamRetrieveReq
(
SDecoder
*
pDecoder
,
SStreamRetrieveReq
*
pReq
);
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
);
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
);
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
);
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
);
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
);
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
,
bool
exec
);
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SStreamDispatchRsp
*
pRsp
);
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SStreamDispatchRsp
*
pRsp
);
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
);
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
);
...
@@ -487,6 +455,21 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
...
@@ -487,6 +455,21 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRetrieveReq
(
SStreamTask
*
pTask
,
SStreamRetrieveReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRetrieveRsp
(
SStreamTask
*
pTask
,
SStreamRetrieveRsp
*
pRsp
);
int32_t
streamProcessRetrieveRsp
(
SStreamTask
*
pTask
,
SStreamRetrieveRsp
*
pRsp
);
int32_t
streamTryExec
(
SStreamTask
*
pTask
);
int32_t
streamSchedExec
(
SStreamTask
*
pTask
);
typedef
struct
SStreamMeta
SStreamMeta
;
SStreamMeta
*
streamMetaOpen
();
void
streamMetaClose
(
SStreamMeta
*
streamMeta
);
int32_t
streamMetaAddTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
int32_t
streamMetaRemoveTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
int32_t
streamMetaBegin
(
SStreamMeta
*
pMeta
);
int32_t
streamMetaCommit
(
SStreamMeta
*
pMeta
);
int32_t
streamMetaRollBack
(
SStreamMeta
*
pMeta
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/libs/sync/sync.h
浏览文件 @
c054ed88
...
@@ -28,10 +28,10 @@ extern bool gRaftDetailLog;
...
@@ -28,10 +28,10 @@ extern bool gRaftDetailLog;
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_MAX_BATCH_SIZE
500
#define SYNC_MAX_BATCH_SIZE
1
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN
0
#define SYNC_INDEX_INVALID -1
#define SYNC_INDEX_INVALID
-1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
#define SYNC_TERM_INVALID
0xFFFFFFFFFFFFFFFF
typedef
enum
{
typedef
enum
{
SYNC_STRATEGY_NO_SNAPSHOT
=
0
,
SYNC_STRATEGY_NO_SNAPSHOT
=
0
,
...
...
include/os/osEnv.h
浏览文件 @
c054ed88
...
@@ -49,6 +49,8 @@ void osDefaultInit();
...
@@ -49,6 +49,8 @@ void osDefaultInit();
void
osUpdate
();
void
osUpdate
();
void
osCleanup
();
void
osCleanup
();
bool
osLogSpaceAvailable
();
bool
osLogSpaceAvailable
();
bool
osDataSpaceAvailable
();
bool
osTempSpaceAvailable
();
void
osSetTimezone
(
const
char
*
timezone
);
void
osSetTimezone
(
const
char
*
timezone
);
void
osSetSystemLocale
(
const
char
*
inLocale
,
const
char
*
inCharSet
);
void
osSetSystemLocale
(
const
char
*
inLocale
,
const
char
*
inCharSet
);
...
...
include/os/osFile.h
浏览文件 @
c054ed88
...
@@ -54,6 +54,7 @@ typedef struct TdFile *TdFilePtr;
...
@@ -54,6 +54,7 @@ typedef struct TdFile *TdFilePtr;
#define TD_FILE_EXCL 0x0080
#define TD_FILE_EXCL 0x0080
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile
TdFilePtr
taosOpenFile
(
const
char
*
path
,
int32_t
tdFileOptions
);
TdFilePtr
taosOpenFile
(
const
char
*
path
,
int32_t
tdFileOptions
);
TdFilePtr
taosCreateFile
(
const
char
*
path
,
int32_t
tdFileOptions
);
#define TD_FILE_ACCESS_EXIST_OK 0x1
#define TD_FILE_ACCESS_EXIST_OK 0x1
#define TD_FILE_ACCESS_READ_OK 0x2
#define TD_FILE_ACCESS_READ_OK 0x2
...
...
include/util/taoserror.h
浏览文件 @
c054ed88
...
@@ -608,6 +608,7 @@ int32_t* taosGetErrno();
...
@@ -608,6 +608,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
//index
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
...
...
tools/shell/src/shellCommand.c
浏览文件 @
c054ed88
...
@@ -525,7 +525,11 @@ int32_t shellReadCommand(char *command) {
...
@@ -525,7 +525,11 @@ int32_t shellReadCommand(char *command) {
switch
(
c
)
{
switch
(
c
)
{
case
'A'
:
// Up arrow
case
'A'
:
// Up arrow
hist_counter
=
(
hist_counter
+
SHELL_MAX_HISTORY_SIZE
-
1
)
%
SHELL_MAX_HISTORY_SIZE
;
hist_counter
=
(
hist_counter
+
SHELL_MAX_HISTORY_SIZE
-
1
)
%
SHELL_MAX_HISTORY_SIZE
;
shellResetCommand
(
&
cmd
,
(
pHistory
->
hist
[
hist_counter
]
==
NULL
)
?
""
:
pHistory
->
hist
[
hist_counter
]);
if
(
pHistory
->
hist
[
hist_counter
]
==
NULL
)
{
hist_counter
=
(
hist_counter
+
SHELL_MAX_HISTORY_SIZE
+
1
)
%
SHELL_MAX_HISTORY_SIZE
;
}
else
{
shellResetCommand
(
&
cmd
,
pHistory
->
hist
[
hist_counter
]);
}
break
;
break
;
case
'B'
:
// Down arrow
case
'B'
:
// Down arrow
if
(
hist_counter
!=
pHistory
->
hend
)
{
if
(
hist_counter
!=
pHistory
->
hend
)
{
...
...
tools/shell/src/shellEngine.c
浏览文件 @
c054ed88
...
@@ -22,7 +22,8 @@
...
@@ -22,7 +22,8 @@
static
bool
shellIsEmptyCommand
(
const
char
*
cmd
);
static
bool
shellIsEmptyCommand
(
const
char
*
cmd
);
static
int32_t
shellRunSingleCommand
(
char
*
command
);
static
int32_t
shellRunSingleCommand
(
char
*
command
);
static
int32_t
shellRunCommand
(
char
*
command
);
static
void
shellRecordCommandToHistory
(
char
*
command
);
static
int32_t
shellRunCommand
(
char
*
command
,
bool
recordHistory
);
static
void
shellRunSingleCommandImp
(
char
*
command
);
static
void
shellRunSingleCommandImp
(
char
*
command
);
static
char
*
shellFormatTimestamp
(
char
*
buf
,
int64_t
val
,
int32_t
precision
);
static
char
*
shellFormatTimestamp
(
char
*
buf
,
int64_t
val
,
int32_t
precision
);
static
int32_t
shellDumpResultToFile
(
const
char
*
fname
,
TAOS_RES
*
tres
);
static
int32_t
shellDumpResultToFile
(
const
char
*
fname
,
TAOS_RES
*
tres
);
...
@@ -101,11 +102,7 @@ int32_t shellRunSingleCommand(char *command) {
...
@@ -101,11 +102,7 @@ int32_t shellRunSingleCommand(char *command) {
return
0
;
return
0
;
}
}
int32_t
shellRunCommand
(
char
*
command
)
{
void
shellRecordCommandToHistory
(
char
*
command
)
{
if
(
shellIsEmptyCommand
(
command
))
{
return
0
;
}
SShellHistory
*
pHistory
=
&
shell
.
history
;
SShellHistory
*
pHistory
=
&
shell
.
history
;
if
(
pHistory
->
hstart
==
pHistory
->
hend
||
if
(
pHistory
->
hstart
==
pHistory
->
hend
||
pHistory
->
hist
[(
pHistory
->
hend
+
SHELL_MAX_HISTORY_SIZE
-
1
)
%
SHELL_MAX_HISTORY_SIZE
]
==
NULL
||
pHistory
->
hist
[(
pHistory
->
hend
+
SHELL_MAX_HISTORY_SIZE
-
1
)
%
SHELL_MAX_HISTORY_SIZE
]
==
NULL
||
...
@@ -120,6 +117,14 @@ int32_t shellRunCommand(char *command) {
...
@@ -120,6 +117,14 @@ int32_t shellRunCommand(char *command) {
pHistory
->
hstart
=
(
pHistory
->
hstart
+
1
)
%
SHELL_MAX_HISTORY_SIZE
;
pHistory
->
hstart
=
(
pHistory
->
hstart
+
1
)
%
SHELL_MAX_HISTORY_SIZE
;
}
}
}
}
}
int32_t
shellRunCommand
(
char
*
command
,
bool
recordHistory
)
{
if
(
shellIsEmptyCommand
(
command
))
{
return
0
;
}
if
(
recordHistory
)
shellRecordCommandToHistory
(
command
);
char
quote
=
0
,
*
cmd
=
command
;
char
quote
=
0
,
*
cmd
=
command
;
for
(
char
c
=
*
command
++
;
c
!=
0
;
c
=
*
command
++
)
{
for
(
char
c
=
*
command
++
;
c
!=
0
;
c
=
*
command
++
)
{
...
@@ -826,11 +831,15 @@ void shellSourceFile(const char *file) {
...
@@ -826,11 +831,15 @@ void shellSourceFile(const char *file) {
size_t
cmd_len
=
0
;
size_t
cmd_len
=
0
;
char
*
line
=
NULL
;
char
*
line
=
NULL
;
char
fullname
[
PATH_MAX
]
=
{
0
};
char
fullname
[
PATH_MAX
]
=
{
0
};
char
sourceFileCommand
[
PATH_MAX
+
8
]
=
{
0
};
if
(
taosExpandDir
(
file
,
fullname
,
PATH_MAX
)
!=
0
)
{
if
(
taosExpandDir
(
file
,
fullname
,
PATH_MAX
)
!=
0
)
{
tstrncpy
(
fullname
,
file
,
PATH_MAX
);
tstrncpy
(
fullname
,
file
,
PATH_MAX
);
}
}
sprintf
(
sourceFileCommand
,
"source %s;"
,
fullname
);
shellRecordCommandToHistory
(
sourceFileCommand
);
TdFilePtr
pFile
=
taosOpenFile
(
fullname
,
TD_FILE_READ
|
TD_FILE_STREAM
);
TdFilePtr
pFile
=
taosOpenFile
(
fullname
,
TD_FILE_READ
|
TD_FILE_STREAM
);
if
(
pFile
==
NULL
)
{
if
(
pFile
==
NULL
)
{
fprintf
(
stderr
,
"failed to open file %s
\r\n
"
,
fullname
);
fprintf
(
stderr
,
"failed to open file %s
\r\n
"
,
fullname
);
...
@@ -853,9 +862,13 @@ void shellSourceFile(const char *file) {
...
@@ -853,9 +862,13 @@ void shellSourceFile(const char *file) {
continue
;
continue
;
}
}
if
(
line
[
read_len
-
1
]
==
'\r'
)
{
line
[
read_len
-
1
]
=
' '
;
}
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
printf
(
"%s%s
\r\n
"
,
shell
.
info
.
promptHeader
,
cmd
);
printf
(
"%s%s
\r\n
"
,
shell
.
info
.
promptHeader
,
cmd
);
shellRunCommand
(
cmd
);
shellRunCommand
(
cmd
,
false
);
memset
(
cmd
,
0
,
TSDB_MAX_ALLOWED_SQL_LEN
);
memset
(
cmd
,
0
,
TSDB_MAX_ALLOWED_SQL_LEN
);
cmd_len
=
0
;
cmd_len
=
0
;
}
}
...
@@ -977,7 +990,7 @@ void *shellThreadLoop(void *arg) {
...
@@ -977,7 +990,7 @@ void *shellThreadLoop(void *arg) {
}
}
taosResetTerminalMode
();
taosResetTerminalMode
();
}
while
(
shellRunCommand
(
command
)
==
0
);
}
while
(
shellRunCommand
(
command
,
true
)
==
0
);
taosMemoryFreeClear
(
command
);
taosMemoryFreeClear
(
command
);
shellWriteHistory
();
shellWriteHistory
();
...
@@ -1019,7 +1032,7 @@ int32_t shellExecute() {
...
@@ -1019,7 +1032,7 @@ int32_t shellExecute() {
if
(
pArgs
->
commands
!=
NULL
)
{
if
(
pArgs
->
commands
!=
NULL
)
{
printf
(
"%s%s
\r\n
"
,
shell
.
info
.
promptHeader
,
pArgs
->
commands
);
printf
(
"%s%s
\r\n
"
,
shell
.
info
.
promptHeader
,
pArgs
->
commands
);
char
*
cmd
=
strdup
(
pArgs
->
commands
);
char
*
cmd
=
strdup
(
pArgs
->
commands
);
shellRunCommand
(
cmd
);
shellRunCommand
(
cmd
,
true
);
taosMemoryFree
(
cmd
);
taosMemoryFree
(
cmd
);
}
}
...
...
tools/shell/src/shellWebsocket.c
浏览文件 @
c054ed88
...
@@ -33,10 +33,11 @@ int shell_conn_ws_server(bool first) {
...
@@ -33,10 +33,11 @@ int shell_conn_ws_server(bool first) {
return
0
;
return
0
;
}
}
static
int
horizontalPrintWebsocket
(
WS_RES
*
wres
)
{
static
int
horizontalPrintWebsocket
(
WS_RES
*
wres
,
double
*
execute_time
)
{
const
void
*
data
=
NULL
;
const
void
*
data
=
NULL
;
int
rows
;
int
rows
;
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
*
execute_time
+=
(
double
)(
ws_take_timing
(
wres
)
/
1E6
);
if
(
!
rows
)
{
if
(
!
rows
)
{
return
0
;
return
0
;
}
}
...
@@ -72,10 +73,11 @@ static int horizontalPrintWebsocket(WS_RES* wres) {
...
@@ -72,10 +73,11 @@ static int horizontalPrintWebsocket(WS_RES* wres) {
return
numOfRows
;
return
numOfRows
;
}
}
static
int
verticalPrintWebsocket
(
WS_RES
*
wres
)
{
static
int
verticalPrintWebsocket
(
WS_RES
*
wres
,
double
*
pexecute_time
)
{
int
rows
=
0
;
int
rows
=
0
;
const
void
*
data
=
NULL
;
const
void
*
data
=
NULL
;
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
*
pexecute_time
+=
(
double
)(
ws_take_timing
(
wres
)
/
1E6
);
if
(
!
rows
)
{
if
(
!
rows
)
{
return
0
;
return
0
;
}
}
...
@@ -112,7 +114,7 @@ static int verticalPrintWebsocket(WS_RES* wres) {
...
@@ -112,7 +114,7 @@ static int verticalPrintWebsocket(WS_RES* wres) {
return
numOfRows
;
return
numOfRows
;
}
}
static
int
dumpWebsocketToFile
(
const
char
*
fname
,
WS_RES
*
wres
)
{
static
int
dumpWebsocketToFile
(
const
char
*
fname
,
WS_RES
*
wres
,
double
*
pexecute_time
)
{
char
fullname
[
PATH_MAX
]
=
{
0
};
char
fullname
[
PATH_MAX
]
=
{
0
};
if
(
taosExpandDir
(
fname
,
fullname
,
PATH_MAX
)
!=
0
)
{
if
(
taosExpandDir
(
fname
,
fullname
,
PATH_MAX
)
!=
0
)
{
tstrncpy
(
fullname
,
fname
,
PATH_MAX
);
tstrncpy
(
fullname
,
fname
,
PATH_MAX
);
...
@@ -127,6 +129,7 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
...
@@ -127,6 +129,7 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
int
rows
=
0
;
int
rows
=
0
;
const
void
*
data
=
NULL
;
const
void
*
data
=
NULL
;
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
ws_fetch_block
(
wres
,
&
data
,
&
rows
);
*
pexecute_time
+=
(
double
)(
ws_take_timing
(
wres
)
/
1E6
);
if
(
!
rows
)
{
if
(
!
rows
)
{
taosCloseFile
(
&
pFile
);
taosCloseFile
(
&
pFile
);
return
0
;
return
0
;
...
@@ -162,14 +165,14 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
...
@@ -162,14 +165,14 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
return
numOfRows
;
return
numOfRows
;
}
}
static
int
shellDumpWebsocket
(
WS_RES
*
wres
,
char
*
fname
,
int
*
error_no
,
bool
vertical
)
{
static
int
shellDumpWebsocket
(
WS_RES
*
wres
,
char
*
fname
,
int
*
error_no
,
bool
vertical
,
double
*
pexecute_time
)
{
int
numOfRows
=
0
;
int
numOfRows
=
0
;
if
(
fname
!=
NULL
)
{
if
(
fname
!=
NULL
)
{
numOfRows
=
dumpWebsocketToFile
(
fname
,
wres
);
numOfRows
=
dumpWebsocketToFile
(
fname
,
wres
,
pexecute_time
);
}
else
if
(
vertical
)
{
}
else
if
(
vertical
)
{
numOfRows
=
verticalPrintWebsocket
(
wres
);
numOfRows
=
verticalPrintWebsocket
(
wres
,
pexecute_time
);
}
else
{
}
else
{
numOfRows
=
horizontalPrintWebsocket
(
wres
);
numOfRows
=
horizontalPrintWebsocket
(
wres
,
pexecute_time
);
}
}
*
error_no
=
ws_errno
(
wres
);
*
error_no
=
ws_errno
(
wres
);
return
numOfRows
;
return
numOfRows
;
...
@@ -225,6 +228,8 @@ void shellRunSingleCommandWebsocketImp(char *command) {
...
@@ -225,6 +228,8 @@ void shellRunSingleCommandWebsocketImp(char *command) {
return
;
return
;
}
}
double
execute_time
=
ws_take_timing
(
res
)
/
1E6
;
if
(
shellRegexMatch
(
command
,
"^
\\
s*use
\\
s+[a-zA-Z0-9_]+
\\
s*;
\\
s*$"
,
REG_EXTENDED
|
REG_ICASE
))
{
if
(
shellRegexMatch
(
command
,
"^
\\
s*use
\\
s+[a-zA-Z0-9_]+
\\
s*;
\\
s*$"
,
REG_EXTENDED
|
REG_ICASE
))
{
fprintf
(
stdout
,
"Database changed.
\r\n\r\n
"
);
fprintf
(
stdout
,
"Database changed.
\r\n\r\n
"
);
fflush
(
stdout
);
fflush
(
stdout
);
...
@@ -236,22 +241,27 @@ void shellRunSingleCommandWebsocketImp(char *command) {
...
@@ -236,22 +241,27 @@ void shellRunSingleCommandWebsocketImp(char *command) {
if
(
ws_is_update_query
(
res
))
{
if
(
ws_is_update_query
(
res
))
{
numOfRows
=
ws_affected_rows
(
res
);
numOfRows
=
ws_affected_rows
(
res
);
et
=
taosGetTimestampUs
();
et
=
taosGetTimestampUs
();
printf
(
"Query Ok, %d of %d row(s) in database (%.6fs)
\n
"
,
numOfRows
,
numOfRows
,
double
total_time
=
(
et
-
st
)
/
1E3
;
(
et
-
st
)
/
1E6
);
double
net_time
=
total_time
-
(
double
)
execute_time
;
printf
(
"Query Ok, %d of %d row(s) in database
\n
"
,
numOfRows
,
numOfRows
);
printf
(
"Execute: %.2f ms Network: %.2f ms Total: %.2f ms
\n
"
,
execute_time
,
net_time
,
total_time
);
}
else
{
}
else
{
int
error_no
=
0
;
int
error_no
=
0
;
numOfRows
=
shellDumpWebsocket
(
res
,
fname
,
&
error_no
,
printMode
);
numOfRows
=
shellDumpWebsocket
(
res
,
fname
,
&
error_no
,
printMode
,
&
execute_time
);
if
(
numOfRows
<
0
)
{
if
(
numOfRows
<
0
)
{
ws_free_result
(
res
);
ws_free_result
(
res
);
return
;
return
;
}
}
et
=
taosGetTimestampUs
();
et
=
taosGetTimestampUs
();
double
total_time
=
(
et
-
st
)
/
1E3
;
double
net_time
=
total_time
-
execute_time
;
if
(
error_no
==
0
&&
!
shell
.
stop_query
)
{
if
(
error_no
==
0
&&
!
shell
.
stop_query
)
{
printf
(
"Query OK, %d row(s) in set
(%.6fs)
\n
"
,
numOfRows
,
printf
(
"Query OK, %d row(s) in set
\n
"
,
numOfRows
);
(
et
-
st
)
/
1E6
);
printf
(
"Execute: %.2f ms Network: %.2f ms Total: %.2f ms
\n
"
,
execute_time
,
net_time
,
total_time
);
}
else
{
}
else
{
printf
(
"Query interrupted, %d row(s) in set (%.6fs)
\n
"
,
numOfRows
,
printf
(
"Query interrupted, %d row(s) in set (%.6fs)
\n
"
,
numOfRows
,
(
et
-
st
)
/
1E6
);
(
et
-
st
)
/
1E6
);
printf
(
"Execute: %.2f ms Network: %.2f ms Total: %.2f ms
\n
"
,
execute_time
,
net_time
,
total_time
);
}
}
}
}
printf
(
"
\n
"
);
printf
(
"
\n
"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录