Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2010f837
T
TDengine
项目概览
taosdata
/
TDengine
12 个月 前同步成功
通知
1180
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
2010f837
编写于
12月 30, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj
上级
b91eb6b8
0e9987a0
变更
30
显示空白变更内容
内联
并排
Showing
30 changed file
with
605 addition
and
684 deletion
+605
-684
.devcontainer/devcontainer.json
.devcontainer/devcontainer.json
+2
-1
include/common/tglobal.h
include/common/tglobal.h
+31
-40
include/common/tmsg.h
include/common/tmsg.h
+49
-95
include/dnode/mgmt/dnode.h
include/dnode/mgmt/dnode.h
+2
-2
include/os/osEnv.h
include/os/osEnv.h
+0
-1
include/util/encode.h
include/util/encode.h
+4
-4
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+3
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+9
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+70
-100
source/common/src/tmsg.c
source/common/src/tmsg.c
+227
-1
source/common/test/CMakeLists.txt
source/common/test/CMakeLists.txt
+8
-8
source/dnode/mgmt/daemon/src/daemon.c
source/dnode/mgmt/daemon/src/daemon.c
+1
-1
source/dnode/mgmt/impl/src/dndDnode.c
source/dnode/mgmt/impl/src/dndDnode.c
+16
-14
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+1
-1
source/dnode/mgmt/impl/test/dnode/dnode.cpp
source/dnode/mgmt/impl/test/dnode/dnode.cpp
+1
-1
source/dnode/mgmt/impl/test/sut/src/base.cpp
source/dnode/mgmt/impl/test/sut/src/base.cpp
+0
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+2
-10
source/dnode/mnode/impl/inc/mndDnode.h
source/dnode/mnode/impl/inc/mndDnode.h
+1
-1
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+66
-66
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+7
-4
source/dnode/vnode/meta/src/metaBDBImpl.c
source/dnode/vnode/meta/src/metaBDBImpl.c
+4
-4
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+0
-261
source/libs/parser/src/parserUtil.c
source/libs/parser/src/parserUtil.c
+11
-11
source/libs/parser/test/mockCatalogService.cpp
source/libs/parser/test/mockCatalogService.cpp
+3
-2
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+4
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+1
-1
source/util/src/tlog.c
source/util/src/tlog.c
+0
-1
tests/script/sh/deploy.sh
tests/script/sh/deploy.sh
+1
-0
tests/script/tmp/dnodes.sim
tests/script/tmp/dnodes.sim
+80
-52
未找到文件。
.devcontainer/devcontainer.json
浏览文件 @
2010f837
...
...
@@ -18,7 +18,8 @@
"ms-vscode.cpptools"
,
"ms-vscode.cmake-tools"
,
"austin.code-gnu-global"
,
"visualstudioexptteam.vscodeintel"
"visualstudioexptteam.vscodeintel"
,
"eamodio.gitlens"
],
//
Use
'forwardPorts'
to
make
a
list
of
ports
inside
the
container
available
locally.
...
...
include/common/tglobal.h
浏览文件 @
2010f837
...
...
@@ -30,6 +30,7 @@ extern char tsLocalEp[];
extern
uint16_t
tsServerPort
;
extern
int32_t
tsStatusInterval
;
extern
int8_t
tsEnableTelemetryReporting
;
extern
int32_t
tsNumOfSupportVnodes
;
// common
extern
int
tsRpcTimer
;
...
...
@@ -48,14 +49,18 @@ extern int32_t tsCompressMsgSize;
extern
int32_t
tsCompressColData
;
extern
int32_t
tsMaxNumOfDistinctResults
;
extern
char
tsTempDir
[];
extern
int64_t
tsMaxVnodeQueuedBytes
;
extern
int
tsCompatibleModel
;
// 2.0 compatible model
extern
int8_t
tsEnableSlaveQuery
;
extern
int8_t
tsEnableAdjustMaster
;
extern
int8_t
tsPrintAuth
;
extern
int64_t
tsTickPerDay
[
3
];
//query buffer management
//
query buffer management
extern
int32_t
tsQueryBufferSize
;
// maximum allowed usage buffer size in MB for each data node during query processing
extern
int64_t
tsQueryBufferSizeBytes
;
// maximum allowed usage buffer size in byte for each data node during query processing
extern
int32_t
tsRetrieveBlockingModel
;
// retrieve threads will be blocked
extern
int64_t
tsQueryBufferSizeBytes
;
// maximum allowed usage buffer size in byte for each data node
extern
int32_t
tsRetrieveBlockingModel
;
// retrieve threads will be blocked
extern
int8_t
tsKeepOriginalColumnName
;
extern
int8_t
tsDeadLockKillQuery
;
// client
extern
int32_t
tsMaxSQLStringLen
;
...
...
@@ -72,16 +77,6 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the
extern
int32_t
tsProjectExecInterval
;
extern
int64_t
tsMaxRetentWindow
;
// balance
extern
int8_t
tsEnableSlaveQuery
;
// interna
extern
int8_t
tsPrintAuth
;
extern
char
tsVnodeDir
[];
extern
char
tsMnodeDir
[];
extern
int64_t
tsTickPerDay
[
3
];
// system info
extern
float
tsTotalLogDirGB
;
extern
float
tsTotalTmpDirGB
;
...
...
@@ -102,17 +97,13 @@ extern char gitinfo[];
extern
char
gitinfoOfInternal
[];
extern
char
buildinfo
[];
#ifdef TD_TSZ
// lossy
extern
char
lossyColumns
[];
extern
double
fPrecision
;
extern
double
dPrecision
;
extern
uint32_t
maxRange
;
extern
uint32_t
curRange
;
extern
char
Compressor
[];
#endif
// long query
extern
int8_t
tsDeadLockKillQuery
;
extern
char
tsLossyColumns
[];
extern
double
tsFPrecision
;
extern
double
tsDPrecision
;
extern
uint32_t
tsMaxRange
;
extern
uint32_t
tsCurRange
;
extern
char
tsCompressor
[];
typedef
struct
{
char
dir
[
TSDB_FILENAME_LEN
];
...
...
include/common/tmsg.h
浏览文件 @
2010f837
...
...
@@ -25,7 +25,9 @@ extern "C" {
#include "taoserror.h"
#include "tcoding.h"
#include "tdataformat.h"
#include "tlist.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
#define TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
...
...
@@ -54,6 +56,47 @@ extern int tMsgDict[];
typedef
uint16_t
tmsg_t
;
/* ------------------------ ENCODE/DECODE FUNCTIONS AND MACROS ------------------------ */
struct
SMEListNode
{
TD_SLIST_NODE
(
SMEListNode
);
SEncoder
coder
;
};
typedef
struct
SMsgEncoder
{
SEncoder
coder
;
TD_SLIST
(
SMEListNode
)
eStack
;
// encode stack
}
SMsgEncoder
;
struct
SMDFreeListNode
{
TD_SLIST_NODE
(
SMDFreeListNode
);
char
payload
[];
};
struct
SMDListNode
{
TD_SLIST_NODE
(
SMDListNode
);
SDecoder
coder
;
};
typedef
struct
SMsgDecoder
{
SDecoder
coder
;
TD_SLIST
(
SMDListNode
)
dStack
;
TD_SLIST
(
SMDFreeListNode
)
freeList
;
}
SMsgDecoder
;
#define TMSG_MALLOC(SIZE, DECODER) \
({ \
void* ptr = malloc((SIZE) + sizeof(struct SMDFreeListNode)); \
if (ptr) { \
TD_SLIST_PUSH(&((DECODER)->freeList), (struct SMDFreeListNode*)ptr); \
ptr = POINTER_SHIFT(ptr, sizeof(struct SMDFreeListNode*)); \
} \
ptr; \
})
void
tmsgInitMsgDecoder
(
SMsgDecoder
*
pMD
,
td_endian_t
endian
,
uint8_t
*
data
,
int64_t
size
);
void
tmsgClearMsgDecoder
(
SMsgDecoder
*
pMD
);
/* ------------------------ OTHER DEFINITIONS ------------------------ */
// IE type
#define TSDB_IE_TYPE_SEC 1
#define TSDB_IE_TYPE_META 2
...
...
@@ -651,8 +694,8 @@ typedef struct {
int64_t
clusterId
;
int64_t
rebootTime
;
int64_t
updateTime
;
int
16
_t
numOfCores
;
int
16
_t
numOfSupportVnodes
;
int
32
_t
numOfCores
;
int
32
_t
numOfSupportVnodes
;
char
dnodeEp
[
TSDB_EP_LEN
];
SClusterCfg
clusterCfg
;
SVnodeLoads
vnodeLoads
;
...
...
@@ -1228,100 +1271,11 @@ typedef struct SVCreateTbReq {
};
}
SVCreateTbReq
;
static
FORCE_INLINE
int
tSerializeSVCreateTbReq
(
void
**
buf
,
const
SVCreateTbReq
*
pReq
)
{
int
tlen
=
0
;
int
tmsgSVCreateTbReqEncode
(
SMsgEncoder
*
pCoder
,
SVCreateTbReq
*
pReq
);
int
tmsgSVCreateTbReqDecode
(
SMsgDecoder
*
pCoder
,
SVCreateTbReq
*
pReq
);
int
tSerializeSVCreateTbReq
(
void
**
buf
,
const
SVCreateTbReq
*
pReq
);
void
*
tDeserializeSVCreateTbReq
(
void
*
buf
,
SVCreateTbReq
*
pReq
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pReq
->
ver
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
name
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
ttl
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
keep
);
tlen
+=
taosEncodeFixedU8
(
buf
,
pReq
->
type
);
switch
(
pReq
->
type
)
{
case
TD_SUPER_TABLE
:
tlen
+=
taosEncodeFixedU64
(
buf
,
pReq
->
stbCfg
.
suid
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
stbCfg
.
nCols
);
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nCols
;
i
++
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
colId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
bytes
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
name
);
}
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
stbCfg
.
nTagCols
);
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nTagCols
;
i
++
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
colId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
bytes
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
name
);
}
break
;
case
TD_CHILD_TABLE
:
tlen
+=
taosEncodeFixedU64
(
buf
,
pReq
->
ctbCfg
.
suid
);
tlen
+=
tdEncodeKVRow
(
buf
,
pReq
->
ctbCfg
.
pTag
);
break
;
case
TD_NORMAL_TABLE
:
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
ntbCfg
.
nCols
);
for
(
uint32_t
i
=
0
;
i
<
pReq
->
ntbCfg
.
nCols
;
i
++
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
colId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
bytes
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
name
);
}
break
;
default:
ASSERT
(
0
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDeserializeSVCreateTbReq
(
void
*
buf
,
SVCreateTbReq
*
pReq
)
{
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pReq
->
ver
));
buf
=
taosDecodeString
(
buf
,
&
(
pReq
->
name
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pReq
->
ttl
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pReq
->
keep
));
buf
=
taosDecodeFixedU8
(
buf
,
&
(
pReq
->
type
));
switch
(
pReq
->
type
)
{
case
TD_SUPER_TABLE
:
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pReq
->
stbCfg
.
suid
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pReq
->
stbCfg
.
nCols
));
pReq
->
stbCfg
.
pSchema
=
(
SSchema
*
)
malloc
(
pReq
->
stbCfg
.
nCols
*
sizeof
(
SSchema
));
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nCols
;
i
++
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
(
pReq
->
stbCfg
.
pSchema
[
i
].
type
));
buf
=
taosDecodeFixedI32
(
buf
,
&
(
pReq
->
stbCfg
.
pSchema
[
i
].
colId
));
buf
=
taosDecodeFixedI32
(
buf
,
&
(
pReq
->
stbCfg
.
pSchema
[
i
].
bytes
));
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
name
);
}
buf
=
taosDecodeFixedU32
(
buf
,
&
pReq
->
stbCfg
.
nTagCols
);
pReq
->
stbCfg
.
pTagSchema
=
(
SSchema
*
)
malloc
(
pReq
->
stbCfg
.
nTagCols
*
sizeof
(
SSchema
));
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nTagCols
;
i
++
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
(
pReq
->
stbCfg
.
pTagSchema
[
i
].
type
));
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
stbCfg
.
pTagSchema
[
i
].
colId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
stbCfg
.
pTagSchema
[
i
].
bytes
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
name
);
}
break
;
case
TD_CHILD_TABLE
:
buf
=
taosDecodeFixedU64
(
buf
,
&
pReq
->
ctbCfg
.
suid
);
buf
=
tdDecodeKVRow
(
buf
,
&
pReq
->
ctbCfg
.
pTag
);
break
;
case
TD_NORMAL_TABLE
:
buf
=
taosDecodeFixedU32
(
buf
,
&
pReq
->
ntbCfg
.
nCols
);
pReq
->
ntbCfg
.
pSchema
=
(
SSchema
*
)
malloc
(
pReq
->
ntbCfg
.
nCols
*
sizeof
(
SSchema
));
for
(
uint32_t
i
=
0
;
i
<
pReq
->
ntbCfg
.
nCols
;
i
++
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
ntbCfg
.
pSchema
[
i
].
type
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
ntbCfg
.
pSchema
[
i
].
colId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
ntbCfg
.
pSchema
[
i
].
bytes
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
name
);
}
break
;
default:
ASSERT
(
0
);
}
return
buf
;
}
typedef
struct
SVCreateTbRsp
{
}
SVCreateTbRsp
;
...
...
include/dnode/mgmt/dnode.h
浏览文件 @
2010f837
...
...
@@ -27,8 +27,8 @@ typedef struct SDnode SDnode;
typedef
struct
{
int32_t
sver
;
int
16
_t
numOfCores
;
int
16
_t
numOfSupportVnodes
;
int
32
_t
numOfCores
;
int
32
_t
numOfSupportVnodes
;
int16_t
numOfCommitThreads
;
int8_t
enableTelem
;
int32_t
statusInterval
;
...
...
include/os/osEnv.h
浏览文件 @
2010f837
...
...
@@ -21,7 +21,6 @@ extern "C" {
#endif
extern
char
tsOsName
[];
extern
char
tsDnodeDir
[];
extern
char
tsDataDir
[];
extern
char
tsLogDir
[];
extern
char
tsScriptDir
[];
...
...
include/util/encode.h
浏览文件 @
2010f837
...
...
@@ -26,8 +26,8 @@ extern "C" {
typedef
struct
{
td_endian_t
endian
;
uint8_t
*
data
;
int
64
_t
size
;
int
64
_t
pos
;
int
32
_t
size
;
int
32
_t
pos
;
}
SEncoder
,
SDecoder
;
#define tPut(TYPE, BUF, VAL) ((TYPE*)(BUF))[0] = (VAL)
...
...
@@ -62,7 +62,7 @@ typedef struct {
#define TD_CHECK_CODER_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
/* ------------------------ FOR ENCODER ------------------------ */
static
FORCE_INLINE
void
tInitEncoder
(
SEncoder
*
pEncoder
,
td_endian_t
endian
,
uint8_t
*
data
,
int
64
_t
size
)
{
static
FORCE_INLINE
void
tInitEncoder
(
SEncoder
*
pEncoder
,
td_endian_t
endian
,
uint8_t
*
data
,
int
32
_t
size
)
{
pEncoder
->
endian
=
endian
;
pEncoder
->
data
=
data
;
pEncoder
->
size
=
(
data
)
?
size
:
0
;
...
...
@@ -266,7 +266,7 @@ static FORCE_INLINE int tEncodeCStr(SEncoder* pEncoder, const char* val) {
}
/* ------------------------ FOR DECODER ------------------------ */
static
FORCE_INLINE
void
tInitDecoder
(
SDecoder
*
pDecoder
,
td_endian_t
endian
,
uint8_t
*
data
,
int
64
_t
size
)
{
static
FORCE_INLINE
void
tInitDecoder
(
SDecoder
*
pDecoder
,
td_endian_t
endian
,
uint8_t
*
data
,
int
32
_t
size
)
{
ASSERT
(
!
TD_IS_NULL
(
data
));
pDecoder
->
endian
=
endian
;
pDecoder
->
data
=
data
;
...
...
source/client/inc/clientInt.h
浏览文件 @
2010f837
...
...
@@ -128,6 +128,7 @@ typedef struct SRequestObj {
char
*
msgBuf
;
void
*
pInfo
;
// sql parse info, generated by parser module
int32_t
code
;
uint64_t
affectedRows
;
SQueryExecMetric
metric
;
SRequestSendRecvBody
body
;
}
SRequestObj
;
...
...
source/client/src/clientEnv.c
浏览文件 @
2010f837
...
...
@@ -18,6 +18,7 @@
#include "clientInt.h"
#include "clientLog.h"
#include "query.h"
#include "scheduler.h"
#include "tmsg.h"
#include "tcache.h"
#include "tconfig.h"
...
...
@@ -230,6 +231,8 @@ void taos_init_imp(void) {
SCatalogCfg
cfg
=
{.
maxDBCacheNum
=
100
,
.
maxTblCacheNum
=
100
};
catalogInit
(
&
cfg
);
SSchedulerCfg
scfg
=
{.
maxJobNum
=
100
};
schedulerInit
(
&
scfg
);
tscDebug
(
"starting to initialize TAOS driver, local ep: %s"
,
tsLocalEp
);
taosSetCoreDump
(
true
);
...
...
source/client/src/clientImpl.c
浏览文件 @
2010f837
...
...
@@ -196,7 +196,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQuery
,
SQueryDag
**
pDag
)
{
pRequest
->
type
=
pQuery
->
type
;
return
qCreateQueryDag
(
pQuery
,
pDag
);
}
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
if
(
TSDB_SQL_INSERT
==
pRequest
->
type
)
{
return
scheduleExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
/*todo appInfo.xxx*/
,
pDag
,
pJob
,
&
pRequest
->
affectedRows
);
}
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
/*todo appInfo.xxx*/
,
pDag
,
pJob
);
}
...
...
@@ -283,7 +291,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if
(
qIsDdlQuery
(
pQuery
))
{
CHECK_CODE_GOTO
(
execDdlQuery
(
pRequest
,
pQuery
),
_return
);
}
else
{
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQuery
,
&
pDag
),
_return
);
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQuery
,
&
pDag
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pDag
,
&
pJob
),
_return
);
}
...
...
source/common/src/tglobal.c
浏览文件 @
2010f837
...
...
@@ -15,17 +15,18 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "ulog.h"
#include "tlog.h"
#include "tcompare.h"
#include "tconfig.h"
#include "tep.h"
#include "tglobal.h"
#include "tcompare.h"
#include "tutil.h"
#include "ttimezone.h"
#include "tlocale.h"
#include "tep.h"
#include "tlog.h"
#include "ttimezone.h"
#include "tutil.h"
#include "ulog.h"
// cluster
char
tsFirst
[
TSDB_EP_LEN
]
=
{
0
};
...
...
@@ -36,11 +37,12 @@ uint16_t tsServerPort = 6030;
int32_t
tsStatusInterval
=
1
;
// second
int8_t
tsEnableTelemetryReporting
=
0
;
char
tsEmail
[
TSDB_FQDN_LEN
]
=
{
0
};
int32_t
tsNumOfSupportVnodes
=
16
;
// common
int32_t
tsRpcTimer
=
300
;
int32_t
tsRpcMaxTime
=
600
;
// seconds;
int32_t
tsRpcForceTcp
=
1
;
//
disable this, means query, show command use udp protocol as default
int32_t
tsRpcForceTcp
=
1
;
//
disable this, means query, show command use udp protocol as default
int32_t
tsMaxShellConns
=
50000
;
int32_t
tsMaxConnections
=
5000
;
int32_t
tsShellActivityTimer
=
3
;
// second
...
...
@@ -50,8 +52,9 @@ float tsRatioOfQueryCores = 1.0f;
int8_t
tsDaylight
=
0
;
int8_t
tsEnableCoreFile
=
0
;
int32_t
tsMaxBinaryDisplayWidth
=
30
;
int64_t
tsMaxVnodeQueuedBytes
=
1024
*
1024
*
1024
;
//1GB
int8_t
tsEnableSlaveQuery
=
1
;
int8_t
tsEnableAdjustMaster
=
1
;
int8_t
tsPrintAuth
=
0
;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
...
...
@@ -101,7 +104,7 @@ int32_t tsMaxStreamComputDelay = 20000;
int32_t
tsStreamCompStartDelay
=
10000
;
// the stream computing delay time after executing failed, change accordingly
int32_t
tsRetryStreamCompDelay
=
10
*
1000
;
int32_t
tsRetryStreamCompDelay
=
10
*
1000
;
// The delayed computing ration. 10% of the whole computing time window by default.
float
tsStreamComputDelayRatio
=
0
.
1
f
;
...
...
@@ -122,27 +125,13 @@ int32_t tsRetrieveBlockingModel = 0;
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
int8_t
tsKeepOriginalColumnName
=
0
;
// long query death-lock
int8_t
tsDeadLockKillQuery
=
0
;
// tsdb config
// For backward compatibility
bool
tsdbForceKeepFile
=
false
;
// balance
int8_t
tsEnableFlowCtrl
=
1
;
int8_t
tsEnableSlaveQuery
=
1
;
int8_t
tsEnableAdjustMaster
=
1
;
// monitor
char
tsMonitorDbName
[
TSDB_DB_NAME_LEN
]
=
"log"
;
char
tsInternalPass
[]
=
"secretkey"
;
// internal
int8_t
tsCompactMnodeWal
=
0
;
int8_t
tsPrintAuth
=
0
;
char
tsVnodeDir
[
PATH_MAX
]
=
{
0
};
char
tsDnodeDir
[
PATH_MAX
]
=
{
0
};
char
tsMnodeDir
[
PATH_MAX
]
=
{
0
};
int32_t
tsDiskCfgNum
=
0
;
#ifndef _STORAGE
...
...
@@ -170,21 +159,18 @@ float tsMinimalDataDirGB = 2.0f;
int32_t
tsTotalMemoryMB
=
0
;
uint32_t
tsVersion
=
0
;
#ifdef TD_TSZ
//
// lossy compress 6
//
char
lossyColumns
[
32
]
=
""
;
// "float|double" means all float and double columns can be lossy compressed. set empty can close lossy compress.
char
tsLossyColumns
[
32
]
=
""
;
// "float|double" means all float and double columns can be lossy compressed. set empty
// can close lossy compress.
// below option can take effect when tsLossyColumns not empty
double
fPrecision
=
1E-8
;
// float column precision
double
dPrecision
=
1E-16
;
// double column precision
uint32_t
maxRange
=
500
;
// max range
uint32_t
curRange
=
100
;
// range
char
Compressor
[
32
]
=
"ZSTD_COMPRESSOR"
;
// ZSTD_COMPRESSOR or GZIP_COMPRESSOR
#endif
double
tsFPrecision
=
1E-8
;
// float column precision
double
tsDPrecision
=
1E-16
;
// double column precision
uint32_t
tsMaxRange
=
500
;
// max range
uint32_t
tsCurRange
=
100
;
// range
char
tsCompressor
[
32
]
=
"ZSTD_COMPRESSOR"
;
// ZSTD_COMPRESSOR or GZIP_COMPRESSOR
// long query death-lock
int8_t
tsDeadLockKillQuery
=
0
;
int32_t
(
*
monStartSystemFp
)()
=
NULL
;
void
(
*
monStopSystemFp
)()
=
NULL
;
...
...
@@ -200,7 +186,6 @@ void taosSetAllDebugFlag() {
dDebugFlag
=
debugFlag
;
vDebugFlag
=
debugFlag
;
jniDebugFlag
=
debugFlag
;
odbcDebugFlag
=
debugFlag
;
qDebugFlag
=
debugFlag
;
rpcDebugFlag
=
debugFlag
;
uDebugFlag
=
debugFlag
;
...
...
@@ -218,7 +203,7 @@ int32_t taosCfgDynamicOptions(char *msg) {
int32_t
vint
=
0
;
paGetToken
(
msg
,
&
option
,
&
olen
);
if
(
olen
==
0
)
return
-
1
;
;
if
(
olen
==
0
)
return
-
1
;
paGetToken
(
option
+
olen
+
1
,
&
value
,
&
vlen
);
if
(
vlen
==
0
)
...
...
@@ -231,7 +216,7 @@ int32_t taosCfgDynamicOptions(char *msg) {
for
(
int32_t
i
=
0
;
i
<
tsGlobalConfigNum
;
++
i
)
{
SGlobalCfg
*
cfg
=
tsGlobalConfig
+
i
;
//if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
//
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
if
(
cfg
->
valType
!=
TAOS_CFG_VTYPE_INT32
&&
cfg
->
valType
!=
TAOS_CFG_VTYPE_INT8
)
continue
;
int32_t
cfgLen
=
(
int32_t
)
strlen
(
cfg
->
option
);
...
...
@@ -366,6 +351,16 @@ static void doInitGlobalConfig(void) {
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosAddConfigOption
(
cfg
);
cfg
.
option
=
"supportVnodes"
;
cfg
.
ptr
=
&
tsNumOfSupportVnodes
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
65536
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosAddConfigOption
(
cfg
);
// directory
cfg
.
option
=
"configDir"
;
cfg
.
ptr
=
configDir
;
...
...
@@ -442,8 +437,8 @@ static void doInitGlobalConfig(void) {
cfg
.
ptr
=
&
tsMaxNumOfDistinctResults
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
10
*
10000
;
cfg
.
maxValue
=
10000
*
10000
;
cfg
.
minValue
=
10
*
10000
;
cfg
.
maxValue
=
10000
*
10000
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosAddConfigOption
(
cfg
);
...
...
@@ -749,17 +744,6 @@ static void doInitGlobalConfig(void) {
cfg
.
maxValue
=
10000000
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_GB
;
taosAddConfigOption
(
cfg
);
// module configs
cfg
.
option
=
"flowctrl"
;
cfg
.
ptr
=
&
tsEnableFlowCtrl
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT8
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosAddConfigOption
(
cfg
);
cfg
.
option
=
"slaveQuery"
;
...
...
@@ -893,16 +877,6 @@ static void doInitGlobalConfig(void) {
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosAddConfigOption
(
cfg
);
cfg
.
option
=
"odbcDebugFlag"
;
cfg
.
ptr
=
&
odbcDebugFlag
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_LOG
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
255
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosAddConfigOption
(
cfg
);
cfg
.
option
=
"uDebugFlag"
;
cfg
.
ptr
=
&
uDebugFlag
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
...
...
@@ -1066,7 +1040,6 @@ static void doInitGlobalConfig(void) {
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosAddConfigOption
(
cfg
);
cfg
.
option
=
"dPrecision"
;
...
...
@@ -1100,14 +1073,11 @@ static void doInitGlobalConfig(void) {
taosAddConfigOption
(
cfg
);
assert
(
tsGlobalConfigNum
==
TSDB_CFG_MAX_NUM
);
#else
//assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
//
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
#endif
}
void
taosInitGlobalCfg
()
{
pthread_once
(
&
tsInitGlobalCfgOnce
,
doInitGlobalConfig
);
}
void
taosInitGlobalCfg
()
{
pthread_once
(
&
tsInitGlobalCfgOnce
,
doInitGlobalConfig
);
}
int32_t
taosCheckAndPrintCfg
()
{
char
fqdn
[
TSDB_FQDN_LEN
];
...
...
source/common/src/tmsg.c
浏览文件 @
2010f837
...
...
@@ -26,3 +26,229 @@
#define TD_MSG_DICT_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
static
int
tmsgStartEncode
(
SMsgEncoder
*
pME
);
static
void
tmsgEndEncode
(
SMsgEncoder
*
pME
);
static
int
tmsgStartDecode
(
SMsgDecoder
*
pMD
);
static
void
tmsgEndDecode
(
SMsgDecoder
*
pMD
);
/* ------------------------ ENCODE/DECODE FUNCTIONS ------------------------ */
void
tmsgInitMsgEncoder
(
SMsgEncoder
*
pME
,
td_endian_t
endian
,
uint8_t
*
data
,
int64_t
size
)
{
tInitEncoder
(
&
(
pME
->
coder
),
endian
,
data
,
size
);
TD_SLIST_INIT
(
&
(
pME
->
eStack
));
}
void
tmsgClearMsgEncoder
(
SMsgEncoder
*
pME
)
{
struct
SMEListNode
*
pNode
;
for
(;;)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pME
->
eStack
));
if
(
TD_IS_NULL
(
pNode
))
break
;
TD_SLIST_POP
(
&
(
pME
->
eStack
));
free
(
pNode
);
}
}
void
tmsgInitMsgDecoder
(
SMsgDecoder
*
pMD
,
td_endian_t
endian
,
uint8_t
*
data
,
int64_t
size
)
{
tInitDecoder
(
&
pMD
->
coder
,
endian
,
data
,
size
);
TD_SLIST_INIT
(
&
(
pMD
->
dStack
));
TD_SLIST_INIT
(
&
(
pMD
->
freeList
));
}
void
tmsgClearMsgDecoder
(
SMsgDecoder
*
pMD
)
{
{
struct
SMDFreeListNode
*
pNode
;
for
(;;)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pMD
->
freeList
));
if
(
TD_IS_NULL
(
pNode
))
break
;
TD_SLIST_POP
(
&
(
pMD
->
freeList
));
free
(
pNode
);
}
}
{
struct
SMDListNode
*
pNode
;
for
(;;)
{
pNode
=
TD_SLIST_HEAD
(
&
(
pMD
->
dStack
));
if
(
TD_IS_NULL
(
pNode
))
break
;
TD_SLIST_POP
(
&
(
pMD
->
dStack
));
free
(
pNode
);
}
}
}
/* ------------------------ MESSAGE ENCODE/DECODE ------------------------ */
int
tmsgSVCreateTbReqEncode
(
SMsgEncoder
*
pCoder
,
SVCreateTbReq
*
pReq
)
{
tmsgStartEncode
(
pCoder
);
// TODO
tmsgEndEncode
(
pCoder
);
return
0
;
}
int
tmsgSVCreateTbReqDecode
(
SMsgDecoder
*
pCoder
,
SVCreateTbReq
*
pReq
)
{
tmsgStartDecode
(
pCoder
);
// TODO: decode
// Decode is not end
if
(
pCoder
->
coder
.
pos
!=
pCoder
->
coder
.
size
)
{
// Continue decode
}
tmsgEndDecode
(
pCoder
);
return
0
;
}
int
tSerializeSVCreateTbReq
(
void
**
buf
,
const
SVCreateTbReq
*
pReq
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedU64
(
buf
,
pReq
->
ver
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
name
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
ttl
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
keep
);
tlen
+=
taosEncodeFixedU8
(
buf
,
pReq
->
type
);
switch
(
pReq
->
type
)
{
case
TD_SUPER_TABLE
:
tlen
+=
taosEncodeFixedU64
(
buf
,
pReq
->
stbCfg
.
suid
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
stbCfg
.
nCols
);
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nCols
;
i
++
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
colId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
bytes
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
name
);
}
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
stbCfg
.
nTagCols
);
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nTagCols
;
i
++
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
colId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
bytes
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
name
);
}
break
;
case
TD_CHILD_TABLE
:
tlen
+=
taosEncodeFixedU64
(
buf
,
pReq
->
ctbCfg
.
suid
);
tlen
+=
tdEncodeKVRow
(
buf
,
pReq
->
ctbCfg
.
pTag
);
break
;
case
TD_NORMAL_TABLE
:
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
ntbCfg
.
nCols
);
for
(
uint32_t
i
=
0
;
i
<
pReq
->
ntbCfg
.
nCols
;
i
++
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
colId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
bytes
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
name
);
}
break
;
default:
ASSERT
(
0
);
}
return
tlen
;
}
void
*
tDeserializeSVCreateTbReq
(
void
*
buf
,
SVCreateTbReq
*
pReq
)
{
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pReq
->
ver
));
buf
=
taosDecodeString
(
buf
,
&
(
pReq
->
name
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pReq
->
ttl
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pReq
->
keep
));
buf
=
taosDecodeFixedU8
(
buf
,
&
(
pReq
->
type
));
switch
(
pReq
->
type
)
{
case
TD_SUPER_TABLE
:
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pReq
->
stbCfg
.
suid
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pReq
->
stbCfg
.
nCols
));
pReq
->
stbCfg
.
pSchema
=
(
SSchema
*
)
malloc
(
pReq
->
stbCfg
.
nCols
*
sizeof
(
SSchema
));
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nCols
;
i
++
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
(
pReq
->
stbCfg
.
pSchema
[
i
].
type
));
buf
=
taosDecodeFixedI32
(
buf
,
&
(
pReq
->
stbCfg
.
pSchema
[
i
].
colId
));
buf
=
taosDecodeFixedI32
(
buf
,
&
(
pReq
->
stbCfg
.
pSchema
[
i
].
bytes
));
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
stbCfg
.
pSchema
[
i
].
name
);
}
buf
=
taosDecodeFixedU32
(
buf
,
&
pReq
->
stbCfg
.
nTagCols
);
pReq
->
stbCfg
.
pTagSchema
=
(
SSchema
*
)
malloc
(
pReq
->
stbCfg
.
nTagCols
*
sizeof
(
SSchema
));
for
(
uint32_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nTagCols
;
i
++
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
(
pReq
->
stbCfg
.
pTagSchema
[
i
].
type
));
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
stbCfg
.
pTagSchema
[
i
].
colId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
stbCfg
.
pTagSchema
[
i
].
bytes
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
stbCfg
.
pTagSchema
[
i
].
name
);
}
break
;
case
TD_CHILD_TABLE
:
buf
=
taosDecodeFixedU64
(
buf
,
&
pReq
->
ctbCfg
.
suid
);
buf
=
tdDecodeKVRow
(
buf
,
&
pReq
->
ctbCfg
.
pTag
);
break
;
case
TD_NORMAL_TABLE
:
buf
=
taosDecodeFixedU32
(
buf
,
&
pReq
->
ntbCfg
.
nCols
);
pReq
->
ntbCfg
.
pSchema
=
(
SSchema
*
)
malloc
(
pReq
->
ntbCfg
.
nCols
*
sizeof
(
SSchema
));
for
(
uint32_t
i
=
0
;
i
<
pReq
->
ntbCfg
.
nCols
;
i
++
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
ntbCfg
.
pSchema
[
i
].
type
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
ntbCfg
.
pSchema
[
i
].
colId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
ntbCfg
.
pSchema
[
i
].
bytes
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
name
);
}
break
;
default:
ASSERT
(
0
);
}
return
buf
;
}
/* ------------------------ STATIC METHODS ------------------------ */
static
int
tmsgStartEncode
(
SMsgEncoder
*
pME
)
{
struct
SMEListNode
*
pNode
=
(
struct
SMEListNode
*
)
malloc
(
sizeof
(
*
pNode
));
if
(
TD_IS_NULL
(
pNode
))
return
-
1
;
pNode
->
coder
=
pME
->
coder
;
TD_SLIST_PUSH
(
&
(
pME
->
eStack
),
pNode
);
TD_CODER_MOVE_POS
(
&
(
pME
->
coder
),
sizeof
(
int32_t
));
return
0
;
}
static
void
tmsgEndEncode
(
SMsgEncoder
*
pME
)
{
int32_t
size
;
struct
SMEListNode
*
pNode
;
pNode
=
TD_SLIST_HEAD
(
&
(
pME
->
eStack
));
ASSERT
(
pNode
);
TD_SLIST_POP
(
&
(
pME
->
eStack
));
size
=
pME
->
coder
.
pos
-
pNode
->
coder
.
pos
;
tEncodeI32
(
&
(
pNode
->
coder
),
size
);
free
(
pNode
);
}
static
int
tmsgStartDecode
(
SMsgDecoder
*
pMD
)
{
struct
SMDListNode
*
pNode
;
int32_t
size
;
pNode
=
(
struct
SMDListNode
*
)
malloc
(
sizeof
(
*
pNode
));
if
(
pNode
==
NULL
)
return
-
1
;
tDecodeI32
(
&
(
pMD
->
coder
),
&
size
);
pNode
->
coder
=
pMD
->
coder
;
TD_SLIST_PUSH
(
&
(
pMD
->
dStack
),
pNode
);
pMD
->
coder
.
pos
=
0
;
pMD
->
coder
.
size
=
size
-
sizeof
(
int32_t
);
pMD
->
coder
.
data
=
TD_CODER_CURRENT
(
&
(
pNode
->
coder
));
return
0
;
}
static
void
tmsgEndDecode
(
SMsgDecoder
*
pMD
)
{
ASSERT
(
pMD
->
coder
.
pos
==
pMD
->
coder
.
size
);
struct
SMDListNode
*
pNode
;
pNode
=
TD_SLIST_HEAD
(
&
(
pMD
->
dStack
));
ASSERT
(
pNode
);
TD_SLIST_POP
(
&
(
pMD
->
dStack
));
pNode
->
coder
.
pos
+=
pMD
->
coder
.
size
;
pMD
->
coder
=
pNode
->
coder
;
free
(
pNode
);
}
\ No newline at end of file
source/common/test/CMakeLists.txt
浏览文件 @
2010f837
...
...
@@ -18,11 +18,11 @@ TARGET_INCLUDE_DIRECTORIES(
)
# tmsg test
add_executable
(
tmsgTest
""
)
target_sources
(
tmsgTest
PRIVATE
"tmsgTest.cpp"
"../src/tmsg.c"
)
target_include_directories
(
tmsgTest PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/common/"
)
target_link_libraries
(
tmsgTest PUBLIC os util gtest gtest_main
)
\ No newline at end of file
# add_executable(tmsgTest "")
# target_sources(tmsgTest
# PRIVATE
# "tmsgTest.cpp"
# "../src/tmsg.c"
# )
# target_include_directories(tmsgTest PUBLIC "${CMAKE_SOURCE_DIR}/include/common/")
# target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main)
\ No newline at end of file
source/dnode/mgmt/daemon/src/daemon.c
浏览文件 @
2010f837
...
...
@@ -139,7 +139,7 @@ void dmnWaitSignal() {
void
dmnInitOption
(
SDnodeOpt
*
pOption
)
{
pOption
->
sver
=
30000000
;
//3.0.0.0
pOption
->
numOfCores
=
tsNumOfCores
;
pOption
->
numOfSupportVnodes
=
16
;
pOption
->
numOfSupportVnodes
=
tsNumOfSupportVnodes
;
pOption
->
numOfCommitThreads
=
1
;
pOption
->
statusInterval
=
tsStatusInterval
;
pOption
->
numOfThreadsPerCore
=
tsNumOfThreadsPerCore
;
...
...
source/dnode/mgmt/impl/src/dndDnode.c
浏览文件 @
2010f837
...
...
@@ -370,8 +370,8 @@ void dndSendStatusMsg(SDnode *pDnode) {
pStatus
->
clusterId
=
htobe64
(
pMgmt
->
clusterId
);
pStatus
->
rebootTime
=
htobe64
(
pMgmt
->
rebootTime
);
pStatus
->
updateTime
=
htobe64
(
pMgmt
->
updateTime
);
pStatus
->
numOfCores
=
hton
s
(
pDnode
->
opt
.
numOfCores
);
pStatus
->
numOfSupportVnodes
=
hton
s
(
pDnode
->
opt
.
numOfSupportVnodes
);
pStatus
->
numOfCores
=
hton
l
(
pDnode
->
opt
.
numOfCores
);
pStatus
->
numOfSupportVnodes
=
hton
l
(
pDnode
->
opt
.
numOfSupportVnodes
);
tstrncpy
(
pStatus
->
dnodeEp
,
pDnode
->
opt
.
localEp
,
TSDB_EP_LEN
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
pDnode
->
opt
.
statusInterval
);
...
...
@@ -397,7 +397,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
static
void
dndUpdateDnodeCfg
(
SDnode
*
pDnode
,
SDnodeCfg
*
pCfg
)
{
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
if
(
pMgmt
->
dnodeId
==
0
)
{
dInfo
(
"set dnodeId:%d clusterId:%
"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
dInfo
(
"set dnodeId:%d clusterId:%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dnodeId
=
pCfg
->
dnodeId
;
pMgmt
->
clusterId
=
pCfg
->
clusterId
;
...
...
@@ -440,6 +440,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
}
SStatusRsp
*
pRsp
=
pMsg
->
pCont
;
if
(
pMsg
->
pCont
!=
NULL
&&
pMsg
->
contLen
!=
0
)
{
SDnodeCfg
*
pCfg
=
&
pRsp
->
dnodeCfg
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
clusterId
=
htobe64
(
pCfg
->
clusterId
);
...
...
@@ -453,6 +454,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
}
dndUpdateDnodeEps
(
pDnode
,
pDnodeEps
);
}
pMgmt
->
statusSent
=
0
;
}
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
2010f837
...
...
@@ -103,7 +103,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_HEARTBEAT
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW_RETRIEVE
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS
)]
=
dndProcessMnode
Write
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS
)]
=
dndProcessMnode
Read
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS_RSP
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_TRANS
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_TRANS_RSP
)]
=
dndProcessMnodeWriteMsg
;
...
...
source/dnode/mgmt/impl/test/dnode/dnode.cpp
浏览文件 @
2010f837
...
...
@@ -57,7 +57,7 @@ TEST_F(DndTestDnode, 01_ShowDnode) {
CHECK_SCHEMA
(
0
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"id"
);
CHECK_SCHEMA
(
1
,
TSDB_DATA_TYPE_BINARY
,
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
,
"endpoint"
);
CHECK_SCHEMA
(
2
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"vnodes"
);
CHECK_SCHEMA
(
3
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"
max
_vnodes"
);
CHECK_SCHEMA
(
3
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"
support
_vnodes"
);
CHECK_SCHEMA
(
4
,
TSDB_DATA_TYPE_BINARY
,
10
+
VARSTR_HEADER_SIZE
,
"status"
);
CHECK_SCHEMA
(
5
,
TSDB_DATA_TYPE_TIMESTAMP
,
8
,
"create_time"
);
CHECK_SCHEMA
(
6
,
TSDB_DATA_TYPE_BINARY
,
24
+
VARSTR_HEADER_SIZE
,
"offline_reason"
);
...
...
source/dnode/mgmt/impl/test/sut/src/base.cpp
浏览文件 @
2010f837
...
...
@@ -24,7 +24,6 @@ void Testbase::InitLog(const char* path) {
tmrDebugFlag
=
0
;
uDebugFlag
=
143
;
rpcDebugFlag
=
0
;
odbcDebugFlag
=
0
;
qDebugFlag
=
0
;
wDebugFlag
=
0
;
sDebugFlag
=
0
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
2010f837
...
...
@@ -74,13 +74,6 @@ typedef enum {
typedef
enum
{
TRN_POLICY_ROLLBACK
=
0
,
TRN_POLICY_RETRY
=
1
}
ETrnPolicy
;
typedef
enum
{
DND_STATUS_OFFLINE
=
0
,
DND_STATUS_READY
=
1
,
DND_STATUS_CREATING
=
2
,
DND_STATUS_DROPPING
=
3
}
EDndStatus
;
typedef
enum
{
DND_REASON_ONLINE
=
0
,
DND_REASON_STATUS_MSG_TIMEOUT
,
...
...
@@ -125,9 +118,8 @@ typedef struct {
int64_t
lastAccessTime
;
int32_t
accessTimes
;
int16_t
numOfVnodes
;
int16_t
numOfSupportVnodes
;
int16_t
numOfCores
;
EDndStatus
status
;
int32_t
numOfSupportVnodes
;
int32_t
numOfCores
;
EDndReason
offlineReason
;
uint16_t
port
;
char
fqdn
[
TSDB_FQDN_LEN
];
...
...
source/dnode/mnode/impl/inc/mndDnode.h
浏览文件 @
2010f837
...
...
@@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
void
mndReleaseDnode
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
);
SEpSet
mndGetDnodeEpset
(
SDnodeObj
*
pDnode
);
int32_t
mndGetDnodeSize
(
SMnode
*
pMnode
);
bool
mndIsDnode
InReadyStatus
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
);
bool
mndIsDnode
Online
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
int64_t
curMs
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
2010f837
...
...
@@ -40,8 +40,6 @@ static const char *offlineReason[] = {
"unknown"
,
};
static
const
char
*
dnodeStatus
[]
=
{
"offline"
,
"ready"
,
"creating"
,
"dropping"
};
static
int32_t
mndCreateDefaultDnode
(
SMnode
*
pMnode
);
static
SSdbRaw
*
mndDnodeActionEncode
(
SDnodeObj
*
pDnode
);
static
SSdbRow
*
mndDnodeActionDecode
(
SSdbRaw
*
pRaw
);
...
...
@@ -208,10 +206,12 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
return
sdbGetSize
(
pSdb
,
SDB_DNODE
);
}
bool
mndIsDnodeInReadyStatus
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
)
{
int64_t
ms
=
taosGetTimestampMs
();
int64_t
interval
=
ABS
(
pDnode
->
lastAccessTime
-
ms
);
bool
mndIsDnodeOnline
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
int64_t
curMs
)
{
int64_t
interval
=
ABS
(
pDnode
->
lastAccessTime
-
curMs
);
if
(
interval
>
3500
*
pMnode
->
cfg
.
statusInterval
)
{
if
(
pDnode
->
rebootTime
>
0
)
{
pDnode
->
offlineReason
=
DND_REASON_STATUS_MSG_TIMEOUT
;
}
return
false
;
}
return
true
;
...
...
@@ -278,8 +278,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus
->
clusterId
=
htobe64
(
pStatus
->
clusterId
);
pStatus
->
rebootTime
=
htobe64
(
pStatus
->
rebootTime
);
pStatus
->
updateTime
=
htobe64
(
pStatus
->
updateTime
);
pStatus
->
numOfCores
=
hton
s
(
pStatus
->
numOfCores
);
pStatus
->
numOfSupportVnodes
=
hton
s
(
pStatus
->
numOfSupportVnodes
);
pStatus
->
numOfCores
=
hton
l
(
pStatus
->
numOfCores
);
pStatus
->
numOfSupportVnodes
=
hton
l
(
pStatus
->
numOfSupportVnodes
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
pStatus
->
clusterCfg
.
statusInterval
);
pStatus
->
clusterCfg
.
checkTime
=
htobe64
(
pStatus
->
clusterCfg
.
checkTime
);
}
...
...
@@ -287,85 +287,83 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
static
int32_t
mndProcessStatusMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SStatusMsg
*
pStatus
=
pMsg
->
rpcMsg
.
pCont
;
SDnodeObj
*
pDnode
=
NULL
;
int32_t
code
=
-
1
;
mndParseStatusMsg
(
pStatus
);
SDnodeObj
*
pDnode
=
NULL
;
if
(
pStatus
->
dnodeId
==
0
)
{
pDnode
=
mndAcquireDnodeByEp
(
pMnode
,
pStatus
->
dnodeEp
);
if
(
pDnode
==
NULL
)
{
mDebug
(
"dnode:%s, not created yet"
,
pStatus
->
dnodeEp
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
goto
PROCESS_STATUS_MSG_OVER
;
}
}
else
{
pDnode
=
mndAcquireDnode
(
pMnode
,
pStatus
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
pDnode
=
mndAcquireDnodeByEp
(
pMnode
,
pStatus
->
dnodeEp
);
if
(
pDnode
!=
NULL
&&
pDnode
->
status
!=
DND_STATUS_READY
)
{
if
(
pDnode
!=
NULL
)
{
pDnode
->
offlineReason
=
DND_REASON_DNODE_ID_NOT_MATCH
;
}
mError
(
"dnode:%d, %s not exist"
,
pStatus
->
dnodeId
,
pStatus
->
dnodeEp
);
mndReleaseDnode
(
pMnode
,
pDnode
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
goto
PROCESS_STATUS_MSG_OVER
;
}
}
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
bool
needCheckCfg
=
!
(
online
&&
pDnode
->
rebootTime
==
pStatus
->
rebootTime
);
if
(
needCheckCfg
)
{
if
(
pStatus
->
sver
!=
pMnode
->
cfg
.
sver
)
{
if
(
pDnode
!=
NULL
&&
pDnode
->
status
!=
DND_STATUS_READY
)
{
if
(
pDnode
!=
NULL
)
{
pDnode
->
offlineReason
=
DND_REASON_VERSION_NOT_MATCH
;
}
mndReleaseDnode
(
pMnode
,
pDnode
);
mError
(
"dnode:%d, status msg version:%d not match cluster:%d"
,
pStatus
->
dnodeId
,
pStatus
->
sver
,
pMnode
->
cfg
.
sver
);
terrno
=
TSDB_CODE_MND_INVALID_MSG_VERSION
;
return
-
1
;
goto
PROCESS_STATUS_MSG_OVER
;
}
if
(
pStatus
->
dnodeId
==
0
)
{
mDebug
(
"dnode:%d %s, first access, set clusterId %"
PRId64
,
pDnode
->
id
,
pDnode
->
ep
,
pMnode
->
clusterId
);
}
else
{
if
(
pStatus
->
clusterId
!=
pMnode
->
clusterId
)
{
if
(
pDnode
!=
NULL
&&
pDnode
->
status
!=
DND_STATUS_READY
)
{
if
(
pDnode
!=
NULL
)
{
pDnode
->
offlineReason
=
DND_REASON_CLUSTER_ID_NOT_MATCH
;
}
mError
(
"dnode:%d, clusterId %"
PRId64
" not match exist %"
PRId64
,
pDnode
->
id
,
pStatus
->
clusterId
,
pMnode
->
clusterId
);
mndReleaseDnode
(
pMnode
,
pDnode
);
terrno
!=
TSDB_CODE_MND_INVALID_CLUSTER_ID
;
return
-
1
;
terrno
=
TSDB_CODE_MND_INVALID_CLUSTER_ID
;
goto
PROCESS_STATUS_MSG_OVER
;
}
else
{
pDnode
->
accessTimes
++
;
mTrace
(
"dnode:%d, status received, access times %d"
,
pDnode
->
id
,
pDnode
->
accessTimes
);
}
}
if
(
pDnode
->
status
==
DND_STATUS_OFFLINE
)
{
// Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t
ret
=
mndCheckClusterCfgPara
(
pMnode
,
&
pStatus
->
clusterCfg
);
if
(
0
!=
ret
)
{
pDnode
->
offlineReason
=
ret
;
mError
(
"dnode:%d, cluster cfg inconsistent since:%s"
,
pDnode
->
id
,
offlineReason
[
ret
]);
mndReleaseDnode
(
pMnode
,
pDnode
);
terrno
=
TSDB_CODE_MND_INVALID_CLUSTER_CFG
;
return
-
1
;
goto
PROCESS_STATUS_MSG_OVER
;
}
mInfo
(
"dnode:%d, from offline to online"
,
pDnode
->
id
);
}
pDnode
->
rebootTime
=
pStatus
->
rebootTime
;
pDnode
->
numOfCores
=
pStatus
->
numOfCores
;
pDnode
->
numOfSupportVnodes
=
pStatus
->
numOfSupportVnodes
;
pDnode
->
lastAccessTime
=
taosGetTimestampMs
();
pDnode
->
status
=
DND_STATUS_READY
;
int32_t
numOfEps
=
mndGetDnodeSize
(
pMnode
);
int32_t
contLen
=
sizeof
(
SStatusRsp
)
+
numOfEps
*
sizeof
(
SDnodeEp
);
SStatusRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
mndReleaseDnode
(
pMnode
,
pDnode
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
goto
PROCESS_STATUS_MSG_OVER
;
}
pRsp
->
dnodeCfg
.
dnodeId
=
htonl
(
pDnode
->
id
);
...
...
@@ -374,9 +372,14 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pMsg
->
contLen
=
contLen
;
pMsg
->
pCont
=
pRsp
;
mndReleaseDnode
(
pMnode
,
pDnode
);
}
return
0
;
pDnode
->
lastAccessTime
=
curMs
;
code
=
0
;
PROCESS_STATUS_MSG_OVER:
mndReleaseDnode
(
pMnode
,
pDnode
);
return
code
;
}
static
int32_t
mndCreateDnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SCreateDnodeMsg
*
pCreate
)
{
...
...
@@ -638,7 +641,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"
max
_vnodes"
);
strcpy
(
pSchema
[
cols
].
name
,
"
support
_vnodes"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
...
...
@@ -682,10 +685,12 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
int32_t
cols
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
int64_t
curMs
=
taosGetTimestampMs
();
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_DNODE
,
pShow
->
pIter
,
(
void
**
)
&
pDnode
);
if
(
pShow
->
pIter
==
NULL
)
break
;
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
cols
=
0
;
...
...
@@ -706,8 +711,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
const
char
*
status
=
dnodeStatus
[
pDnode
->
status
];
STR_TO_VARSTR
(
pWrite
,
status
);
STR_TO_VARSTR
(
pWrite
,
online
?
"ready"
:
"offline"
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -715,11 +719,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
if
(
pDnode
->
status
==
DND_STATUS_READY
)
{
STR_TO_VARSTR
(
pWrite
,
""
);
}
else
{
STR_TO_VARSTR
(
pWrite
,
offlineReason
[
pDnode
->
offlineReason
]);
}
STR_TO_VARSTR
(
pWrite
,
online
?
""
:
offlineReason
[
pDnode
->
offlineReason
]);
cols
++
;
numOfRows
++
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
2010f837
...
...
@@ -260,13 +260,14 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) {
pDnode
->
numOfVnodes
++
;
}
bool
isReady
=
mndIsDnodeInReadyStatus
(
pMnode
,
pDnode
);
if
(
isReady
)
{
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
if
(
online
)
{
taosArrayPush
(
pArray
,
pDnode
);
}
mDebug
(
"dnode:%d,
numOfVnodes:%d numOfSupportVnodes:%d isMnode:%d ready
:%d"
,
pDnode
->
id
,
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
isMnode
,
isReady
);
mDebug
(
"dnode:%d,
vnodes:%d supportVnodes:%d isMnode:%d online
:%d"
,
pDnode
->
id
,
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
isMnode
,
online
);
sdbRelease
(
pSdb
,
pDnode
);
}
...
...
@@ -333,6 +334,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
uint32_t
hashMax
=
UINT32_MAX
;
uint32_t
hashInterval
=
(
hashMax
-
hashMin
)
/
pDb
->
cfg
.
numOfVgroups
;
if
(
maxVgId
<
2
)
maxVgId
=
2
;
for
(
uint32_t
v
=
0
;
v
<
pDb
->
cfg
.
numOfVgroups
;
v
++
)
{
SVgObj
*
pVgroup
=
&
pVgroups
[
v
];
pVgroup
->
vgId
=
maxVgId
++
;
...
...
source/dnode/vnode/meta/src/metaBDBImpl.c
浏览文件 @
2010f837
...
...
@@ -444,6 +444,7 @@ int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg) {
void
*
pBuf
;
int
tlen
;
STableMetaMsg
*
pMsg
;
SSchema
*
pSchema
;
key
.
data
=
tbname
;
key
.
size
=
strlen
(
tbname
)
+
1
;
...
...
@@ -487,10 +488,9 @@ int metaGetTableInfo(SMeta *pMeta, char *tbname, STableMetaMsg **ppMsg) {
pMsg
->
tversion
=
0
;
pMsg
->
suid
=
tbCfg
.
stbCfg
.
suid
;
pMsg
->
tuid
=
tbCfg
.
stbCfg
.
suid
;
for
(
size_t
i
=
0
;
i
<
tbCfg
.
stbCfg
.
nTagCols
;
i
++
)
{
}
memcpy
(
pMsg
->
pSchema
,
tbCfg
.
stbCfg
.
pSchema
,
sizeof
(
SSchema
)
*
tbCfg
.
stbCfg
.
nCols
);
memcpy
(
POINTER_SHIFT
(
pMsg
->
pSchema
,
sizeof
(
SSchema
)
*
tbCfg
.
stbCfg
.
nCols
),
tbCfg
.
stbCfg
.
pTagSchema
,
sizeof
(
SSchema
)
*
tbCfg
.
stbCfg
.
nTagCols
);
break
;
case
META_CHILD_TABLE
:
ASSERT
(
0
);
...
...
source/libs/parser/src/insertParser.c
浏览文件 @
2010f837
...
...
@@ -64,64 +64,6 @@ typedef struct SInsertParseContext {
SInsertStmtInfo
*
pOutput
;
}
SInsertParseContext
;
static
FORCE_INLINE
int32_t
toDouble
(
SToken
*
pToken
,
double
*
value
,
char
**
endPtr
)
{
errno
=
0
;
*
value
=
strtold
(
pToken
->
z
,
endPtr
);
// not a valid integer number, return error
if
((
*
endPtr
-
pToken
->
z
)
!=
pToken
->
n
)
{
return
TK_ILLEGAL
;
}
return
pToken
->
type
;
}
static
int32_t
toInt64
(
const
char
*
z
,
int16_t
type
,
int32_t
n
,
int64_t
*
value
,
bool
issigned
)
{
errno
=
0
;
int32_t
ret
=
0
;
char
*
endPtr
=
NULL
;
if
(
type
==
TK_FLOAT
)
{
double
v
=
strtod
(
z
,
&
endPtr
);
if
((
errno
==
ERANGE
&&
v
==
HUGE_VALF
)
||
isinf
(
v
)
||
isnan
(
v
))
{
ret
=
-
1
;
}
else
if
((
issigned
&&
(
v
<
INT64_MIN
||
v
>
INT64_MAX
))
||
((
!
issigned
)
&&
(
v
<
0
||
v
>
UINT64_MAX
)))
{
ret
=
-
1
;
}
else
{
*
value
=
(
int64_t
)
round
(
v
);
}
errno
=
0
;
return
ret
;
}
int32_t
radix
=
10
;
if
(
type
==
TK_HEX
)
{
radix
=
16
;
}
else
if
(
type
==
TK_BIN
)
{
radix
=
2
;
}
// the string may be overflow according to errno
if
(
!
issigned
)
{
const
char
*
p
=
z
;
while
(
*
p
!=
0
&&
*
p
==
' '
)
p
++
;
if
(
*
p
!=
0
&&
*
p
==
'-'
)
{
return
-
1
;}
*
value
=
strtoull
(
z
,
&
endPtr
,
radix
);
}
else
{
*
value
=
strtoll
(
z
,
&
endPtr
,
radix
);
}
// not a valid integer number, return error
if
(
endPtr
-
z
!=
n
||
errno
==
ERANGE
)
{
ret
=
-
1
;
}
errno
=
0
;
return
ret
;
}
static
int32_t
skipInsertInto
(
SInsertParseContext
*
pCxt
)
{
SToken
sToken
;
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
...
...
@@ -159,10 +101,8 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
char
tableName
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
name
,
tableName
);
SParseBasicCtx
*
pBasicCtx
=
&
pCxt
->
pComCxt
->
ctx
;
CHECK_CODE
(
catalogGetTableMeta
(
pBasicCtx
->
pCatalog
,
pBasicCtx
->
pTransporter
,
&
pBasicCtx
->
mgmtEpSet
,
&
name
,
&
pCxt
->
pTableMeta
));
SVgroupInfo
vg
;
CHECK_CODE
(
catalogGetTableHashVgroup
(
pBasicCtx
->
pCatalog
,
pBasicCtx
->
pTransporter
,
&
pBasicCtx
->
mgmtEpSet
,
&
name
,
&
vg
));
CHECK_CODE
(
taosHashPut
(
pCxt
->
pVgroupsHashObj
,
(
const
char
*
)
&
vg
.
vgId
,
sizeof
(
vg
.
vgId
),
(
char
*
)
&
vg
,
sizeof
(
vg
)));
...
...
@@ -349,207 +289,6 @@ static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *p
return
TSDB_CODE_SUCCESS
;
}
//static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
// if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
// type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
// (pToken->n == 0) || (type == TK_RP)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
// }
//
// if (IS_NUMERIC_TYPE(type) && pToken->n == 0) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z);
// }
//
// // Remove quotation marks
// if (TK_STRING == type) {
// if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
// return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
// }
//
// // delete escape character: \\, \', \"
// char delim = pToken->z[0];
// int32_t cnt = 0;
// int32_t j = 0;
// for (uint32_t k = 1; k < pToken->n - 1; ++k) {
// if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
// tmpTokenBuf[j] = pToken->z[k + 1];
// cnt++;
// j++;
// k++;
// continue;
// }
// tmpTokenBuf[j] = pToken->z[k];
// j++;
// }
//
// tmpTokenBuf[j] = 0;
// pToken->z = tmpTokenBuf;
// pToken->n -= 2 + cnt;
// }
//
// return TSDB_CODE_SUCCESS;
//}
//static FORCE_INLINE int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
// int64_t iv;
// char *endptr = NULL;
// bool isSigned = false;
//
// CHECK_CODE(checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf));
//
// if (isNullStr(pToken)) {
// if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
// int64_t tmpVal = 0;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// return func(getNullValue(pSchema->type), 0, param);
// }
//
// switch (pSchema->type) {
// case TSDB_DATA_TYPE_BOOL: {
// if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
// if (strncmp(pToken->z, "true", pToken->n) == 0) {
// return func(&TRUE_VALUE, pSchema->bytes, param);
// } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
// return func(&FALSE_VALUE, pSchema->bytes, param);
// } else {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
// }
// } else if (pToken->type == TK_INTEGER) {
// return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
// } else if (pToken->type == TK_FLOAT) {
// return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
// } else {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
// }
// }
//
// case TSDB_DATA_TYPE_TINYINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
// } else if (!IS_VALID_TINYINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
// }
//
// uint8_t tmpVal = (uint8_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UTINYINT:{
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
// } else if (!IS_VALID_UTINYINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
// }
// uint8_t tmpVal = (uint8_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_SMALLINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
// } else if (!IS_VALID_SMALLINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
// }
// int16_t tmpVal = (int16_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_USMALLINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
// } else if (!IS_VALID_USMALLINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
// }
// uint16_t tmpVal = (uint16_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_INT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
// } else if (!IS_VALID_INT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
// }
// int32_t tmpVal = (int32_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
// } else if (!IS_VALID_UINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
// }
// uint32_t tmpVal = (uint32_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_BIGINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
// } else if (!IS_VALID_BIGINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
// }
// return func(&iv, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UBIGINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
// } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
// }
// uint64_t tmpVal = (uint64_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_FLOAT: {
// double dv;
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
// }
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
// }
// float tmpVal = (float)dv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_DOUBLE: {
// double dv;
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
// }
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
// }
// return func(&dv, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_BINARY: {
// // too long values will return invalid sql, not be truncated automatically
// if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
// return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
// }
// return func(pToken->z, pToken->n, param);
// }
// case TSDB_DATA_TYPE_NCHAR: {
// return func(pToken->z, pToken->n, param);
// }
// case TSDB_DATA_TYPE_TIMESTAMP: {
// int64_t tmpVal;
// if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
// }
// return func(&tmpVal, pSchema->bytes, param);
// }
// }
//
// return TSDB_CODE_FAILED;
//}
// pSql -> tag1_name, ...)
static
int32_t
parseBoundColumns
(
SInsertParseContext
*
pCxt
,
SParsedDataColInfo
*
pColList
,
SSchema
*
pSchema
)
{
int32_t
nCols
=
pColList
->
numOfCols
;
...
...
source/libs/parser/src/parserUtil.c
浏览文件 @
2010f837
...
...
@@ -1639,9 +1639,9 @@ static bool isNullStr(SToken *pToken) {
}
static
FORCE_INLINE
int32_t
checkAndTrimValue
(
SToken
*
pToken
,
uint32_t
type
,
char
*
tmpTokenBuf
,
SMsgBuf
*
pMsgBuf
)
{
if
((
type
!=
TK_NOW
&&
type
!=
TK_INTEGER
&&
type
!=
TK_STRING
&&
type
!=
TK_FLOAT
&&
type
!=
TK_BOOL
&&
type
!=
TK_NULL
&&
type
!=
TK_HEX
&&
type
!=
TK_OCT
&&
type
!=
TK_BIN
)
||
(
pToken
->
n
==
0
)
||
(
type
==
TK_RP
))
{
if
((
pToken
->
type
!=
TK_NOW
&&
pToken
->
type
!=
TK_INTEGER
&&
pToken
->
type
!=
TK_STRING
&&
pToken
->
type
!=
TK_FLOAT
&&
pToken
->
type
!=
TK_BOOL
&&
pToken
->
type
!=
TK_NULL
&&
pToken
->
type
!=
TK_HEX
&&
pToken
->
type
!=
TK_OCT
&&
pToken
->
type
!=
TK_BIN
)
||
(
pToken
->
n
==
0
)
||
(
pToken
->
type
==
TK_RP
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid data or symbol"
,
pToken
->
z
);
}
...
...
@@ -1785,7 +1785,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_TINYINT
:
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid tinyint data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_TINYINT
(
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"tinyint data overflow"
,
pToken
->
z
);
...
...
@@ -1796,7 +1796,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_UTINYINT
:{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid unsigned tinyint data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_UTINYINT
(
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"unsigned tinyint data overflow"
,
pToken
->
z
);
...
...
@@ -1806,7 +1806,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid smallint data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_SMALLINT
(
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"smallint data overflow"
,
pToken
->
z
);
...
...
@@ -1816,7 +1816,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_USMALLINT
:
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid unsigned smallint data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_USMALLINT
(
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"unsigned smallint data overflow"
,
pToken
->
z
);
...
...
@@ -1826,7 +1826,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_INT
:
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid int data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_INT
(
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"int data overflow"
,
pToken
->
z
);
...
...
@@ -1836,7 +1836,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_UINT
:
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid unsigned int data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_UINT
(
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"unsigned int data overflow"
,
pToken
->
z
);
...
...
@@ -1846,7 +1846,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_BIGINT
:
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid bigint data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_BIGINT
(
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"bigint data overflow"
,
pToken
->
z
);
...
...
@@ -1855,7 +1855,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
}
case
TSDB_DATA_TYPE_UBIGINT
:
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
pToken
->
type
,
&
iv
,
&
isSigned
))
{
if
(
TSDB_CODE_SUCCESS
!=
toInteger
(
pToken
->
z
,
pToken
->
n
,
10
,
&
iv
,
&
isSigned
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid unsigned bigint data"
,
pToken
->
z
);
}
else
if
(
!
IS_VALID_UBIGINT
((
uint64_t
)
iv
))
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"unsigned bigint data overflow"
,
pToken
->
z
);
...
...
source/libs/parser/test/mockCatalogService.cpp
浏览文件 @
2010f837
...
...
@@ -97,8 +97,8 @@ public:
int32_t
catalogGetTableMeta
(
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
const
{
std
::
unique_ptr
<
STableMeta
>
table
;
char
db
[
TSDB_DB_
F
NAME_LEN
]
=
{
0
};
tNameGet
Full
DbName
(
pTableName
,
db
);
char
db
[
TSDB_DB_NAME_LEN
]
=
{
0
};
tNameGetDbName
(
pTableName
,
db
);
const
char
*
tname
=
tNameGetTableName
(
pTableName
);
int32_t
code
=
copyTableSchemaMeta
(
db
,
tname
,
&
table
);
...
...
@@ -111,6 +111,7 @@ public:
int32_t
catalogGetTableHashVgroup
(
const
SName
*
pTableName
,
SVgroupInfo
*
vgInfo
)
const
{
// todo
vgInfo
->
vgId
=
1
;
return
0
;
}
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
2010f837
...
...
@@ -207,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
}
taosArrayPush
(
currentLevel
,
&
subplan
);
pCxt
->
pCurrentSubplan
=
subplan
;
++
(
pCxt
->
pDag
->
numOfSubplans
);
return
subplan
;
}
...
...
@@ -293,11 +294,14 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SArray
*
vgs
=
(
SArray
*
)
pPlanNode
->
pExtInfo
;
size_t
numOfVg
=
taosArrayGetSize
(
vgs
);
for
(
int32_t
i
=
0
;
i
<
numOfVg
;
++
i
)
{
STORE_CURRENT_SUBPLAN
(
pCxt
);
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_MODIFY
);
SVgDataBlocks
*
blocks
=
(
SVgDataBlocks
*
)
taosArrayGetP
(
vgs
,
i
);
vgroupInfoToEpSet
(
&
blocks
->
vg
,
&
subplan
->
execEpSet
);
subplan
->
pNode
=
NULL
;
subplan
->
pDataSink
=
createDataInserter
(
pCxt
,
blocks
);
subplan
->
type
=
QUERY_TYPE_MODIFY
;
RECOVERY_CURRENT_SUBPLAN
(
pCxt
);
}
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
2010f837
...
...
@@ -166,7 +166,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
}
for
(
int32_t
n
=
0
;
n
<
levelPlanNum
;
++
n
)
{
SSubplan
*
plan
=
taosArrayGet
(
levelPlans
,
n
);
SSubplan
*
plan
=
taosArrayGet
P
(
levelPlans
,
n
);
SSchTask
task
=
{
0
};
if
(
plan
->
type
==
QUERY_TYPE_MODIFY
)
{
...
...
source/util/src/tlog.c
浏览文件 @
2010f837
...
...
@@ -85,7 +85,6 @@ int32_t dDebugFlag = 135;
int32_t
vDebugFlag
=
135
;
int32_t
cDebugFlag
=
131
;
int32_t
jniDebugFlag
=
131
;
int32_t
odbcDebugFlag
=
131
;
int32_t
qDebugFlag
=
131
;
int32_t
rpcDebugFlag
=
131
;
int32_t
uDebugFlag
=
131
;
...
...
tests/script/sh/deploy.sh
浏览文件 @
2010f837
...
...
@@ -120,6 +120,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
echo
"secondEp
${
HOSTNAME
}
:7200"
>>
$TAOS_CFG
echo
"fqdn
${
HOSTNAME
}
"
>>
$TAOS_CFG
echo
"serverPort
${
NODE
}
"
>>
$TAOS_CFG
echo
"supportVnodes 16"
>>
$TAOS_CFG
echo
"dataDir
$DATA_DIR
"
>>
$TAOS_CFG
echo
"logDir
$LOG_DIR
"
>>
$TAOS_CFG
echo
"debugFlag 0"
>>
$TAOS_CFG
...
...
tests/script/tmp/dnodes.sim
浏览文件 @
2010f837
system sh/stop_dnodes.sh
############## config parameter #####################
$node1 = 192.168.0.201
$node2 = 192.168.0.202
...
...
@@ -10,16 +7,36 @@ $node4 = 192.168.0.204
$self = $node1
$num = 25
############### deploy firstEp #####################
#deploy = 0, start = 1, stop = 2
$option = 0
print =============== option:$option
############### stop dnodes #####################
if $option == 0 then
system sh/stop_dnodes.sh
endi
############### process firstEp #####################
$firstEp = $node1 . :7100
$firstPort = 7100
if $self == $node1 then
if $option == 1 then
system sh/exec.sh -n dnode1 -s start
endi
if $option == 2 then
system sh/exec.sh -n dnode1 -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp
system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp
system sh/cfg.sh -n dnode1 -c fqdn -v $node1
system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
sql connect
...
...
@@ -55,9 +72,10 @@ if $self == $node1 then
$i = $i + 1
sql create dnode $node4 port $port
endw
endi
endi
###############
deploy nodes
#####################
###############
process nodes
#####################
$i = 0
while $i < $num
...
...
@@ -67,6 +85,15 @@ while $i < $num
$dnodename = dnode . $index
$i = $i + 1
if $option == 1 then
system sh/exec.sh -n $dnodename -s start
endi
if $option == 2 then
system sh/exec.sh -n $dnodename -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n $dnodename -i 1
system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp
system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp
...
...
@@ -74,4 +101,5 @@ while $i < $num
system sh/cfg.sh -n $dnodename -c serverPort -v $port
system sh/exec.sh -n $dnodename -s start
endi
endw
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录