Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
daqiang163
TDengine
提交
d1d02a2f
T
TDengine
项目概览
daqiang163
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d1d02a2f
编写于
12月 28, 2020
作者:
R
root
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/TD-2502-v3
上级
9256dacc
ac319773
变更
38
隐藏空白更改
内联
并排
Showing
38 changed file
with
306 addition
and
453 deletion
+306
-453
src/client/inc/tscLocalMerge.h
src/client/inc/tscLocalMerge.h
+0
-7
src/client/inc/tscLog.h
src/client/inc/tscLog.h
+2
-2
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+11
-9
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+0
-1
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+6
-27
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+1
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+8
-2
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+11
-19
src/client/src/tscServer.c
src/client/src/tscServer.c
+28
-63
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+9
-9
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+23
-34
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+30
-29
src/common/inc/tulog.h
src/common/inc/tulog.h
+1
-1
src/common/src/tglobal.c
src/common/src/tglobal.c
+66
-51
src/dnode/inc/dnodeCfg.h
src/dnode/inc/dnodeCfg.h
+1
-0
src/dnode/src/dnodeCfg.c
src/dnode/src/dnodeCfg.c
+6
-0
src/dnode/src/dnodeMRead.c
src/dnode/src/dnodeMRead.c
+0
-2
src/dnode/src/dnodeMWrite.c
src/dnode/src/dnodeMWrite.c
+0
-1
src/dnode/src/dnodeVnodes.c
src/dnode/src/dnodeVnodes.c
+7
-3
src/inc/dnode.h
src/inc/dnode.h
+3
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+6
-1
src/mnode/inc/mnodeDnode.h
src/mnode/inc/mnodeDnode.h
+3
-0
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+22
-4
src/mnode/src/mnodeShow.c
src/mnode/src/mnodeShow.c
+2
-0
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+0
-4
src/query/inc/queryLog.h
src/query/inc/queryLog.h
+2
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+15
-11
src/query/src/qUtil.c
src/query/src/qUtil.c
+0
-154
src/rpc/inc/rpcLog.h
src/rpc/inc/rpcLog.h
+1
-1
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+1
-1
src/util/inc/hash.h
src/util/inc/hash.h
+7
-6
src/util/inc/tconfig.h
src/util/inc/tconfig.h
+1
-0
src/util/src/hash.c
src/util/src/hash.c
+3
-3
src/util/src/tcache.c
src/util/src/tcache.c
+1
-1
src/util/src/tconfig.c
src/util/src/tconfig.c
+26
-0
src/util/src/tlog.c
src/util/src/tlog.c
+1
-1
src/util/src/ttimer.c
src/util/src/ttimer.c
+1
-1
src/vnode/src/vnodeMgmt.c
src/vnode/src/vnodeMgmt.c
+1
-1
未找到文件。
src/client/inc/tscLocalMerge.h
浏览文件 @
d1d02a2f
...
...
@@ -38,12 +38,6 @@ typedef struct SLocalDataSource {
tFilePage
filePage
;
}
SLocalDataSource
;
enum
{
TSC_LOCALREDUCE_READY
=
0x0
,
TSC_LOCALREDUCE_IN_PROGRESS
=
0x1
,
TSC_LOCALREDUCE_TOBE_FREED
=
0x2
,
};
typedef
struct
SLocalReducer
{
SLocalDataSource
**
pLocalDataSrc
;
int32_t
numOfBuffer
;
...
...
@@ -56,7 +50,6 @@ typedef struct SLocalReducer {
tFilePage
*
pTempBuffer
;
struct
SQLFunctionCtx
*
pCtx
;
int32_t
rowSize
;
// size of each intermediate result.
int32_t
status
;
// denote it is in reduce process, in reduce process, it
bool
hasPrevRow
;
// cannot be released
bool
hasUnprocessedRow
;
tOrderDescriptor
*
pDesc
;
...
...
src/client/inc/tscLog.h
浏览文件 @
d1d02a2f
...
...
@@ -22,8 +22,8 @@ extern "C" {
#include "tlog.h"
extern
u
int32_t
cDebugFlag
;
extern
uint32_t
tscEmbedded
;
extern
int32_t
cDebugFlag
;
extern
int8_t
tscEmbedded
;
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
...
...
src/client/inc/tsclient.h
浏览文件 @
d1d02a2f
...
...
@@ -69,9 +69,10 @@ typedef struct STableMeta {
int16_t
sversion
;
int16_t
tversion
;
char
sTableId
[
TSDB_TABLE_FNAME_LEN
];
SVgroupInfo
vgroupInfo
;
int32_t
vgId
;
SCorVgroupInfo
corVgroupInfo
;
STableId
id
;
// union {int64_t stableUid; SSchema* schema;};
SSchema
schema
[];
// if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
}
STableMeta
;
...
...
@@ -307,6 +308,7 @@ typedef struct STscObj {
SRpcCorEpSet
*
tscCorMgmtEpSet
;
void
*
pDnodeConn
;
pthread_mutex_t
mutex
;
int32_t
numOfObj
;
// number of sqlObj from this tscObj
}
STscObj
;
typedef
struct
SSubqueryState
{
...
...
@@ -477,14 +479,14 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
}
}
extern
SCacheObj
*
tscMetaCache
;
extern
int
tscObjRef
;
extern
void
*
tscTmr
;
extern
void
*
tscQhandle
;
extern
int
tscKeepConn
[]
;
extern
int
tscNumOfThreads
;
extern
int
tscRefId
;
extern
SCacheObj
*
tscMetaCache
;
extern
int
tscObjRef
;
extern
void
*
tscTmr
;
extern
void
*
tscQhandle
;
extern
int
tscKeepConn
[]
;
extern
int
tscRefId
;
extern
int
tscNumOfObj
;
// number of existed sqlObj in current process.
extern
int
(
*
tscBuildMsg
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
d1d02a2f
...
...
@@ -2625,7 +2625,6 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
pInfo
->
pHisto
=
tHistogramCreateFrom
(
tmp
,
MAX_HISTOGRAM_BIN
);
printf
(
"%p, %p
\n
"
,
pInfo
->
pHisto
,
pInfo
->
pHisto
->
elems
);
return
true
;
}
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
d1d02a2f
...
...
@@ -93,7 +93,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
// for top/bottom function, the output of timestamp is the first column
int32_t
functionId
=
pExpr
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
)
{
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pCtx
->
ptsOutputBuf
=
pReducer
->
pCtx
[
0
].
aOutputBuf
;
pCtx
->
param
[
2
].
i64Key
=
pQueryInfo
->
order
.
order
;
pCtx
->
param
[
2
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
...
...
@@ -493,13 +493,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
// there is no more result, so we release all allocated resource
SLocalReducer
*
pLocalReducer
=
(
SLocalReducer
*
)
atomic_exchange_ptr
(
&
pRes
->
pLocalReducer
,
NULL
);
if
(
pLocalReducer
!=
NULL
)
{
int32_t
status
=
0
;
while
((
status
=
atomic_val_compare_exchange_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
TSC_LOCALREDUCE_TOBE_FREED
))
==
TSC_LOCALREDUCE_IN_PROGRESS
)
{
taosMsleep
(
100
);
tscDebug
(
"%p waiting for delete procedure, status: %d"
,
pSql
,
status
);
}
pLocalReducer
->
pFillInfo
=
taosDestroyFillInfo
(
pLocalReducer
->
pFillInfo
);
if
(
pLocalReducer
->
pCtx
!=
NULL
)
{
...
...
@@ -1303,6 +1296,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// re
for
(
int32_t
i
=
0
;
i
<
t
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
pLocalReducer
->
pCtx
[
i
].
aOutputBuf
=
pLocalReducer
->
pResultBuf
->
data
+
pExpr
->
offset
*
pLocalReducer
->
resColModel
->
capacity
;
if
(
pExpr
->
functionId
==
TSDB_FUNC_TOP
||
pExpr
->
functionId
==
TSDB_FUNC_BOTTOM
||
pExpr
->
functionId
==
TSDB_FUNC_DIFF
)
{
pLocalReducer
->
pCtx
[
i
].
ptsOutputBuf
=
pLocalReducer
->
pCtx
[
0
].
aOutputBuf
;
}
}
memset
(
pLocalReducer
->
pResultBuf
,
0
,
pLocalReducer
->
nResultBufSize
+
sizeof
(
tFilePage
));
...
...
@@ -1437,24 +1434,13 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
// set the data merge in progress
int32_t
prevStatus
=
atomic_val_compare_exchange_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
TSC_LOCALREDUCE_IN_PROGRESS
);
if
(
prevStatus
!=
TSC_LOCALREDUCE_READY
)
{
assert
(
prevStatus
==
TSC_LOCALREDUCE_TOBE_FREED
);
// it is in tscDestroyLocalReducer function already
return
TSDB_CODE_SUCCESS
;
}
tFilePage
*
tmpBuffer
=
pLocalReducer
->
pTempBuffer
;
tFilePage
*
tmpBuffer
=
pLocalReducer
->
pTempBuffer
;
if
(
doHandleLastRemainData
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
if
(
doBuildFilledResultForGroup
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1510,7 +1496,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
pLocalReducer
->
discardData
->
num
=
0
;
if
(
saveGroupResultInfo
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1556,7 +1541,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
// here we do not check the return value
adjustLoserTreeFromNewData
(
pLocalReducer
,
pOneDataSrc
,
pTree
);
assert
(
pLocalReducer
->
status
==
TSC_LOCALREDUCE_IN_PROGRESS
);
if
(
pRes
->
numOfRows
==
0
)
{
handleUnprocessedRow
(
pCmd
,
pLocalReducer
,
tmpBuffer
);
...
...
@@ -1567,7 +1551,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* If previous group is not skipped, keep it in pRes->numOfGroups
*/
if
(
notSkipped
&&
saveGroupResultInfo
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1587,7 +1570,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
if
(
pRes
->
numOfRows
==
0
)
{
continue
;
}
else
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
}
else
{
// result buffer is not full
...
...
@@ -1612,9 +1594,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
genFinalResults
(
pSql
,
pLocalReducer
,
true
);
}
assert
(
pLocalReducer
->
status
==
TSC_LOCALREDUCE_IN_PROGRESS
&&
pRes
->
row
==
0
);
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
d1d02a2f
...
...
@@ -731,7 +731,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI
return
code
;
}
dataBuf
->
vgId
=
pTableMeta
->
vg
roupInfo
.
vg
Id
;
dataBuf
->
vgId
=
pTableMeta
->
vgId
;
dataBuf
->
numOfTables
=
1
;
*
totalNum
+=
numOfRows
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
d1d02a2f
...
...
@@ -666,6 +666,7 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
const
char
*
msg1
=
"invalid query expression"
;
const
char
*
msg2
=
"interval cannot be less than 10 ms"
;
const
char
*
msg3
=
"sliding cannot be used without interval"
;
const
char
*
msg4
=
"top/bottom query does not support order by value in interval query"
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -712,6 +713,11 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
return
TSDB_CODE_TSC_INVALID_SQL
;
}
int32_t
colId
=
pQueryInfo
->
order
.
orderColId
;
if
(
pQueryInfo
->
interval
.
interval
>
0
&&
colId
!=
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg4
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4646,7 +4652,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
if
(
!
(
orderByTags
||
orderByTS
)
&&
!
isTopBottomQuery
(
pQueryInfo
))
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
else
{
}
else
{
// order by top/bottom result value column is not supported in case of interval query.
assert
(
!
(
orderByTags
&&
orderByTS
));
}
...
...
@@ -4936,7 +4942,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vg
roupInfo
.
vg
Id
);
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pUpdateMsg
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
pUpdateMsg
->
uid
=
htobe64
(
pTableMeta
->
id
.
uid
);
pUpdateMsg
->
colId
=
htons
(
pTagsSchema
->
colId
);
...
...
src/client/src/tscSchemaUtil.c
浏览文件 @
d1d02a2f
...
...
@@ -130,13 +130,14 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
return
NULL
;
}
static
void
tscInitCorVgroupInfo
(
SCorVgroupInfo
*
corVgroupInfo
,
SVgroup
Info
*
vgroupInfo
)
{
static
void
tscInitCorVgroupInfo
(
SCorVgroupInfo
*
corVgroupInfo
,
SVgroup
Msg
*
pVgroupMsg
)
{
corVgroupInfo
->
version
=
0
;
corVgroupInfo
->
inUse
=
0
;
corVgroupInfo
->
numOfEps
=
vgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
corVgroupInfo
->
numOfEps
;
i
++
)
{
corVgroupInfo
->
epAddr
[
i
].
fqdn
=
strdup
(
vgroupInfo
->
epAddr
[
i
].
fqdn
);
corVgroupInfo
->
epAddr
[
i
].
port
=
vgroupInfo
->
epAddr
[
i
].
port
;
corVgroupInfo
->
inUse
=
0
;
corVgroupInfo
->
numOfEps
=
pVgroupMsg
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
pVgroupMsg
->
numOfEps
;
i
++
)
{
corVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pVgroupMsg
->
epAddr
[
i
].
fqdn
,
tListLen
(
pVgroupMsg
->
epAddr
[
0
].
fqdn
));
corVgroupInfo
->
epAddr
[
i
].
port
=
pVgroupMsg
->
epAddr
[
i
].
port
;
}
}
...
...
@@ -145,8 +146,10 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
int32_t
schemaSize
=
(
pTableMetaMsg
->
numOfColumns
+
pTableMetaMsg
->
numOfTags
)
*
sizeof
(
SSchema
);
STableMeta
*
pTableMeta
=
calloc
(
1
,
sizeof
(
STableMeta
)
+
schemaSize
);
pTableMeta
->
tableType
=
pTableMetaMsg
->
tableType
;
pTableMeta
->
vgId
=
pTableMetaMsg
->
vgroup
.
vgId
;
pTableMeta
->
tableInfo
=
(
STableComInfo
)
{
.
numOfTags
=
pTableMetaMsg
->
numOfTags
,
.
precision
=
pTableMetaMsg
->
precision
,
...
...
@@ -156,18 +159,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta
->
id
.
tid
=
pTableMetaMsg
->
tid
;
pTableMeta
->
id
.
uid
=
pTableMetaMsg
->
uid
;
SVgroupInfo
*
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
pVgroupInfo
->
numOfEps
=
pTableMetaMsg
->
vgroup
.
numOfEps
;
pVgroupInfo
->
vgId
=
pTableMetaMsg
->
vgroup
.
vgId
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
++
i
)
{
SEpAddrMsg
*
pEpMsg
=
&
pTableMetaMsg
->
vgroup
.
epAddr
[
i
];
pVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pEpMsg
->
fqdn
,
tListLen
(
pEpMsg
->
fqdn
));
pVgroupInfo
->
epAddr
[
i
].
port
=
pEpMsg
->
port
;
}
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
pVgroupInfo
);
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
&
pTableMetaMsg
->
vgroup
);
pTableMeta
->
sversion
=
pTableMetaMsg
->
sversion
;
pTableMeta
->
tversion
=
pTableMetaMsg
->
tversion
;
...
...
src/client/src/tscServer.c
浏览文件 @
d1d02a2f
...
...
@@ -45,32 +45,30 @@ static int32_t getWaitingTimeInterval(int32_t count) {
return
0
;
}
return
initial
*
(
2
<<
(
count
-
2
));
return
initial
*
(
(
2u
)
<<
(
count
-
2
));
}
static
void
tscSetDnodeEpSet
(
SSqlObj
*
pSql
,
SVgroupInfo
*
pVgroupInfo
)
{
assert
(
pSql
!=
NULL
&&
pVgroupInfo
!=
NULL
&&
pVgroupInfo
->
numOfEps
>
0
);
SRpcEpSet
*
pEpSet
=
&
pSql
->
epSet
;
static
void
tscSetDnodeEpSet
(
SRpcEpSet
*
pEpSet
,
SVgroupInfo
*
pVgroupInfo
)
{
assert
(
pEpSet
!=
NULL
&&
pVgroupInfo
!=
NULL
&&
pVgroupInfo
->
numOfEps
>
0
);
// Issue the query to one of the vnode among a vgroup randomly.
// change the inUse property would not affect the isUse attribute of STableMeta
pEpSet
->
inUse
=
rand
()
%
pVgroupInfo
->
numOfEps
;
// apply the FQDN string length check here
bool
hasFqdn
=
false
;
bool
existed
=
false
;
pEpSet
->
numOfEps
=
pVgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
++
i
)
{
tstrncpy
(
pEpSet
->
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
pEpSet
->
fqdn
[
i
]));
pEpSet
->
port
[
i
]
=
pVgroupInfo
->
epAddr
[
i
].
port
;
if
(
!
hasFqdn
)
{
hasFqdn
=
(
strlen
(
pEpSet
->
fqdn
[
i
])
>
0
);
int32_t
len
=
(
int32_t
)
strnlen
(
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
if
(
len
>
0
)
{
tstrncpy
(
pEpSet
->
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
pEpSet
->
fqdn
[
i
]));
existed
=
true
;
}
}
assert
(
hasFqdn
);
assert
(
existed
);
}
static
void
tscDumpMgmtEpSet
(
SSqlObj
*
pSql
)
{
...
...
@@ -102,7 +100,8 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
pCorEpSet
->
epSet
=
*
pEpSet
;
taosCorEndWrite
(
&
pCorEpSet
->
version
);
}
static
void
tscDumpEpSetFromVgroupInfo
(
SCorVgroupInfo
*
pVgroupInfo
,
SRpcEpSet
*
pEpSet
)
{
static
void
tscDumpEpSetFromVgroupInfo
(
SRpcEpSet
*
pEpSet
,
SCorVgroupInfo
*
pVgroupInfo
)
{
if
(
pVgroupInfo
==
NULL
)
{
return
;}
taosCorBeginRead
(
&
pVgroupInfo
->
version
);
int8_t
inUse
=
pVgroupInfo
->
inUse
;
...
...
@@ -515,8 +514,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
else
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vg
roupInfo
.
vg
Id
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d"
,
pSql
,
pTableMeta
->
vg
roupInfo
.
vg
Id
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d"
,
pSql
,
pTableMeta
->
vgId
);
}
pSql
->
cmd
.
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
...
...
@@ -535,7 +534,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// NOTE: shell message size should not include SMsgDesc
int32_t
size
=
pSql
->
cmd
.
payloadLen
-
sizeof
(
SMsgDesc
);
int32_t
vgId
=
pTableMeta
->
vgroupInfo
.
vgId
;
SMsgDesc
*
pMsgDesc
=
(
SMsgDesc
*
)
pMsg
;
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
// always one vnode
...
...
@@ -543,7 +541,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
sizeof
(
SMsgDesc
);
SSubmitMsg
*
pShellMsg
=
(
SSubmitMsg
*
)
pMsg
;
pShellMsg
->
header
.
vgId
=
htonl
(
vgId
);
pShellMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pShellMsg
->
header
.
contLen
=
htonl
(
size
);
// the length not includes the size of SMsgDesc
pShellMsg
->
length
=
pShellMsg
->
header
.
contLen
;
...
...
@@ -551,9 +549,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_SUBMIT
;
tscDumpEpSetFromVgroupInfo
(
&
p
TableMeta
->
corVgroupInfo
,
&
pSql
->
epSet
);
tscDumpEpSetFromVgroupInfo
(
&
p
Sql
->
epSet
,
&
pTableMeta
->
corVgroupInfo
);
tscDebug
(
"%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
,
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
tscDebug
(
"%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
,
pTableMeta
->
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
pSql
->
epSet
.
numOfEps
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -597,24 +595,28 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
UTIL_TABLE_IS_NORMAL_TABLE
(
pTableMetaInfo
)
||
pTableMetaInfo
->
pVgroupTables
==
NULL
)
{
SVgroupInfo
*
pVgroupInfo
=
NULL
;
int32_t
vgId
=
-
1
;
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
int32_t
index
=
pTableMetaInfo
->
vgroupIndex
;
assert
(
index
>=
0
);
SVgroupInfo
*
pVgroupInfo
=
NULL
;
if
(
pTableMetaInfo
->
vgroupList
->
numOfVgroups
>
0
)
{
assert
(
index
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
}
vgId
=
pVgroupInfo
->
vgId
;
tscSetDnodeEpSet
(
&
pSql
->
epSet
,
pVgroupInfo
);
tscDebug
(
"%p query on stable, vgIndex:%d, numOfVgroups:%d"
,
pSql
,
index
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
}
else
{
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
vgId
=
pTableMeta
->
vgId
;
tscDumpEpSetFromVgroupInfo
(
&
pSql
->
epSet
,
&
pTableMeta
->
corVgroupInfo
);
}
assert
(
pVgroupInfo
!=
NULL
)
;
pSql
->
epSet
.
inUse
=
rand
()
%
pSql
->
epSet
.
numOfEps
;
tscSetDnodeEpSet
(
pSql
,
pVgroupInfo
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pVgroupInfo
->
vgId
);
pQueryMsg
->
head
.
vgId
=
htonl
(
vgId
);
STableIdInfo
*
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
pTableIdInfo
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
...
...
@@ -633,7 +635,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
SVgroupTableInfo
*
pTableIdList
=
taosArrayGet
(
pTableMetaInfo
->
pVgroupTables
,
index
);
// set the vgroup info
tscSetDnodeEpSet
(
pSql
,
&
pTableIdList
->
vgInfo
);
tscSetDnodeEpSet
(
&
pSql
->
epSet
,
&
pTableIdList
->
vgInfo
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pTableIdList
->
vgInfo
.
vgId
);
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pTableIdList
->
itemList
);
...
...
@@ -1448,48 +1450,11 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
tscDumpEpSetFromVgroupInfo
(
&
p
TableMetaInfo
->
pTableMeta
->
corVgroupInfo
,
&
pSql
->
epSet
);
tscDumpEpSetFromVgroupInfo
(
&
p
Sql
->
epSet
,
&
pTableMetaInfo
->
pTableMeta
->
corVgroupInfo
);
return
TSDB_CODE_SUCCESS
;
}
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
// pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
//
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// int32_t vgIndex = pTableMetaInfo->vgroupIndex;
// if (pTableMetaInfo->pVgroupTables == NULL) {
// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
//
// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
// } else {
// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
// assert(vgIndex >= 0 && vgIndex < numOfVgroups);
//
// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
//
// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
// }
// } else {
// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
// }
//
// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
//
// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
// return TSDB_CODE_SUCCESS;
//}
int
tscAlterDbMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SAlterDbMsg
);
...
...
src/client/src/tscSystem.c
浏览文件 @
d1d02a2f
...
...
@@ -31,17 +31,16 @@
#include "tlocale.h"
// global, not configurable
SCacheObj
*
tscMetaCache
;
SCacheObj
*
tscMetaCache
;
// table meta cache
SHashObj
*
tscHashMap
;
// hash map to keep the global vgroup info
int
tscObjRef
=
-
1
;
void
*
tscTmr
;
void
*
tscQhandle
;
void
*
tscCheckDiskUsageTmr
;
void
*
tscTmr
;
void
*
tscQhandle
;
void
*
tscCheckDiskUsageTmr
;
int
tscRefId
=
-
1
;
int
tscNumOfThreads
;
int
tscNumOfObj
=
0
;
// number of sqlObj in current process.
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void
tscCheckDiskUsage
(
void
*
UNUSED_PARAM
(
para
),
void
*
UNUSED_PARAM
(
param
))
{
taosGetDisk
();
...
...
@@ -114,7 +113,7 @@ void taos_init_imp(void) {
int
queueSize
=
tsMaxConnections
*
2
;
double
factor
=
(
tscEmbedded
==
0
)
?
2
.
0
:
4
.
0
;
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
int32_t
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
if
(
tscNumOfThreads
<
2
)
{
tscNumOfThreads
=
2
;
}
...
...
@@ -133,7 +132,8 @@ void taos_init_imp(void) {
int64_t
refreshTime
=
10
;
// 10 seconds by default
if
(
tscMetaCache
==
NULL
)
{
tscMetaCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
false
,
tscFreeTableMetaHelper
,
"tableMeta"
);
tscObjRef
=
taosOpenRef
(
40960
,
tscFreeRegisteredSqlObj
);
tscObjRef
=
taosOpenRef
(
40960
,
tscFreeRegisteredSqlObj
);
tscHashMap
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
}
tscRefId
=
taosOpenRef
(
200
,
tscCloseTscObj
);
...
...
src/client/src/tscUtil.c
浏览文件 @
d1d02a2f
...
...
@@ -458,21 +458,19 @@ void tscFreeRegisteredSqlObj(void *pSql) {
SSqlObj
*
p
=
*
(
SSqlObj
**
)
pSql
;
STscObj
*
pTscObj
=
p
->
pTscObj
;
assert
(
p
->
self
!=
0
);
assert
(
RID_VALID
(
p
->
self
));
tscFreeSqlObj
(
p
);
taosReleaseRef
(
tscRefId
,
pTscObj
->
rid
);
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfObj
,
1
);
int32_t
total
=
atomic_sub_fetch_32
(
&
tscNumOfObj
,
1
);
tscDebug
(
"%p free SqlObj, total in tscObj:%d, total:%d"
,
pSql
,
num
,
total
);
}
void
tscFreeTableMetaHelper
(
void
*
pTableMeta
)
{
STableMeta
*
p
=
(
STableMeta
*
)
pTableMeta
;
int32_t
numOfEps
=
p
->
vgroupInfo
.
numOfEps
;
assert
(
numOfEps
>=
0
&&
numOfEps
<=
TSDB_MAX_REPLICA
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
++
i
)
{
tfree
(
p
->
vgroupInfo
.
epAddr
[
i
].
fqdn
);
}
int32_t
numOfEps1
=
p
->
corVgroupInfo
.
numOfEps
;
assert
(
numOfEps1
>=
0
&&
numOfEps1
<=
TSDB_MAX_REPLICA
);
...
...
@@ -1912,6 +1910,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
void
registerSqlObj
(
SSqlObj
*
pSql
)
{
taosAcquireRef
(
tscRefId
,
pSql
->
pTscObj
->
rid
);
pSql
->
self
=
taosAddRef
(
tscObjRef
,
pSql
);
int32_t
num
=
atomic_add_fetch_32
(
&
pSql
->
pTscObj
->
numOfObj
,
1
);
int32_t
total
=
atomic_add_fetch_32
(
&
tscNumOfObj
,
1
);
tscDebug
(
"%p new SqlObj from %p, total in tscObj:%d, total:%d"
,
pSql
,
pSql
->
pTscObj
,
num
,
total
);
}
SSqlObj
*
createSimpleSubObj
(
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
int32_t
cmd
)
{
...
...
@@ -1941,30 +1943,24 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
return
NULL
;
}
pNew
->
fp
=
fp
;
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
param
=
param
;
pNew
->
sqlstr
=
NULL
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
pNew
->
sqlstr
=
strdup
(
pSql
->
sqlstr
);
if
(
pNew
->
sqlstr
==
NULL
)
{
tscError
(
"%p new subquery failed"
,
pSql
);
tscFreeSqlObj
(
pNew
);
return
NULL
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetailSafely
(
pCmd
,
0
);
assert
(
pSql
->
cmd
.
clauseIndex
==
0
);
STableMetaInfo
*
pMasterTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
,
0
);
tscAddTableMetaInfo
(
pQueryInfo
,
pMasterTableMetaInfo
->
name
,
NULL
,
NULL
,
NULL
,
NULL
);
registerSqlObj
(
pNew
);
return
pNew
;
}
static
void
doSetSqlExprAndResultFieldInfo
(
SQueryInfo
*
p
QueryInfo
,
SQueryInfo
*
p
NewQueryInfo
,
int64_t
uid
)
{
static
void
doSetSqlExprAndResultFieldInfo
(
SQueryInfo
*
pNewQueryInfo
,
int64_t
uid
)
{
int32_t
numOfOutput
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pNewQueryInfo
);
if
(
numOfOutput
==
0
)
{
return
;
...
...
@@ -2017,15 +2013,9 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
tableIndex
);
pNew
->
pTscObj
=
pSql
->
pTscObj
;
pNew
->
pTscObj
=
pSql
->
pTscObj
;
pNew
->
signature
=
pNew
;
pNew
->
sqlstr
=
strdup
(
pSql
->
sqlstr
);
if
(
pNew
->
sqlstr
==
NULL
)
{
tscError
(
"%p new subquery failed, tableIndex:%d, vgroupIndex:%d"
,
pSql
,
tableIndex
,
pTableMetaInfo
->
vgroupIndex
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
pNew
->
sqlstr
=
NULL
;
SSqlCmd
*
pnCmd
=
&
pNew
->
cmd
;
memcpy
(
pnCmd
,
pCmd
,
sizeof
(
SSqlCmd
));
...
...
@@ -2113,23 +2103,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
goto
_error
;
}
doSetSqlExprAndResultFieldInfo
(
p
QueryInfo
,
p
NewQueryInfo
,
uid
);
doSetSqlExprAndResultFieldInfo
(
pNewQueryInfo
,
uid
);
pNew
->
fp
=
fp
;
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
param
=
param
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
char
*
name
=
pTableMetaInfo
->
name
;
STableMetaInfo
*
pFinalInfo
=
NULL
;
if
(
pPrevSql
==
NULL
)
{
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pTableMetaInfo
->
pTableMeta
);
// get by name may failed due to the cache cleanup
if
(
pPrevSql
==
NULL
)
{
// get by name may failed due to the cache cleanup
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pTableMetaInfo
->
pTableMeta
);
assert
(
pTableMeta
!=
NULL
);
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
}
else
{
// transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo
*
pPrevInfo
=
tscGetTableMetaInfoFromCmd
(
&
pPrevSql
->
cmd
,
pPrevSql
->
cmd
.
clauseIndex
,
0
);
...
...
src/common/inc/tglobal.h
浏览文件 @
d1d02a2f
...
...
@@ -32,8 +32,8 @@ extern uint16_t tsSyncPort;
extern
uint16_t
tsArbitratorPort
;
extern
int32_t
tsStatusInterval
;
extern
int32_t
tsNumOfMnodes
;
extern
int
32_t
tsEnableVnodeBak
;
extern
int
32_t
tsEnableTelemetryReporting
;
extern
int
8_t
tsEnableVnodeBak
;
extern
int
8_t
tsEnableTelemetryReporting
;
extern
char
tsEmail
[];
extern
char
tsArbitrator
[];
...
...
@@ -51,7 +51,7 @@ extern int8_t tsDaylight;
extern
char
tsTimezone
[];
extern
char
tsLocale
[];
extern
char
tsCharset
[];
// default encode string
extern
int
32_t
tsEnableCoreFile
;
extern
int
8_t
tsEnableCoreFile
;
extern
int32_t
tsCompressMsgSize
;
extern
char
tsTempDir
[];
...
...
@@ -59,12 +59,12 @@ extern char tsTempDir[];
extern
int32_t
tsQueryBufferSize
;
// maximum allowed usage buffer for each data node during query processing
extern
int32_t
tsRetrieveBlockingModel
;
// retrieve threads will be blocked
extern
int
32_t
tsKeepOriginalColumnName
;
extern
int
8_t
tsKeepOriginalColumnName
;
// client
extern
int32_t
tsTableMetaKeepTimer
;
extern
int32_t
tsMaxSQLStringLen
;
extern
int
32_t
tsTscEnableRecordSql
;
extern
int
8_t
tsTscEnableRecordSql
;
extern
int32_t
tsMaxNumOfOrderedResults
;
extern
int32_t
tsMinSlidingTime
;
extern
int32_t
tsMinIntervalTime
;
...
...
@@ -93,50 +93,51 @@ extern int16_t tsWAL;
extern
int32_t
tsFsyncPeriod
;
extern
int32_t
tsReplications
;
extern
int32_t
tsQuorum
;
extern
int
32_t
tsUpdate
;
extern
int
32_t
tsCacheLastRow
;
extern
int
8_t
tsUpdate
;
extern
int
8_t
tsCacheLastRow
;
// balance
extern
int
32_t
tsEnableBalance
;
extern
int
32_t
tsAlternativeRole
;
extern
int
8_t
tsEnableBalance
;
extern
int
8_t
tsAlternativeRole
;
extern
int32_t
tsBalanceInterval
;
extern
int32_t
tsOfflineThreshold
;
extern
int32_t
tsMnodeEqualVnodeNum
;
extern
int32_t
tsEnableFlowCtrl
;
extern
int32_t
tsEnableSlaveQuery
;
extern
int8_t
tsEnableFlowCtrl
;
extern
int8_t
tsEnableSlaveQuery
;
extern
int8_t
tsEnableAdjustMaster
;
// restful
extern
int
32_t
tsEnableHttpModule
;
extern
int
8_t
tsEnableHttpModule
;
extern
int32_t
tsRestRowLimit
;
extern
uint16_t
tsHttpPort
;
extern
int32_t
tsHttpCacheSessions
;
extern
int32_t
tsHttpSessionExpire
;
extern
int32_t
tsHttpMaxThreads
;
extern
int
32_t
tsHttpEnableCompress
;
extern
int
32_t
tsHttpEnableRecordSql
;
extern
int
32_t
tsTelegrafUseFieldNum
;
extern
int
8_t
tsHttpEnableCompress
;
extern
int
8_t
tsHttpEnableRecordSql
;
extern
int
8_t
tsTelegrafUseFieldNum
;
// mqtt
extern
int
32
_t
tsEnableMqttModule
;
extern
char
tsMqttHostName
[];
extern
char
tsMqttPort
[];
extern
char
tsMqttUser
[];
extern
char
tsMqttPass
[];
extern
char
tsMqttClientId
[];
extern
char
tsMqttTopic
[];
extern
int
8
_t
tsEnableMqttModule
;
extern
char
tsMqttHostName
[];
extern
char
tsMqttPort
[];
extern
char
tsMqttUser
[];
extern
char
tsMqttPass
[];
extern
char
tsMqttClientId
[];
extern
char
tsMqttTopic
[];
// monitor
extern
int
32_t
tsEnableMonitorModule
;
extern
int
8_t
tsEnableMonitorModule
;
extern
char
tsMonitorDbName
[];
extern
char
tsInternalPass
[];
extern
int32_t
tsMonitorInterval
;
// stream
extern
int
32
_t
tsEnableStream
;
extern
int
8
_t
tsEnableStream
;
// internal
extern
int
32_t
tsPrintAuth
;
extern
uint32_t
tscEmbedded
;
extern
int
8_t
tsPrintAuth
;
extern
int8_t
tscEmbedded
;
extern
char
configDir
[];
extern
char
tsVnodeDir
[];
extern
char
tsDnodeDir
[];
...
...
@@ -173,13 +174,13 @@ extern char gitinfoOfInternal[];
extern
char
buildinfo
[];
// log
extern
int
32_t
tsAsyncLog
;
extern
int
8_t
tsAsyncLog
;
extern
int32_t
tsNumOfLogLines
;
extern
int32_t
tsLogKeepDays
;
extern
int32_t
dDebugFlag
;
extern
int32_t
vDebugFlag
;
extern
int32_t
mDebugFlag
;
extern
u
int32_t
cDebugFlag
;
extern
int32_t
cDebugFlag
;
extern
int32_t
jniDebugFlag
;
extern
int32_t
tmrDebugFlag
;
extern
int32_t
sdbDebugFlag
;
...
...
@@ -189,7 +190,7 @@ extern int32_t monDebugFlag;
extern
int32_t
uDebugFlag
;
extern
int32_t
rpcDebugFlag
;
extern
int32_t
odbcDebugFlag
;
extern
u
int32_t
qDebugFlag
;
extern
int32_t
qDebugFlag
;
extern
int32_t
wDebugFlag
;
extern
int32_t
cqDebugFlag
;
extern
int32_t
debugFlag
;
...
...
src/common/inc/tulog.h
浏览文件 @
d1d02a2f
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include "tlog.h"
extern
int32_t
uDebugFlag
;
extern
uint32_t
tscEmbedded
;
extern
int8_t
tscEmbedded
;
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
...
...
src/common/src/tglobal.c
浏览文件 @
d1d02a2f
...
...
@@ -39,8 +39,8 @@ uint16_t tsSyncPort = 6040;
uint16_t
tsArbitratorPort
=
6042
;
int32_t
tsStatusInterval
=
1
;
// second
int32_t
tsNumOfMnodes
=
3
;
int
32_t
tsEnableVnodeBak
=
1
;
int
32_t
tsEnableTelemetryReporting
=
1
;
int
8_t
tsEnableVnodeBak
=
1
;
int
8_t
tsEnableTelemetryReporting
=
1
;
char
tsEmail
[
TSDB_FQDN_LEN
]
=
{
0
};
// common
...
...
@@ -56,7 +56,7 @@ int8_t tsDaylight = 0;
char
tsTimezone
[
TSDB_TIMEZONE_LEN
]
=
{
0
};
char
tsLocale
[
TSDB_LOCALE_LEN
]
=
{
0
};
char
tsCharset
[
TSDB_LOCALE_LEN
]
=
{
0
};
// default encode string
int
32_t
tsEnableCoreFile
=
0
;
int
8_t
tsEnableCoreFile
=
0
;
int32_t
tsMaxBinaryDisplayWidth
=
30
;
char
tsTempDir
[
TSDB_FILENAME_LEN
]
=
"/tmp/"
;
...
...
@@ -73,7 +73,7 @@ int32_t tsCompressMsgSize = -1;
// client
int32_t
tsTableMetaKeepTimer
=
7200
;
// second
int32_t
tsMaxSQLStringLen
=
TSDB_MAX_SQL_LEN
;
int
32_t
tsTscEnableRecordSql
=
0
;
int
8_t
tsTscEnableRecordSql
=
0
;
// the maximum number of results for projection query on super table that are returned from
// one virtual node, to order according to timestamp
...
...
@@ -110,7 +110,7 @@ int32_t tsQueryBufferSize = -1;
int32_t
tsRetrieveBlockingModel
=
0
;
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
int
32_t
tsKeepOriginalColumnName
=
0
;
int
8_t
tsKeepOriginalColumnName
=
0
;
// db parameters
int32_t
tsCacheBlockSize
=
TSDB_DEFAULT_CACHE_BLOCK_SIZE
;
...
...
@@ -126,35 +126,36 @@ int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
int32_t
tsFsyncPeriod
=
TSDB_DEFAULT_FSYNC_PERIOD
;
int32_t
tsReplications
=
TSDB_DEFAULT_DB_REPLICA_OPTION
;
int32_t
tsQuorum
=
TSDB_DEFAULT_DB_QUORUM_OPTION
;
int
32_t
tsUpdate
=
TSDB_DEFAULT_DB_UPDATE_OPTION
;
int
32_t
tsCacheLastRow
=
TSDB_DEFAULT_CACHE_BLOCK_SIZE
;
int
8_t
tsUpdate
=
TSDB_DEFAULT_DB_UPDATE_OPTION
;
int
8_t
tsCacheLastRow
=
TSDB_DEFAULT_CACHE_BLOCK_SIZE
;
int32_t
tsMaxVgroupsPerDb
=
0
;
int32_t
tsMinTablePerVnode
=
TSDB_TABLES_STEP
;
int32_t
tsMaxTablePerVnode
=
TSDB_DEFAULT_TABLES
;
int32_t
tsTableIncStepPerVnode
=
TSDB_TABLES_STEP
;
// balance
int
32_t
tsEnableBalance
=
1
;
int
32_t
tsAlternativeRole
=
0
;
int32_t
tsBalanceInterval
=
300
;
// seconds
int32_t
tsOfflineThreshold
=
86400
*
100
;
// seconds 10days
int
8_t
tsEnableBalance
=
1
;
int
8_t
tsAlternativeRole
=
0
;
int32_t
tsBalanceInterval
=
300
;
// seconds
int32_t
tsOfflineThreshold
=
86400
*
100
;
// seconds 10days
int32_t
tsMnodeEqualVnodeNum
=
4
;
int32_t
tsEnableFlowCtrl
=
1
;
int32_t
tsEnableSlaveQuery
=
1
;
int8_t
tsEnableFlowCtrl
=
1
;
int8_t
tsEnableSlaveQuery
=
1
;
int8_t
tsEnableAdjustMaster
=
1
;
// restful
int
32_t
tsEnableHttpModule
=
1
;
int
8_t
tsEnableHttpModule
=
1
;
int32_t
tsRestRowLimit
=
10240
;
uint16_t
tsHttpPort
=
6041
;
// only tcp, range tcp[6041]
int32_t
tsHttpCacheSessions
=
1000
;
int32_t
tsHttpSessionExpire
=
36000
;
int32_t
tsHttpMaxThreads
=
2
;
int
32_t
tsHttpEnableCompress
=
1
;
int
32_t
tsHttpEnableRecordSql
=
0
;
int
32_t
tsTelegrafUseFieldNum
=
0
;
int
8_t
tsHttpEnableCompress
=
1
;
int
8_t
tsHttpEnableRecordSql
=
0
;
int
8_t
tsTelegrafUseFieldNum
=
0
;
// mqtt
int
32
_t
tsEnableMqttModule
=
0
;
// not finished yet, not started it by default
int
8
_t
tsEnableMqttModule
=
0
;
// not finished yet, not started it by default
char
tsMqttHostName
[
TSDB_MQTT_HOSTNAME_LEN
]
=
"test.mosquitto.org"
;
char
tsMqttPort
[
TSDB_MQTT_PORT_LEN
]
=
"1883"
;
char
tsMqttUser
[
TSDB_MQTT_USER_LEN
]
=
{
0
};
...
...
@@ -163,24 +164,24 @@ char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber";
char
tsMqttTopic
[
TSDB_MQTT_TOPIC_LEN
]
=
"/test"
;
// #
// monitor
int
32_t
tsEnableMonitorModule
=
1
;
int
8_t
tsEnableMonitorModule
=
1
;
char
tsMonitorDbName
[
TSDB_DB_NAME_LEN
]
=
"log"
;
char
tsInternalPass
[]
=
"secretkey"
;
int32_t
tsMonitorInterval
=
30
;
// seconds
// stream
int
32_t
tsEnableStream
=
1
;
int
8_t
tsEnableStream
=
1
;
// internal
int
32
_t
tsPrintAuth
=
0
;
uint32
_t
tscEmbedded
=
0
;
char
configDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsVnodeDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsDnodeDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsMnodeDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsDataDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsScriptDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsVnodeBakDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
int
8
_t
tsPrintAuth
=
0
;
int8
_t
tscEmbedded
=
0
;
char
configDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsVnodeDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsDnodeDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsMnodeDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsDataDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsScriptDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsVnodeBakDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
/*
* minimum scale for whole system, millisecond by default
...
...
@@ -211,19 +212,19 @@ int32_t mDebugFlag = 131;
int32_t
sdbDebugFlag
=
131
;
int32_t
dDebugFlag
=
135
;
int32_t
vDebugFlag
=
135
;
u
int32_t
cDebugFlag
=
131
;
int32_t
cDebugFlag
=
131
;
int32_t
jniDebugFlag
=
131
;
int32_t
odbcDebugFlag
=
131
;
int32_t
httpDebugFlag
=
131
;
int32_t
mqttDebugFlag
=
131
;
int32_t
monDebugFlag
=
131
;
u
int32_t
qDebugFlag
=
131
;
int32_t
qDebugFlag
=
131
;
int32_t
rpcDebugFlag
=
131
;
int32_t
uDebugFlag
=
131
;
int32_t
debugFlag
=
0
;
int32_t
sDebugFlag
=
135
;
int32_t
wDebugFlag
=
135
;
u
int32_t
tsdbDebugFlag
=
131
;
int32_t
tsdbDebugFlag
=
131
;
int32_t
cqDebugFlag
=
131
;
int32_t
(
*
monStartSystemFp
)()
=
NULL
;
...
...
@@ -277,12 +278,16 @@ bool taosCfgDynamicOptions(char *msg) {
for
(
int32_t
i
=
0
;
i
<
tsGlobalConfigNum
;
++
i
)
{
SGlobalCfg
*
cfg
=
tsGlobalConfig
+
i
;
//if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
if
(
cfg
->
valType
!=
TAOS_CFG_VTYPE_INT32
)
continue
;
if
(
cfg
->
valType
!=
TAOS_CFG_VTYPE_INT32
&&
cfg
->
valType
!=
TAOS_CFG_VTYPE_INT8
)
continue
;
int32_t
cfgLen
=
(
int32_t
)
strlen
(
cfg
->
option
);
if
(
cfgLen
!=
olen
)
continue
;
if
(
strncasecmp
(
option
,
cfg
->
option
,
olen
)
!=
0
)
continue
;
*
((
int32_t
*
)
cfg
->
ptr
)
=
vint
;
if
(
cfg
->
valType
!=
TAOS_CFG_VTYPE_INT32
)
{
*
((
int32_t
*
)
cfg
->
ptr
)
=
vint
;
}
else
{
*
((
int8_t
*
)
cfg
->
ptr
)
=
(
int8_t
)
vint
;
}
if
(
strncasecmp
(
cfg
->
option
,
"monitor"
,
olen
)
==
0
)
{
if
(
1
==
vint
)
{
...
...
@@ -470,7 +475,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"vnodeBak"
;
cfg
.
ptr
=
&
tsEnableVnodeBak
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -480,7 +485,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"telemetryReporting"
;
cfg
.
ptr
=
&
tsEnableTelemetryReporting
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -490,7 +495,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"balance"
;
cfg
.
ptr
=
&
tsEnableBalance
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -511,7 +516,7 @@ static void doInitGlobalConfig(void) {
// 0-any; 1-mnode; 2-vnode
cfg
.
option
=
"role"
;
cfg
.
ptr
=
&
tsAlternativeRole
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
2
;
...
...
@@ -813,7 +818,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"update"
;
cfg
.
ptr
=
&
tsUpdate
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
TSDB_MIN_DB_UPDATE
;
cfg
.
maxValue
=
TSDB_MAX_DB_UPDATE
;
...
...
@@ -903,7 +908,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"keepColumnName"
;
cfg
.
ptr
=
&
tsKeepOriginalColumnName
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1007,7 +1012,7 @@ static void doInitGlobalConfig(void) {
// module configs
cfg
.
option
=
"flowctrl"
;
cfg
.
ptr
=
&
tsEnableFlowCtrl
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1017,7 +1022,17 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"slaveQuery"
;
cfg
.
ptr
=
&
tsEnableSlaveQuery
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"adjustMaster"
;
cfg
.
ptr
=
&
tsEnableAdjustMaster
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1027,7 +1042,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"http"
;
cfg
.
ptr
=
&
tsEnableHttpModule
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1037,7 +1052,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"mqtt"
;
cfg
.
ptr
=
&
tsEnableMqttModule
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1047,7 +1062,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"monitor"
;
cfg
.
ptr
=
&
tsEnableMonitorModule
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1057,7 +1072,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"stream"
;
cfg
.
ptr
=
&
tsEnableStream
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1067,7 +1082,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"httpEnableRecordSql"
;
cfg
.
ptr
=
&
tsHttpEnableRecordSql
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1077,7 +1092,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"telegrafUseFieldNum"
;
cfg
.
ptr
=
&
tsTelegrafUseFieldNum
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1128,7 +1143,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"asyncLog"
;
cfg
.
ptr
=
&
tsAsyncLog
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
16
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_LOG
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1329,7 +1344,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"enableRecordSql"
;
cfg
.
ptr
=
&
tsTscEnableRecordSql
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
@@ -1339,7 +1354,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"enableCoreFile"
;
cfg
.
ptr
=
&
tsEnableCoreFile
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
32
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT
8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
...
...
src/dnode/inc/dnodeCfg.h
浏览文件 @
d1d02a2f
...
...
@@ -25,6 +25,7 @@ int32_t dnodeInitCfg();
void
dnodeCleanupCfg
();
void
dnodeUpdateCfg
(
SDnodeCfg
*
cfg
);
int32_t
dnodeGetDnodeId
();
void
dnodeGetClusterId
(
char
*
clusterId
);
void
dnodeGetCfg
(
int32_t
*
dnodeId
,
char
*
clusterId
);
#ifdef __cplusplus
...
...
src/dnode/src/dnodeCfg.c
浏览文件 @
d1d02a2f
...
...
@@ -51,6 +51,12 @@ int32_t dnodeGetDnodeId() {
return
dnodeId
;
}
void
dnodeGetClusterId
(
char
*
clusterId
)
{
pthread_mutex_lock
(
&
tsCfgMutex
);
tstrncpy
(
clusterId
,
tsCfg
.
clusterId
,
TSDB_CLUSTER_ID_LEN
);
pthread_mutex_unlock
(
&
tsCfgMutex
);
}
void
dnodeGetCfg
(
int32_t
*
dnodeId
,
char
*
clusterId
)
{
pthread_mutex_lock
(
&
tsCfgMutex
);
*
dnodeId
=
tsCfg
.
dnodeId
;
...
...
src/dnode/src/dnodeMRead.c
浏览文件 @
d1d02a2f
...
...
@@ -16,9 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tqueue.h"
#include "twal.h"
#include "mnode.h"
#include "dnodeVMgmt.h"
#include "dnodeMInfos.h"
#include "dnodeMRead.h"
...
...
src/dnode/src/dnodeMWrite.c
浏览文件 @
d1d02a2f
...
...
@@ -18,7 +18,6 @@
#include "ttimer.h"
#include "tqueue.h"
#include "mnode.h"
#include "dnodeVMgmt.h"
#include "dnodeMInfos.h"
#include "dnodeMWrite.h"
...
...
src/dnode/src/dnodeVnodes.c
浏览文件 @
d1d02a2f
...
...
@@ -245,12 +245,11 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
pStatus
->
lastReboot
=
htonl
(
tsRebootTime
);
pStatus
->
numOfCores
=
htons
((
uint16_t
)
tsNumOfCores
);
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
pStatus
->
alternativeRole
=
(
uint8_t
)
tsAlternativeRole
;
pStatus
->
alternativeRole
=
tsAlternativeRole
;
tstrncpy
(
pStatus
->
dnodeEp
,
tsLocalEp
,
TSDB_EP_LEN
);
// fill cluster cfg parameters
pStatus
->
clusterCfg
.
numOfMnodes
=
htonl
(
tsNumOfMnodes
);
pStatus
->
clusterCfg
.
enableBalance
=
htonl
(
tsEnableBalance
);
pStatus
->
clusterCfg
.
mnodeEqualVnodeNum
=
htonl
(
tsMnodeEqualVnodeNum
);
pStatus
->
clusterCfg
.
offlineThreshold
=
htonl
(
tsOfflineThreshold
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
tsStatusInterval
);
...
...
@@ -262,7 +261,12 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
pStatus
->
clusterCfg
.
checkTime
,
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
tsLocale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
tsCharset
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
tsCharset
,
TSDB_LOCALE_LEN
);
pStatus
->
clusterCfg
.
enableBalance
=
tsEnableBalance
;
pStatus
->
clusterCfg
.
flowCtrl
=
tsEnableFlowCtrl
;
pStatus
->
clusterCfg
.
slaveQuery
=
tsEnableSlaveQuery
;
pStatus
->
clusterCfg
.
adjustMaster
=
tsEnableAdjustMaster
;
vnodeBuildStatusMsg
(
pStatus
);
contLen
=
sizeof
(
SStatusMsg
)
+
pStatus
->
openVnodes
*
sizeof
(
SVnodeLoad
);
...
...
src/inc/dnode.h
浏览文件 @
d1d02a2f
...
...
@@ -36,6 +36,8 @@ bool dnodeIsMasterEp(char *ep);
void
dnodeGetEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
dnodeGetEpSetForShell
(
SRpcEpSet
*
epSet
);
int32_t
dnodeGetDnodeId
();
void
dnodeGetClusterId
(
char
*
clusterId
);
void
dnodeUpdateEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
bool
dnodeCheckEpChanged
(
int32_t
dnodeId
,
char
*
epstr
);
bool
dnodeStartMnode
(
SMInfos
*
pMinfos
);
...
...
@@ -80,4 +82,4 @@ void dnodeReportStep(char *name, char *desc, int8_t finished);
}
#endif
#endif
\ No newline at end of file
#endif
src/inc/taosmsg.h
浏览文件 @
d1d02a2f
...
...
@@ -324,6 +324,7 @@ typedef struct {
typedef
struct
{
char
acctId
[
TSDB_ACCT_LEN
];
char
serverVersion
[
TSDB_VERSION_LEN
];
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
int8_t
writeAuth
;
int8_t
superAuth
;
int8_t
reserved1
;
...
...
@@ -605,7 +606,6 @@ typedef struct {
typedef
struct
{
int32_t
numOfMnodes
;
// tsNumOfMnodes
int32_t
enableBalance
;
// tsEnableBalance
int32_t
mnodeEqualVnodeNum
;
// tsMnodeEqualVnodeNum
int32_t
offlineThreshold
;
// tsOfflineThreshold
int32_t
statusInterval
;
// tsStatusInterval
...
...
@@ -616,6 +616,11 @@ typedef struct {
int64_t
checkTime
;
// 1970-01-01 00:00:00.000
char
locale
[
TSDB_LOCALE_LEN
];
// tsLocale
char
charset
[
TSDB_LOCALE_LEN
];
// tsCharset
int8_t
enableBalance
;
// tsEnableBalance
int8_t
flowCtrl
;
int8_t
slaveQuery
;
int8_t
adjustMaster
;
int8_t
reserved
[
4
];
}
SClusterCfg
;
typedef
struct
{
...
...
src/mnode/inc/mnodeDnode.h
浏览文件 @
d1d02a2f
...
...
@@ -52,6 +52,9 @@ typedef enum EDnodeOfflineReason {
TAOS_DN_OFF_TIME_ZONE_NOT_MATCH
,
TAOS_DN_OFF_LOCALE_NOT_MATCH
,
TAOS_DN_OFF_CHARSET_NOT_MATCH
,
TAOS_DN_OFF_FLOW_CTRL_NOT_MATCH
,
TAOS_DN_OFF_SLAVE_QUERY_NOT_MATCH
,
TAOS_DN_OFF_ADJUST_MASTER_NOT_MATCH
,
TAOS_DN_OFF_OTHERS
}
EDnodeOfflineReason
;
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
d1d02a2f
...
...
@@ -375,10 +375,6 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
mError
(
"
\"
numOfMnodes
\"
[%d - %d] cfg parameters inconsistent"
,
clusterCfg
->
numOfMnodes
,
htonl
(
tsNumOfMnodes
));
return
TAOS_DN_OFF_NUM_OF_MNODES_NOT_MATCH
;
}
if
(
clusterCfg
->
enableBalance
!=
htonl
(
tsEnableBalance
))
{
mError
(
"
\"
balance
\"
[%d - %d] cfg parameters inconsistent"
,
clusterCfg
->
enableBalance
,
htonl
(
tsEnableBalance
));
return
TAOS_DN_OFF_ENABLE_BALANCE_NOT_MATCH
;
}
if
(
clusterCfg
->
mnodeEqualVnodeNum
!=
htonl
(
tsMnodeEqualVnodeNum
))
{
mError
(
"
\"
mnodeEqualVnodeNum
\"
[%d - %d] cfg parameters inconsistent"
,
clusterCfg
->
mnodeEqualVnodeNum
,
htonl
(
tsMnodeEqualVnodeNum
));
...
...
@@ -428,6 +424,23 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
return
TAOS_DN_OFF_CHARSET_NOT_MATCH
;
}
if
(
clusterCfg
->
enableBalance
!=
tsEnableBalance
)
{
mError
(
"
\"
balance
\"
[%d - %d] cfg parameters inconsistent"
,
clusterCfg
->
enableBalance
,
tsEnableBalance
);
return
TAOS_DN_OFF_ENABLE_BALANCE_NOT_MATCH
;
}
if
(
clusterCfg
->
flowCtrl
!=
tsEnableFlowCtrl
)
{
mError
(
"
\"
flowCtrl
\"
[%d - %d] cfg parameters inconsistent"
,
clusterCfg
->
flowCtrl
,
tsEnableFlowCtrl
);
return
TAOS_DN_OFF_FLOW_CTRL_NOT_MATCH
;
}
if
(
clusterCfg
->
slaveQuery
!=
tsEnableSlaveQuery
)
{
mError
(
"
\"
slaveQuery
\"
[%d - %d] cfg parameters inconsistent"
,
clusterCfg
->
slaveQuery
,
tsEnableSlaveQuery
);
return
TAOS_DN_OFF_SLAVE_QUERY_NOT_MATCH
;
}
if
(
clusterCfg
->
adjustMaster
!=
tsEnableAdjustMaster
)
{
mError
(
"
\"
adjustMaster
\"
[%d - %d] cfg parameters inconsistent"
,
clusterCfg
->
adjustMaster
,
tsEnableAdjustMaster
);
return
TAOS_DN_OFF_ADJUST_MASTER_NOT_MATCH
;
}
return
0
;
}
...
...
@@ -1031,6 +1044,11 @@ static int32_t mnodeRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, v
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
switch
(
cfg
->
valType
)
{
case
TAOS_CFG_VTYPE_INT8
:
t
=
snprintf
(
varDataVal
(
pWrite
),
TSDB_CFG_VALUE_LEN
,
"%d"
,
*
((
int8_t
*
)
cfg
->
ptr
));
varDataSetLen
(
pWrite
,
t
);
numOfRows
++
;
break
;
case
TAOS_CFG_VTYPE_INT16
:
t
=
snprintf
(
varDataVal
(
pWrite
),
TSDB_CFG_VALUE_LEN
,
"%d"
,
*
((
int16_t
*
)
cfg
->
ptr
));
varDataSetLen
(
pWrite
,
t
);
...
...
src/mnode/src/mnodeShow.c
浏览文件 @
d1d02a2f
...
...
@@ -351,6 +351,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
mnodeGetMnodeEpSetForShell
(
&
pConnectRsp
->
epSet
,
false
);
dnodeGetClusterId
(
pConnectRsp
->
clusterId
);
connect_over:
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pConnectRsp
)
rpcFreeCont
(
pConnectRsp
);
...
...
src/query/inc/qUtil.h
浏览文件 @
d1d02a2f
...
...
@@ -34,17 +34,13 @@ int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
);
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
removeRedundantResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
);
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
);
...
...
src/query/inc/queryLog.h
浏览文件 @
d1d02a2f
...
...
@@ -22,8 +22,8 @@ extern "C" {
#include "tlog.h"
extern
u
int32_t
qDebugFlag
;
extern
uint32_t
tscEmbedded
;
extern
int32_t
qDebugFlag
;
extern
int8_t
tscEmbedded
;
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", 255, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", 255, __VA_ARGS__); }} while(0)
...
...
src/query/src/qExecutor.c
浏览文件 @
d1d02a2f
...
...
@@ -753,11 +753,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
}
static
void
updateResultRowIndex
(
SResultRowInfo
*
pResultRowInfo
,
STableQueryInfo
*
pTableQueryInfo
,
bool
ascQuery
)
{
if
((
pTableQueryInfo
->
lastKey
>
=
pTableQueryInfo
->
win
.
ekey
&&
ascQuery
)
||
(
pTableQueryInfo
->
lastKey
<=
pTableQueryInfo
->
win
.
ekey
&&
(
!
ascQuery
)))
{
if
((
pTableQueryInfo
->
lastKey
>
pTableQueryInfo
->
win
.
ekey
&&
ascQuery
)
||
(
pTableQueryInfo
->
lastKey
<
pTableQueryInfo
->
win
.
ekey
&&
(
!
ascQuery
)))
{
closeAllResultRows
(
pResultRowInfo
);
pResultRowInfo
->
curIndex
=
pResultRowInfo
->
size
-
1
;
}
else
{
doUpdateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
->
lastKey
,
ascQuery
);
int32_t
step
=
ascQuery
?
1
:-
1
;
doUpdateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
->
lastKey
-
step
,
ascQuery
);
}
}
...
...
@@ -1198,8 +1199,12 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// prev time window not interpolation yet.
int32_t
curIndex
=
curTimeWindowIndex
(
pWindowResInfo
);
if
(
prevIndex
!=
-
1
&&
prevIndex
<
curIndex
&&
pRuntimeEnv
->
timeWindowInterpo
)
{
for
(
int32_t
j
=
prevIndex
;
j
<
curIndex
;
++
j
)
{
for
(
int32_t
j
=
prevIndex
;
j
<
curIndex
;
++
j
)
{
// previous time window may be all closed already.
SResultRow
*
pRes
=
pWindowResInfo
->
pResult
[
j
];
if
(
pRes
->
closed
)
{
assert
(
resultRowInterpolated
(
pRes
,
RESULT_ROW_START_INTERP
)
&&
resultRowInterpolated
(
pRes
,
RESULT_ROW_END_INTERP
));
continue
;
}
STimeWindow
w
=
pRes
->
win
;
ret
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
&
w
,
masterScan
,
&
pResult
,
groupId
);
...
...
@@ -1600,6 +1605,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if
(
prevWindowIndex
!=
-
1
&&
prevWindowIndex
<
curIndex
)
{
for
(
int32_t
k
=
prevWindowIndex
;
k
<
curIndex
;
++
k
)
{
SResultRow
*
pRes
=
pWindowResInfo
->
pResult
[
k
];
if
(
pRes
->
closed
)
{
assert
(
resultRowInterpolated
(
pResult
,
RESULT_ROW_START_INTERP
)
&&
resultRowInterpolated
(
pResult
,
RESULT_ROW_END_INTERP
));
continue
;
}
ret
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
&
pRes
->
win
,
masterScan
,
&
pResult
,
groupId
);
assert
(
ret
==
TSDB_CODE_SUCCESS
&&
!
resultRowInterpolated
(
pResult
,
RESULT_ROW_END_INTERP
));
...
...
@@ -1713,10 +1722,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
blockwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pResultRowInfo
,
searchFn
,
pDataBlock
);
}
// update the lastkey of current table
TSKEY
lastKey
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pDataBlockInfo
->
window
.
ekey
:
pDataBlockInfo
->
window
.
skey
;
pTableQueryInfo
->
lastKey
=
lastKey
+
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
// interval query with limit applied
int32_t
numOfRes
=
0
;
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
)
{
...
...
@@ -5181,10 +5186,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
scanMultiTableDataBlocks
(
pQInfo
);
pQInfo
->
groupIndex
+=
1
;
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
taosArrayDestroy
(
s
)
;
// no results generated for current group, continue to try the next group
taosArrayDestroy
(
s
)
;
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pWindowResInfo
->
size
<=
0
)
{
continue
;
}
...
...
@@ -5211,8 +5216,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo
->
groupIndex
=
currentGroupIndex
;
// restore the group index
assert
(
pQuery
->
rec
.
rows
==
pWindowResInfo
->
size
);
clearClosedResultRows
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
resetResultRowInfo
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
break
;
}
}
else
if
(
pRuntimeEnv
->
queryWindowIdentical
&&
pRuntimeEnv
->
pTsBuf
==
NULL
&&
!
isTSCompQuery
(
pQuery
))
{
...
...
src/query/src/qUtil.c
浏览文件 @
d1d02a2f
...
...
@@ -20,18 +20,6 @@
#include "qExecutor.h"
#include "qUtil.h"
static
int32_t
getResultRowKeyInfo
(
SResultRow
*
pResult
,
int16_t
type
,
char
**
key
,
int16_t
*
bytes
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
*
key
=
varDataVal
(
pResult
->
key
);
*
bytes
=
varDataLen
(
pResult
->
key
);
}
else
{
*
key
=
(
char
*
)
&
pResult
->
win
.
skey
;
*
bytes
=
tDataTypeDesc
[
type
].
nSize
;
}
return
0
;
}
int32_t
getOutputInterResultBufSize
(
SQuery
*
pQuery
)
{
int32_t
size
=
0
;
...
...
@@ -99,73 +87,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
}
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
||
num
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosedResultRows
(
pResultRowInfo
);
assert
(
num
>=
0
&&
num
<=
numOfClosed
);
int16_t
type
=
pResultRowInfo
->
type
;
int64_t
uid
=
0
;
char
*
key
=
NULL
;
int16_t
bytes
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SResultRow
*
pResult
=
pResultRowInfo
->
pResult
[
i
];
if
(
pResult
->
closed
)
{
// remove the window slot from hash table
getResultRowKeyInfo
(
pResult
,
type
,
&
key
,
&
bytes
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashRemove
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
}
else
{
break
;
}
}
int32_t
remain
=
pResultRowInfo
->
size
-
num
;
// clear all the closed windows from the window list
for
(
int32_t
k
=
0
;
k
<
remain
;
++
k
)
{
copyResultRow
(
pRuntimeEnv
,
pResultRowInfo
->
pResult
[
k
],
pResultRowInfo
->
pResult
[
num
+
k
],
type
);
}
// move the unclosed window in the front of the window list
for
(
int32_t
k
=
remain
;
k
<
pResultRowInfo
->
size
;
++
k
)
{
SResultRow
*
pWindowRes
=
pResultRowInfo
->
pResult
[
k
];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
,
pResultRowInfo
->
type
);
}
pResultRowInfo
->
size
=
remain
;
for
(
int32_t
k
=
0
;
k
<
pResultRowInfo
->
size
;
++
k
)
{
SResultRow
*
pResult
=
pResultRowInfo
->
pResult
[
k
];
getResultRowKeyInfo
(
pResult
,
type
,
&
key
,
&
bytes
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
assert
(
p
!=
NULL
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pResultRowInfo
->
size
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashPut
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
pResultRowInfo
->
curIndex
=
-
1
;
}
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosedResultRows
(
pResultRowInfo
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
numOfClosed
);
}
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
i
=
0
;
while
(
i
<
pResultRowInfo
->
size
&&
pResultRowInfo
->
pResult
[
i
]
->
closed
)
{
...
...
@@ -188,40 +109,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
}
}
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/
void
removeRedundantResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
)
{
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
if
(
pResultRowInfo
->
size
<=
1
)
{
return
;
}
// get the result order
int32_t
resultOrder
=
(
pResultRowInfo
->
pResult
[
0
]
->
win
.
skey
<
pResultRowInfo
->
pResult
[
1
]
->
win
.
skey
)
?
1
:-
1
;
if
(
order
!=
resultOrder
)
{
return
;
}
int32_t
i
=
0
;
if
(
order
==
QUERY_ASC_FORWARD_STEP
)
{
TSKEY
ekey
=
pResultRowInfo
->
pResult
[
i
]
->
win
.
ekey
;
while
(
i
<
pResultRowInfo
->
size
&&
(
ekey
<
lastKey
))
{
++
i
;
}
}
else
if
(
order
==
QUERY_DESC_FORWARD_STEP
)
{
while
(
i
<
pResultRowInfo
->
size
&&
(
pResultRowInfo
->
pResult
[
i
]
->
win
.
skey
>
lastKey
))
{
++
i
;
}
}
if
(
i
<
pResultRowInfo
->
size
)
{
pResultRowInfo
->
size
=
(
i
+
1
);
}
}
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
return
(
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
==
true
);
}
...
...
@@ -262,47 +149,6 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
}
}
/**
* The source window result pos attribution of the source window result does not assign to the destination,
* since the attribute of "Pos" is bound to each window result when the window result is created in the
* disk-based result buffer.
*/
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
)
{
dst
->
numOfRows
=
src
->
numOfRows
;
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
dst
->
key
=
realloc
(
dst
->
key
,
varDataTLen
(
src
->
key
));
varDataCopy
(
dst
->
key
,
src
->
key
);
}
else
{
dst
->
win
=
src
->
win
;
}
dst
->
closed
=
src
->
closed
;
int32_t
nOutputCols
=
pRuntimeEnv
->
pQuery
->
numOfOutput
;
for
(
int32_t
i
=
0
;
i
<
nOutputCols
;
++
i
)
{
SResultRowCellInfo
*
pDst
=
getResultCell
(
pRuntimeEnv
,
dst
,
i
);
SResultRowCellInfo
*
pSrc
=
getResultCell
(
pRuntimeEnv
,
src
,
i
);
// char *buf = pDst->interResultBuf;
memcpy
(
pDst
,
pSrc
,
sizeof
(
SResultRowCellInfo
)
+
pRuntimeEnv
->
pCtx
[
i
].
interBufBytes
);
// pDst->interResultBuf = buf; // restore the allocated buffer
// copy the result info struct
// memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes);
// copy the output buffer data from src to dst, the position info keep unchanged
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
pageId
);
char
*
dstBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
dst
,
dstpage
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
pageId
);
char
*
srcBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
(
SResultRow
*
)
src
,
srcpage
);
size_t
s
=
pRuntimeEnv
->
pQuery
->
pExpr1
[
i
].
bytes
;
memcpy
(
dstBuf
,
srcBuf
,
s
);
}
}
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
)
{
assert
(
index
>=
0
&&
index
<
pRuntimeEnv
->
pQuery
->
numOfOutput
);
return
(
SResultRowCellInfo
*
)((
char
*
)
pRow
->
pCellInfo
+
pRuntimeEnv
->
rowCellInfoOffset
[
index
]);
...
...
src/rpc/inc/rpcLog.h
浏览文件 @
d1d02a2f
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include "tlog.h"
extern
int32_t
rpcDebugFlag
;
extern
uint32_t
tscEmbedded
;
extern
int8_t
tscEmbedded
;
#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
d1d02a2f
...
...
@@ -31,7 +31,7 @@
extern
"C"
{
#endif
extern
u
int32_t
tsdbDebugFlag
;
extern
int32_t
tsdbDebugFlag
;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
...
...
src/util/inc/hash.h
浏览文件 @
d1d02a2f
...
...
@@ -32,17 +32,18 @@ typedef void (*_hash_free_fn_t)(void *param);
typedef
struct
SHashNode
{
struct
SHashNode
*
next
;
uint32_t
hashVal
;
// the hash value of key
uint32_t
keyLen
;
// length of the key
size_t
dataLen
;
// length of data
int8_t
count
;
// reference count
int8_t
removed
;
// flag to indicate removed
uint32_t
hashVal
;
// the hash value of key
uint32_t
dataLen
;
// length of data
uint32_t
keyLen
;
// length of the key
int8_t
removed
;
// flag to indicate removed
int8_t
count
;
// reference count
char
data
[];
}
SHashNode
;
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((char*)(_n) - sizeof(SHashNode));
typedef
enum
SHashLockTypeE
{
HASH_NO_LOCK
=
0
,
HASH_ENTRY_LOCK
=
1
,
...
...
@@ -115,7 +116,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
* @param dsize
* @return
*/
void
*
taosHashGetC
B
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
);
void
*
taosHashGetC
lone
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
);
/**
* remove item with the specified key
...
...
src/util/inc/tconfig.h
浏览文件 @
d1d02a2f
...
...
@@ -41,6 +41,7 @@ enum {
};
enum
{
TAOS_CFG_VTYPE_INT8
,
TAOS_CFG_VTYPE_INT16
,
TAOS_CFG_VTYPE_INT32
,
TAOS_CFG_VTYPE_FLOAT
,
...
...
src/util/src/hash.c
浏览文件 @
d1d02a2f
...
...
@@ -271,10 +271,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
}
void
*
taosHashGet
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
return
taosHashGetC
B
(
pHashObj
,
key
,
keyLen
,
NULL
,
NULL
,
0
);
return
taosHashGetC
lone
(
pHashObj
,
key
,
keyLen
,
NULL
,
NULL
,
0
);
}
void
*
taosHashGetC
B
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
)
{
void
*
taosHashGetC
lone
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
)
{
if
(
pHashObj
->
size
<=
0
||
keyLen
==
0
||
key
==
NULL
)
{
return
NULL
;
}
...
...
@@ -654,7 +654,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
pNewNode
->
keyLen
=
(
uint32_t
)
keyLen
;
pNewNode
->
hashVal
=
hashVal
;
pNewNode
->
dataLen
=
dsize
;
pNewNode
->
dataLen
=
(
uint32_t
)
dsize
;
pNewNode
->
count
=
1
;
memcpy
(
GET_HASH_NODE_DATA
(
pNewNode
),
pData
,
dsize
);
...
...
src/util/src/tcache.c
浏览文件 @
d1d02a2f
...
...
@@ -278,7 +278,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
}
SCacheDataNode
*
ptNode
=
NULL
;
taosHashGetC
B
(
pCacheObj
->
pHashTable
,
key
,
keyLen
,
incRefFn
,
&
ptNode
,
sizeof
(
void
*
));
taosHashGetC
lone
(
pCacheObj
->
pHashTable
,
key
,
keyLen
,
incRefFn
,
&
ptNode
,
sizeof
(
void
*
));
void
*
pData
=
(
ptNode
!=
NULL
)
?
ptNode
->
data
:
NULL
;
...
...
src/util/src/tconfig.c
浏览文件 @
d1d02a2f
...
...
@@ -95,6 +95,23 @@ static void taosReadInt16Config(SGlobalCfg *cfg, char *input_value) {
}
}
static
void
taosReadInt8Config
(
SGlobalCfg
*
cfg
,
char
*
input_value
)
{
int32_t
value
=
atoi
(
input_value
);
int8_t
*
option
=
(
int8_t
*
)
cfg
->
ptr
;
if
(
value
<
cfg
->
minValue
||
value
>
cfg
->
maxValue
)
{
uError
(
"config option:%s, input value:%s, out of range[%f, %f], use default value:%d"
,
cfg
->
option
,
input_value
,
cfg
->
minValue
,
cfg
->
maxValue
,
*
option
);
}
else
{
if
(
cfg
->
cfgStatus
<=
TAOS_CFG_CSTATUS_FILE
)
{
*
option
=
(
int8_t
)
value
;
cfg
->
cfgStatus
=
TAOS_CFG_CSTATUS_FILE
;
}
else
{
uWarn
(
"config option:%s, input value:%s, is configured by %s, use %d"
,
cfg
->
option
,
input_value
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
*
option
);
}
}
}
static
void
taosReadDirectoryConfig
(
SGlobalCfg
*
cfg
,
char
*
input_value
)
{
int
length
=
(
int
)
strlen
(
input_value
);
char
*
option
=
(
char
*
)
cfg
->
ptr
;
...
...
@@ -204,6 +221,9 @@ static void taosReadConfigOption(const char *option, char *value) {
if
(
strcasecmp
(
cfg
->
option
,
option
)
!=
0
)
continue
;
switch
(
cfg
->
valType
)
{
case
TAOS_CFG_VTYPE_INT8
:
taosReadInt8Config
(
cfg
,
value
);
break
;
case
TAOS_CFG_VTYPE_INT16
:
taosReadInt16Config
(
cfg
,
value
);
break
;
...
...
@@ -376,6 +396,9 @@ void taosPrintGlobalCfg() {
blank
[
blankLen
]
=
0
;
switch
(
cfg
->
valType
)
{
case
TAOS_CFG_VTYPE_INT8
:
uInfo
(
" %s:%s%d%s"
,
cfg
->
option
,
blank
,
*
((
int8_t
*
)
cfg
->
ptr
),
tsGlobalUnit
[
cfg
->
unitType
]);
break
;
case
TAOS_CFG_VTYPE_INT16
:
uInfo
(
" %s:%s%d%s"
,
cfg
->
option
,
blank
,
*
((
int16_t
*
)
cfg
->
ptr
),
tsGlobalUnit
[
cfg
->
unitType
]);
break
;
...
...
@@ -408,6 +431,9 @@ static void taosDumpCfg(SGlobalCfg *cfg) {
blank
[
blankLen
]
=
0
;
switch
(
cfg
->
valType
)
{
case
TAOS_CFG_VTYPE_INT8
:
printf
(
" %s:%s%d%s
\n
"
,
cfg
->
option
,
blank
,
*
((
int8_t
*
)
cfg
->
ptr
),
tsGlobalUnit
[
cfg
->
unitType
]);
break
;
case
TAOS_CFG_VTYPE_INT16
:
printf
(
" %s:%s%d%s
\n
"
,
cfg
->
option
,
blank
,
*
((
int16_t
*
)
cfg
->
ptr
),
tsGlobalUnit
[
cfg
->
unitType
]);
break
;
...
...
src/util/src/tlog.c
浏览文件 @
d1d02a2f
...
...
@@ -64,7 +64,7 @@ typedef struct {
}
SLogObj
;
int32_t
tsLogKeepDays
=
0
;
int
32_t
tsAsyncLog
=
1
;
int
8_t
tsAsyncLog
=
1
;
float
tsTotalLogDirGB
=
0
;
float
tsAvailLogDirGB
=
0
;
float
tsMinimalLogDirGB
=
1
.
0
f
;
...
...
src/util/src/ttimer.c
浏览文件 @
d1d02a2f
...
...
@@ -19,7 +19,7 @@
#include "ttimer.h"
#include "tutil.h"
extern
uint32
_t
tscEmbedded
;
extern
int8
_t
tscEmbedded
;
#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
...
...
src/vnode/src/vnodeMgmt.c
浏览文件 @
d1d02a2f
...
...
@@ -91,7 +91,7 @@ static void vnodeIncRef(void *ptNode) {
void
*
vnodeAcquire
(
int32_t
vgId
)
{
SVnodeObj
**
ppVnode
=
NULL
;
if
(
tsVnodesHash
!=
NULL
)
{
ppVnode
=
taosHashGetC
B
(
tsVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
ppVnode
=
taosHashGetC
lone
(
tsVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
}
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录