Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
81735172
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
81735172
编写于
2月 10, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/tkv
上级
abc92880
4a07d2e7
变更
12
展开全部
显示空白变更内容
内联
并排
Showing
12 changed file
with
598 addition
and
334 deletion
+598
-334
include/common/tmsg.h
include/common/tmsg.h
+5
-2
include/nodes/nodes.h
include/nodes/nodes.h
+9
-1
include/util/taoserror.h
include/util/taoserror.h
+7
-4
source/common/src/tmsg.c
source/common/src/tmsg.c
+42
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+93
-79
source/dnode/mnode/impl/test/stb/stb.cpp
source/dnode/mnode/impl/test/stb/stb.cpp
+242
-201
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+8
-0
source/libs/parser/src/astCreateFuncs.c
source/libs/parser/src/astCreateFuncs.c
+10
-2
source/libs/parser/src/parserImpl.c
source/libs/parser/src/parserImpl.c
+99
-13
source/libs/parser/test/newParserTest.cpp
source/libs/parser/test/newParserTest.cpp
+59
-7
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+5
-3
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+19
-22
未找到文件。
include/common/tmsg.h
浏览文件 @
81735172
...
...
@@ -278,10 +278,13 @@ void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq);
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
alterType
;
int32_t
numOf
Schema
s
;
S
Schema
pSchemas
[]
;
int32_t
numOf
Field
s
;
S
Array
*
pFields
;
}
SMAltertbReq
;
int32_t
tSerializeSMAlterStbReq
(
void
**
buf
,
SMAltertbReq
*
pReq
);
void
*
tDeserializeSMAlterStbReq
(
void
*
buf
,
SMAltertbReq
*
pReq
);
typedef
struct
{
int32_t
pid
;
char
app
[
TSDB_APP_NAME_LEN
];
...
...
include/nodes/nodes.h
浏览文件 @
81735172
...
...
@@ -121,6 +121,14 @@ typedef struct SColumnNode {
typedef
struct
SValueNode
{
SExprNode
node
;
// QUERY_NODE_VALUE
char
*
literal
;
bool
isDuration
;
union
{
bool
b
;
int64_t
i
;
uint64_t
u
;
double
d
;
char
*
p
;
}
datum
;
}
SValueNode
;
typedef
enum
EOperatorType
{
...
...
@@ -181,7 +189,7 @@ typedef struct SNodeListNode {
}
SNodeListNode
;
typedef
struct
SFunctionNode
{
SExprNode
typ
e
;
// QUERY_NODE_FUNCTION
SExprNode
nod
e
;
// QUERY_NODE_FUNCTION
char
functionName
[
TSDB_FUNC_NAME_LEN
];
int32_t
funcId
;
SNodeList
*
pParameterList
;
...
...
include/util/taoserror.h
浏览文件 @
81735172
...
...
@@ -444,10 +444,13 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error
//parser
#define TSDB_CODE_PARSER_INVALID_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2601) //invalid column name
#define TSDB_CODE_PARSER_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2602) //table not exist
#define TSDB_CODE_PARSER_AMBIGUOUS_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2603) //ambiguous column
#define TSDB_CODE_PARSER_WRONG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2604) //wrong value type
#define TSDB_CODE_PAR_INVALID_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2601) //invalid column name
#define TSDB_CODE_PAR_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2602) //table not exist
#define TSDB_CODE_PAR_AMBIGUOUS_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2603) //ambiguous column
#define TSDB_CODE_PAR_WRONG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2604) //wrong value type
#define TSDB_CODE_PAR_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2605) //invalid number of arguments
#define TSDB_CODE_PAR_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2606) //inconsistent datatypes
#define TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION TAOS_DEF_ERROR_CODE(0, 0x2607) //there mustn't be aggregation
#ifdef __cplusplus
}
...
...
source/common/src/tmsg.c
浏览文件 @
81735172
...
...
@@ -415,3 +415,45 @@ void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) {
return
buf
;
}
int32_t
tSerializeSMAlterStbReq
(
void
**
buf
,
SMAltertbReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pReq
->
name
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
alterType
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
numOfFields
);
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFields
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pFields
,
i
);
tlen
+=
taosEncodeFixedU8
(
buf
,
pField
->
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pField
->
bytes
);
tlen
+=
taosEncodeString
(
buf
,
pField
->
name
);
}
return
tlen
;
}
void
*
tDeserializeSMAlterStbReq
(
void
*
buf
,
SMAltertbReq
*
pReq
)
{
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
name
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
alterType
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
numOfFields
);
pReq
->
pFields
=
taosArrayInit
(
pReq
->
numOfFields
,
sizeof
(
SField
));
if
(
pReq
->
pFields
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFields
;
++
i
)
{
SField
field
=
{
0
};
buf
=
taosDecodeFixedU8
(
buf
,
&
field
.
type
);
buf
=
taosDecodeFixedI32
(
buf
,
&
field
.
bytes
);
buf
=
taosDecodeStringTo
(
buf
,
field
.
name
);
if
(
taosArrayPush
(
pReq
->
pFields
,
&
field
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
}
return
buf
;
}
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
81735172
...
...
@@ -601,26 +601,23 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
}
static
int32_t
mndCheckAlterStbReq
(
SMAltertbReq
*
pAlter
)
{
pAlter
->
numOfSchemas
=
htonl
(
pAlter
->
numOfSchemas
);
for
(
int32_t
i
=
0
;
i
<
pAlter
->
numOfSchemas
;
++
i
)
{
SSchema
*
pSchema
=
&
pAlter
->
pSchemas
[
i
];
pSchema
->
colId
=
htonl
(
pSchema
->
colId
);
pSchema
->
bytes
=
htonl
(
pSchema
->
bytes
);
if
(
pSchema
->
type
<=
0
)
{
if
(
pAlter
->
numOfFields
<
1
||
pAlter
->
numOfFields
!=
(
int32_t
)
taosArrayGetSize
(
pAlter
->
pFields
))
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
if
(
pSchema
->
colId
<
0
||
pSchema
->
colId
>=
(
TSDB_MAX_COLUMNS
+
TSDB_MAX_TAGS
))
{
for
(
int32_t
i
=
0
;
i
<
pAlter
->
numOfFields
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pAlter
->
pFields
,
i
);
if
(
pField
->
type
<=
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
if
(
p
Schema
->
bytes
<=
0
)
{
if
(
p
Field
->
bytes
<=
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
if
(
p
Schema
->
name
[
0
]
==
0
)
{
if
(
p
Field
->
name
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
...
...
@@ -662,7 +659,7 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
return
0
;
}
static
int32_t
mndAddSuperTableTag
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
const
SSchema
*
pSchema
s
,
int32_t
ntags
)
{
static
int32_t
mndAddSuperTableTag
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
SArray
*
pField
s
,
int32_t
ntags
)
{
if
(
pOld
->
numOfTags
+
ntags
>
TSDB_MAX_TAGS
)
{
terrno
=
TSDB_CODE_MND_TOO_MANY_TAGS
;
return
-
1
;
...
...
@@ -673,33 +670,34 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSc
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
ntags
;
i
++
)
{
if
(
mndFindSuperTableColumnIndex
(
pOld
,
pSchemas
[
i
].
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_COLUMN_ALREADY_EXIST
;
pNew
->
numOfTags
=
pNew
->
numOfTags
+
ntags
;
if
(
mndAllocStbSchemas
(
pOld
,
pNew
)
!=
0
)
{
return
-
1
;
}
if
(
mndFindSuperTableTagIndex
(
pOld
,
pSchemas
[
i
].
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_TAG_ALREADY_EXIST
;
for
(
int32_t
i
=
0
;
i
<
ntags
;
i
++
)
{
SField
*
pField
=
taosArrayGet
(
pFields
,
i
);
if
(
mndFindSuperTableColumnIndex
(
pOld
,
pField
->
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_COLUMN_ALREADY_EXIST
;
return
-
1
;
}
}
pNew
->
numOfTags
=
pNew
->
numOfTags
+
ntags
;
if
(
mndAllocStbSchemas
(
pOld
,
pNew
)
!=
0
)
{
if
(
mndFindSuperTableTagIndex
(
pOld
,
pField
->
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_TAG_ALREADY_EXIST
;
return
-
1
;
}
memcpy
(
pNew
->
pTags
+
pOld
->
numOfTags
,
pSchemas
,
sizeof
(
SSchema
)
*
ntags
)
;
for
(
int32_t
i
=
pOld
->
numOfTags
;
i
<
pNew
->
numOfTags
;
i
++
)
{
SSchema
*
pSchema
=
&
pNew
->
pTags
[
i
]
;
SSchema
*
pSchema
=
&
pNew
->
pTags
[
pOld
->
numOfTags
+
i
]
;
pSchema
->
bytes
=
pField
->
bytes
;
pSchema
->
type
=
pField
->
type
;
memcpy
(
pSchema
->
name
,
pField
->
name
,
TSDB_COL_NAME_LEN
)
;
pSchema
->
colId
=
pNew
->
nextColId
;
pNew
->
nextColId
++
;
mDebug
(
"stb:%s, start to add tag %s"
,
pNew
->
name
,
pSchema
->
name
);
}
pNew
->
version
++
;
mDebug
(
"stb:%s, start to add tag %s"
,
pNew
->
name
,
pSchemas
[
0
].
name
);
return
0
;
}
...
...
@@ -722,7 +720,18 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
return
0
;
}
static
int32_t
mndAlterStbTagName
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
const
char
*
oldTagName
,
const
char
*
newTagName
)
{
static
int32_t
mndAlterStbTagName
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
SArray
*
pFields
)
{
if
((
int32_t
)
taosArrayGetSize
(
pFields
)
!=
2
)
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
SField
*
pField0
=
taosArrayGet
(
pFields
,
0
);
SField
*
pField1
=
taosArrayGet
(
pFields
,
1
);
const
char
*
oldTagName
=
pField0
->
name
;
const
char
*
newTagName
=
pField1
->
name
;
int32_t
tag
=
mndFindSuperTableTagIndex
(
pOld
,
oldTagName
);
if
(
tag
<
0
)
{
terrno
=
TSDB_CODE_MND_TAG_NOT_EXIST
;
...
...
@@ -751,8 +760,8 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char
return
0
;
}
static
int32_t
mndAlterStbTagBytes
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
const
S
Schema
*
pSchema
)
{
int32_t
tag
=
mndFindSuperTableTagIndex
(
pOld
,
p
Schema
->
name
);
static
int32_t
mndAlterStbTagBytes
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
const
S
Field
*
pField
)
{
int32_t
tag
=
mndFindSuperTableTagIndex
(
pOld
,
p
Field
->
name
);
if
(
tag
<
0
)
{
terrno
=
TSDB_CODE_MND_TAG_NOT_EXIST
;
return
-
1
;
...
...
@@ -769,51 +778,52 @@ static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSc
return
-
1
;
}
if
(
p
Schema
->
bytes
<=
pTag
->
bytes
)
{
if
(
p
Field
->
bytes
<=
pTag
->
bytes
)
{
terrno
=
TSDB_CODE_MND_INVALID_ROW_BYTES
;
return
-
1
;
}
pTag
->
bytes
=
p
Schema
->
bytes
;
pTag
->
bytes
=
p
Field
->
bytes
;
pNew
->
version
++
;
mDebug
(
"stb:%s, start to modify tag len %s to %d"
,
pNew
->
name
,
p
Schema
->
name
,
pSchema
->
bytes
);
mDebug
(
"stb:%s, start to modify tag len %s to %d"
,
pNew
->
name
,
p
Field
->
name
,
pField
->
bytes
);
return
0
;
}
static
int32_t
mndAddSuperTableColumn
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
const
SSchema
*
pSchema
s
,
int32_t
ncols
)
{
static
int32_t
mndAddSuperTableColumn
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
SArray
*
pField
s
,
int32_t
ncols
)
{
if
(
pOld
->
numOfColumns
+
ncols
+
pOld
->
numOfTags
>
TSDB_MAX_COLUMNS
)
{
terrno
=
TSDB_CODE_MND_TOO_MANY_COLUMNS
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
ncols
;
i
++
)
{
if
(
mndFindSuperTableColumnIndex
(
pOld
,
pSchemas
[
i
].
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_COLUMN_ALREADY_EXIST
;
pNew
->
numOfColumns
=
pNew
->
numOfColumns
+
ncols
;
if
(
mndAllocStbSchemas
(
pOld
,
pNew
)
!=
0
)
{
return
-
1
;
}
if
(
mndFindSuperTableTagIndex
(
pOld
,
pSchemas
[
i
].
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_TAG_ALREADY_EXIST
;
for
(
int32_t
i
=
0
;
i
<
ncols
;
i
++
)
{
SField
*
pField
=
taosArrayGet
(
pFields
,
i
);
if
(
mndFindSuperTableColumnIndex
(
pOld
,
pField
->
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_COLUMN_ALREADY_EXIST
;
return
-
1
;
}
}
pNew
->
numOfColumns
=
pNew
->
numOfColumns
+
ncols
;
if
(
mndAllocStbSchemas
(
pOld
,
pNew
)
!=
0
)
{
if
(
mndFindSuperTableTagIndex
(
pOld
,
pField
->
name
)
>
0
)
{
terrno
=
TSDB_CODE_MND_TAG_ALREADY_EXIST
;
return
-
1
;
}
memcpy
(
pNew
->
pColumns
+
pOld
->
numOfColumns
,
pSchemas
,
sizeof
(
SSchema
)
*
ncols
)
;
for
(
int32_t
i
=
pOld
->
numOfColumns
;
i
<
pNew
->
numOfColumns
;
i
++
)
{
SSchema
*
pSchema
=
&
pNew
->
pColumns
[
i
]
;
SSchema
*
pSchema
=
&
pNew
->
pColumns
[
pOld
->
numOfColumns
+
i
]
;
pSchema
->
bytes
=
pField
->
bytes
;
pSchema
->
type
=
pField
->
type
;
memcpy
(
pSchema
->
name
,
pField
->
name
,
TSDB_COL_NAME_LEN
)
;
pSchema
->
colId
=
pNew
->
nextColId
;
pNew
->
nextColId
++
;
mDebug
(
"stb:%s, start to add column %s"
,
pNew
->
name
,
pSchema
->
name
);
}
pNew
->
version
++
;
mDebug
(
"stb:%s, start to add column %s"
,
pNew
->
name
,
pSchemas
[
0
].
name
);
return
0
;
}
...
...
@@ -846,8 +856,8 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
return
0
;
}
static
int32_t
mndAlterStbColumnBytes
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
const
S
Schema
*
pSchema
)
{
int32_t
col
=
mndFindSuperTableColumnIndex
(
pOld
,
p
Schema
->
name
);
static
int32_t
mndAlterStbColumnBytes
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
,
const
S
Field
*
pField
)
{
int32_t
col
=
mndFindSuperTableColumnIndex
(
pOld
,
p
Field
->
name
);
if
(
col
<
0
)
{
terrno
=
TSDB_CODE_MND_COLUMN_NOT_EXIST
;
return
-
1
;
...
...
@@ -855,7 +865,7 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
uint32_t
nLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
pOld
->
numOfColumns
;
++
i
)
{
nLen
+=
(
pOld
->
pColumns
[
i
].
colId
==
col
)
?
p
Schema
->
bytes
:
pOld
->
pColumns
[
i
].
bytes
;
nLen
+=
(
pOld
->
pColumns
[
i
].
colId
==
col
)
?
p
Field
->
bytes
:
pOld
->
pColumns
[
i
].
bytes
;
}
if
(
nLen
>
TSDB_MAX_BYTES_PER_ROW
)
{
...
...
@@ -873,15 +883,15 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
return
-
1
;
}
if
(
p
Schema
->
bytes
<=
pCol
->
bytes
)
{
if
(
p
Field
->
bytes
<=
pCol
->
bytes
)
{
terrno
=
TSDB_CODE_MND_INVALID_ROW_BYTES
;
return
-
1
;
}
pCol
->
bytes
=
p
Schema
->
bytes
;
pCol
->
bytes
=
p
Field
->
bytes
;
pNew
->
version
++
;
mDebug
(
"stb:%s, start to modify col len %s to %d"
,
pNew
->
name
,
p
Schema
->
name
,
pSchema
->
bytes
);
mDebug
(
"stb:%s, start to modify col len %s to %d"
,
pNew
->
name
,
p
Field
->
name
,
pField
->
bytes
);
return
0
;
}
...
...
@@ -950,28 +960,29 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *
int32_t
code
=
-
1
;
STrans
*
pTrans
=
NULL
;
SField
*
pField0
=
taosArrayGet
(
pAlter
->
pFields
,
0
);
switch
(
pAlter
->
alterType
)
{
case
TSDB_ALTER_TABLE_ADD_TAG
:
code
=
mndAddSuperTableTag
(
pOld
,
&
stbObj
,
pAlter
->
p
Schemas
,
1
);
code
=
mndAddSuperTableTag
(
pOld
,
&
stbObj
,
pAlter
->
p
Fields
,
pAlter
->
numOfFields
);
break
;
case
TSDB_ALTER_TABLE_DROP_TAG
:
code
=
mndDropSuperTableTag
(
pOld
,
&
stbObj
,
p
Alter
->
pSchemas
[
0
].
name
);
code
=
mndDropSuperTableTag
(
pOld
,
&
stbObj
,
p
Field0
->
name
);
break
;
case
TSDB_ALTER_TABLE_UPDATE_TAG_NAME
:
code
=
mndAlterStbTagName
(
pOld
,
&
stbObj
,
pAlter
->
p
Schemas
[
0
].
name
,
pAlter
->
pSchemas
[
1
].
name
);
code
=
mndAlterStbTagName
(
pOld
,
&
stbObj
,
pAlter
->
p
Fields
);
break
;
case
TSDB_ALTER_TABLE_UPDATE_TAG_BYTES
:
code
=
mndAlterStbTagBytes
(
pOld
,
&
stbObj
,
&
pAlter
->
pSchemas
[
0
]
);
code
=
mndAlterStbTagBytes
(
pOld
,
&
stbObj
,
pField0
);
break
;
case
TSDB_ALTER_TABLE_ADD_COLUMN
:
code
=
mndAddSuperTableColumn
(
pOld
,
&
stbObj
,
pAlter
->
p
Schemas
,
1
);
code
=
mndAddSuperTableColumn
(
pOld
,
&
stbObj
,
pAlter
->
p
Fields
,
pAlter
->
numOfFields
);
break
;
case
TSDB_ALTER_TABLE_DROP_COLUMN
:
code
=
mndDropSuperTableColumn
(
pOld
,
&
stbObj
,
p
Alter
->
pSchemas
[
0
].
name
);
code
=
mndDropSuperTableColumn
(
pOld
,
&
stbObj
,
p
Field0
->
name
);
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:
code
=
mndAlterStbColumnBytes
(
pOld
,
&
stbObj
,
&
pAlter
->
pSchemas
[
0
]
);
code
=
mndAlterStbColumnBytes
(
pOld
,
&
stbObj
,
pField0
);
break
;
default:
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
...
...
@@ -1002,39 +1013,42 @@ ALTER_STB_OVER:
static
int32_t
mndProcessMAlterStbReq
(
SMnodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pMnode
;
SMAltertbReq
*
pAlter
=
pReq
->
rpcMsg
.
pCont
;
int32_t
code
=
-
1
;
SDbObj
*
pDb
=
NULL
;
SStbObj
*
pStb
=
NULL
;
SMAltertbReq
alterReq
=
{
0
};
mDebug
(
"stb:%s, start to alter"
,
pAlter
->
name
)
;
if
(
tDeserializeSMAlterStbReq
(
pReq
->
rpcMsg
.
pCont
,
&
alterReq
)
==
NULL
)
goto
ALTER_STB_OVER
;
if
(
mndCheckAlterStbReq
(
pAlter
)
!=
0
)
{
mError
(
"stb:%s, failed to alter since %s"
,
pAlter
->
name
,
terrstr
());
return
-
1
;
}
mDebug
(
"stb:%s, start to alter"
,
alterReq
.
name
);
if
(
mndCheckAlterStbReq
(
&
alterReq
)
!=
0
)
goto
ALTER_STB_OVER
;
SDbObj
*
pDb
=
mndAcquireDbByStb
(
pMnode
,
pAlter
->
name
);
pDb
=
mndAcquireDbByStb
(
pMnode
,
alterReq
.
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_INVALID_DB
;
mError
(
"stb:%s, failed to alter since %s"
,
pAlter
->
name
,
terrstr
());
return
-
1
;
goto
ALTER_STB_OVER
;
}
SStbObj
*
pStb
=
mndAcquireStb
(
pMnode
,
pAlter
->
name
);
pStb
=
mndAcquireStb
(
pMnode
,
alterReq
.
name
);
if
(
pStb
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
terrno
=
TSDB_CODE_MND_STB_NOT_EXIST
;
mError
(
"stb:%s, failed to alter since %s"
,
pAlter
->
name
,
terrstr
());
return
-
1
;
goto
ALTER_STB_OVER
;
}
int32_t
code
=
mndAlterStb
(
pMnode
,
pReq
,
pAlter
,
pDb
,
pStb
);
mndReleaseStb
(
pMnode
,
pStb
);
code
=
mndAlterStb
(
pMnode
,
pReq
,
&
alterReq
,
pDb
,
pStb
);
ALTER_STB_OVER:
if
(
code
!=
0
)
{
mError
(
"stb:%s, failed to alter since %s"
,
pAlter
->
name
,
terrstr
());
return
code
;
mError
(
"stb:%s, failed to alter since %s"
,
alterReq
.
name
,
terrstr
());
}
else
{
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
mndReleaseStb
(
pMnode
,
pStb
);
mndReleaseDb
(
pMnode
,
pDb
);
taosArrayClear
(
alterReq
.
pFields
);
return
code
;
}
static
int32_t
mndProcessVAlterStbRsp
(
SMnodeMsg
*
pRsp
)
{
...
...
source/dnode/mnode/impl/test/stb/stb.cpp
浏览文件 @
81735172
此差异已折叠。
点击以展开。
source/libs/function/src/functionMgt.c
浏览文件 @
81735172
...
...
@@ -78,3 +78,11 @@ FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProce
int32_t
registerFunc
(
FuncDef
func
)
{
}
int32_t
fmGetFuncResultType
(
FuncMgtHandle
handle
,
SFunctionNode
*
pFunc
)
{
return
TSDB_CODE_SUCCESS
;
}
bool
fmIsAggFunc
(
int32_t
funcId
)
{
return
false
;
}
source/libs/parser/src/astCreateFuncs.c
浏览文件 @
81735172
...
...
@@ -118,14 +118,22 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
val
->
literal
=
strndup
(
pLiteral
->
z
,
pLiteral
->
n
);
CHECK_OUT_OF_MEM
(
val
->
literal
);
val
->
node
.
resType
.
type
=
dataType
;
val
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_BOOL
].
bytes
;
val
->
node
.
resType
.
bytes
=
tDataTypes
[
dataType
].
bytes
;
if
(
TSDB_DATA_TYPE_TIMESTAMP
==
dataType
)
{
val
->
node
.
resType
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
}
return
(
SNode
*
)
val
;
}
SNode
*
createDurationValueNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pLiteral
)
{
SValueNode
*
val
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
CHECK_OUT_OF_MEM
(
val
);
// todo : calc, for example, 10s
val
->
literal
=
strndup
(
pLiteral
->
z
,
pLiteral
->
n
);
CHECK_OUT_OF_MEM
(
val
->
literal
);
val
->
isDuration
=
true
;
val
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_BIGINT
;
val
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_BIGINT
].
bytes
;
val
->
node
.
resType
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
return
(
SNode
*
)
val
;
}
...
...
source/libs/parser/src/parserImpl.c
浏览文件 @
81735172
...
...
@@ -16,7 +16,10 @@
#include "parserImpl.h"
#include "astCreateContext.h"
#include "functionMgt.h"
#include "parserInt.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttoken.h"
typedef
void
*
(
*
FMalloc
)(
size_t
);
...
...
@@ -240,6 +243,7 @@ typedef enum ESqlClause {
typedef
struct
STranslateContext
{
SParseContext
*
pParseCxt
;
FuncMgtHandle
fmgt
;
int32_t
errCode
;
SMsgBuf
msgBuf
;
SArray
*
pNsLevel
;
// element is SArray*, the element of this subarray is STableNode*
...
...
@@ -251,21 +255,30 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
static
char
*
getSyntaxErrFormat
(
int32_t
errCode
)
{
switch
(
errCode
)
{
case
TSDB_CODE_PAR
SER
_INVALID_COLUMN
:
case
TSDB_CODE_PAR_INVALID_COLUMN
:
return
"Invalid column name : %s"
;
case
TSDB_CODE_PAR
SER
_TABLE_NOT_EXIST
:
case
TSDB_CODE_PAR_TABLE_NOT_EXIST
:
return
"Table does not exist : %s"
;
case
TSDB_CODE_PAR
SER
_AMBIGUOUS_COLUMN
:
case
TSDB_CODE_PAR_AMBIGUOUS_COLUMN
:
return
"Column ambiguously defined : %s"
;
case
TSDB_CODE_PAR
SER
_WRONG_VALUE_TYPE
:
case
TSDB_CODE_PAR_WRONG_VALUE_TYPE
:
return
"Invalid value type : %s"
;
case
TSDB_CODE_PAR_FUNTION_PARA_NUM
:
return
"Invalid number of arguments : %s"
;
case
TSDB_CODE_PAR_FUNTION_PARA_TYPE
:
return
"Inconsistent datatypes : %s"
;
case
TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION
:
return
"There mustn't be aggregation"
;
default:
return
"Unknown error"
;
}
}
static
int32_t
generateSyntaxErrMsg
(
STranslateContext
*
pCxt
,
int32_t
errCode
,
const
char
*
additionalInfo
)
{
snprintf
(
pCxt
->
msgBuf
.
buf
,
pCxt
->
msgBuf
.
len
,
getSyntaxErrFormat
(
errCode
),
additionalInfo
);
static
int32_t
generateSyntaxErrMsg
(
STranslateContext
*
pCxt
,
int32_t
errCode
,
...)
{
va_list
vArgList
;
va_start
(
vArgList
,
errCode
);
vsnprintf
(
pCxt
->
msgBuf
.
buf
,
pCxt
->
msgBuf
.
len
,
getSyntaxErrFormat
(
errCode
),
vArgList
);
va_end
(
vArgList
);
pCxt
->
errCode
=
errCode
;
return
errCode
;
}
...
...
@@ -394,7 +407,7 @@ static bool translateColumnWithPrefix(STranslateContext* pCxt, SColumnNode* pCol
if
(
findAndSetColumn
(
pCol
,
pTable
))
{
break
;
}
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR
SER
_INVALID_COLUMN
,
pCol
->
colName
);
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pCol
->
colName
);
return
false
;
}
}
...
...
@@ -409,14 +422,14 @@ static bool translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNode* p
STableNode
*
pTable
=
taosArrayGetP
(
pTables
,
i
);
if
(
findAndSetColumn
(
pCol
,
pTable
))
{
if
(
found
)
{
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR
SER
_AMBIGUOUS_COLUMN
,
pCol
->
colName
);
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_AMBIGUOUS_COLUMN
,
pCol
->
colName
);
return
false
;
}
found
=
true
;
}
}
if
(
!
found
)
{
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR
SER
_INVALID_COLUMN
,
pCol
->
colName
);
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pCol
->
colName
);
return
false
;
}
return
true
;
...
...
@@ -429,8 +442,72 @@ static bool translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
return
translateColumnWithoutPrefix
(
pCxt
,
pCol
);
}
// check literal format
static
int32_t
trimStringCopy
(
const
char
*
src
,
int32_t
len
,
char
*
dst
)
{
// delete escape character: \\, \', \"
char
delim
=
src
[
0
];
int32_t
cnt
=
0
;
int32_t
j
=
0
;
for
(
uint32_t
k
=
1
;
k
<
len
-
1
;
++
k
)
{
if
(
src
[
k
]
==
'\\'
||
(
src
[
k
]
==
delim
&&
src
[
k
+
1
]
==
delim
))
{
dst
[
j
]
=
src
[
k
+
1
];
cnt
++
;
j
++
;
k
++
;
continue
;
}
dst
[
j
]
=
src
[
k
];
j
++
;
}
dst
[
j
]
=
'\0'
;
return
j
;
}
static
bool
translateValue
(
STranslateContext
*
pCxt
,
SValueNode
*
pVal
)
{
if
(
pVal
->
isDuration
)
{
char
unit
=
0
;
if
(
parseAbsoluteDuration
(
pVal
->
literal
,
strlen
(
pVal
->
literal
),
&
pVal
->
datum
.
i
,
&
unit
,
pVal
->
node
.
resType
.
precision
)
!=
TSDB_CODE_SUCCESS
)
{
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
pVal
->
literal
);
return
false
;
}
}
else
{
switch
(
pVal
->
node
.
resType
.
type
)
{
case
TSDB_DATA_TYPE_NULL
:
break
;
case
TSDB_DATA_TYPE_BOOL
:
pVal
->
datum
.
b
=
(
0
==
strcasecmp
(
pVal
->
literal
,
"true"
));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
i
=
strtoull
(
pVal
->
literal
,
&
endPtr
,
10
);
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
d
=
strtold
(
pVal
->
literal
,
&
endPtr
);
break
;
}
case
TSDB_DATA_TYPE_BINARY
:
{
int32_t
n
=
strlen
(
pVal
->
literal
);
pVal
->
datum
.
p
=
calloc
(
1
,
n
);
trimStringCopy
(
pVal
->
literal
,
n
,
pVal
->
datum
.
p
);
break
;
}
case
TSDB_DATA_TYPE_TIMESTAMP
:
{
int32_t
n
=
strlen
(
pVal
->
literal
);
char
*
tmp
=
calloc
(
1
,
n
);
int32_t
len
=
trimStringCopy
(
pVal
->
literal
,
n
,
tmp
);
if
(
taosParseTime
(
tmp
,
&
pVal
->
datum
.
u
,
len
,
pVal
->
node
.
resType
.
precision
,
tsDaylight
)
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
tmp
);
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
pVal
->
literal
);
return
false
;
}
tfree
(
tmp
);
break
;
}
default:
break
;
}
}
return
true
;
}
...
...
@@ -440,7 +517,7 @@ static bool translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
if
(
nodesIsArithmeticOp
(
pOp
))
{
if
(
TSDB_DATA_TYPE_JSON
==
ldt
.
type
||
TSDB_DATA_TYPE_BLOB
==
ldt
.
type
||
TSDB_DATA_TYPE_JSON
==
rdt
.
type
||
TSDB_DATA_TYPE_BLOB
==
rdt
.
type
)
{
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR
SER
_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
return
false
;
}
pOp
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_DOUBLE
;
...
...
@@ -449,7 +526,7 @@ static bool translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
}
else
if
(
nodesIsComparisonOp
(
pOp
))
{
if
(
TSDB_DATA_TYPE_JSON
==
ldt
.
type
||
TSDB_DATA_TYPE_BLOB
==
ldt
.
type
||
TSDB_DATA_TYPE_JSON
==
rdt
.
type
||
TSDB_DATA_TYPE_BLOB
==
rdt
.
type
)
{
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR
SER
_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
return
false
;
}
pOp
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_BOOL
;
...
...
@@ -463,6 +540,15 @@ static bool translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
}
static
bool
translateFunction
(
STranslateContext
*
pCxt
,
SFunctionNode
*
pFunc
)
{
int32_t
code
=
fmGetFuncResultType
(
pCxt
->
fmgt
,
pFunc
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
generateSyntaxErrMsg
(
pCxt
,
code
,
pFunc
->
functionName
);
return
false
;
}
if
(
fmIsAggFunc
(
pFunc
->
funcId
)
&&
(
SQL_CLAUSE_FROM
==
pCxt
->
currClause
||
SQL_CLAUSE_WHERE
==
pCxt
->
currClause
))
{
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION
);
return
false
;
}
return
true
;
}
...
...
@@ -504,7 +590,7 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
code
=
catalogGetTableMeta
(
pCxt
->
pParseCxt
->
pCatalog
,
pCxt
->
pParseCxt
->
pTransporter
,
&
(
pCxt
->
pParseCxt
->
mgmtEpSet
),
toName
(
pCxt
->
pParseCxt
->
acctId
,
pRealTable
,
&
name
),
&
(
pRealTable
->
pMeta
));
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR
SER
_TABLE_NOT_EXIST
,
pRealTable
->
table
.
tableName
);
return
generateSyntaxErrMsg
(
pCxt
,
TSDB_CODE_PAR_TABLE_NOT_EXIST
,
pRealTable
->
table
.
tableName
);
}
code
=
addNamespace
(
pCxt
,
pRealTable
);
break
;
...
...
source/libs/parser/test/newParserTest.cpp
浏览文件 @
81735172
...
...
@@ -118,6 +118,53 @@ private:
return
"Unknown Data Type "
+
to_string
(
dt
.
type
);
}
void
valueNodeToStr
(
const
SValueNode
*
pVal
,
string
&
str
,
bool
isProject
)
{
switch
(
pVal
->
node
.
resType
.
type
)
{
case
TSDB_DATA_TYPE_NULL
:
str
.
append
(
"null"
);
break
;
case
TSDB_DATA_TYPE_BOOL
:
str
.
append
(
pVal
->
datum
.
b
?
"true"
:
"false"
);
break
;
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_BIGINT
:
str
.
append
(
to_string
(
pVal
->
datum
.
i
));
break
;
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_DOUBLE
:
str
.
append
(
to_string
(
pVal
->
datum
.
d
));
break
;
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
case
TSDB_DATA_TYPE_VARCHAR
:
case
TSDB_DATA_TYPE_VARBINARY
:
str
.
append
(
pVal
->
datum
.
p
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
str
.
append
(
to_string
(
pVal
->
datum
.
u
));
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
case
TSDB_DATA_TYPE_USMALLINT
:
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
str
.
append
(
to_string
(
pVal
->
datum
.
u
));
break
;
case
TSDB_DATA_TYPE_JSON
:
case
TSDB_DATA_TYPE_DECIMAL
:
case
TSDB_DATA_TYPE_BLOB
:
str
.
append
(
"JSON or DECIMAL or BLOB"
);
break
;
default:
break
;
}
str
.
append
(
" ["
+
dataTypeToStr
(
pVal
->
node
.
resType
)
+
"]"
);
if
(
isProject
)
{
str
.
append
(
" AS "
+
string
(
pVal
->
node
.
aliasName
));
}
}
void
nodeToStr
(
const
SNode
*
node
,
string
&
str
,
bool
isProject
)
{
if
(
nullptr
==
node
)
{
return
;
...
...
@@ -142,12 +189,7 @@ private:
break
;
}
case
QUERY_NODE_VALUE
:
{
SValueNode
*
pVal
=
(
SValueNode
*
)
node
;
str
.
append
(
pVal
->
literal
);
str
.
append
(
" ["
+
dataTypeToStr
(
pVal
->
node
.
resType
)
+
"]"
);
if
(
isProject
)
{
str
.
append
(
" AS "
+
string
(
pVal
->
node
.
aliasName
));
}
valueNodeToStr
((
SValueNode
*
)
node
,
str
,
isProject
);
break
;
}
case
QUERY_NODE_OPERATOR
:
{
...
...
@@ -391,10 +433,20 @@ TEST_F(NewParserTest, selectSimple) {
ASSERT_TRUE
(
run
());
}
TEST_F
(
NewParserTest
,
selectConstant
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"SELECT 123, 20.4, 'abc',
\"
wxy
\"
, TIMESTAMP '2022-02-09 17:30:20', true, false, 10s FROM t1"
);
ASSERT_TRUE
(
run
());
bind
(
"SELECT 1234567890123456789012345678901234567890, 20.1234567890123456789012345678901234567890, 'abc',
\"
wxy
\"
, TIMESTAMP '2022-02-09 17:30:20', true, false, 15s FROM t1"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
NewParserTest
,
selectExpression
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"SELECT
c1 + 10, c2
FROM t1"
);
bind
(
"SELECT
ts + 10s, c1 + 10, concat(c2, 'abc')
FROM t1"
);
ASSERT_TRUE
(
run
());
}
...
...
source/libs/transport/src/transCli.c
浏览文件 @
81735172
...
...
@@ -123,11 +123,13 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
tDebug
(
"client conn %p %s received from %s:%d"
,
conn
,
TMSG_INFO
(
pHead
->
msgType
),
pMsg
->
ctx
->
ip
,
pMsg
->
ctx
->
port
);
if
(
pCtx
->
pSem
==
NULL
)
{
t
Debug
(
"client conn %p handle resp,
"
,
conn
);
t
Trace
(
"client conn(sync) %p handle resp
"
,
conn
);
(
pRpc
->
cfp
)(
pRpc
->
parent
,
&
rpcMsg
,
NULL
);
}
else
{
t
Debug
(
"client conn(sync) %p handle resp"
,
conn
);
t
Trace
(
"client conn(sync) %p handle resp"
,
conn
);
memcpy
((
char
*
)
pCtx
->
pRsp
,
(
char
*
)
&
rpcMsg
,
sizeof
(
rpcMsg
));
tsem_post
(
pCtx
->
pSem
);
}
...
...
@@ -370,7 +372,7 @@ static void clientWrite(SCliConn* pConn) {
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
tDebug
(
"c
lient conn %p data write out, msgType : %s, len: %d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
msgLen
);
tDebug
(
"c
onn %p %s is send to %s:%d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
pCliMsg
->
ctx
->
ip
,
pCliMsg
->
ctx
->
port
);
uv_write
(
pConn
->
writeReq
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
clientWriteCb
);
}
static
void
clientConnCb
(
uv_connect_t
*
req
,
int
status
)
{
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
81735172
...
...
@@ -33,7 +33,7 @@ typedef struct SSrvConn {
void
*
hostThrd
;
void
*
pSrvMsg
;
struct
sockaddr
peername
;
struct
sockaddr
_in
addr
;
// SRpcMsg sendMsg;
// del later
...
...
@@ -236,14 +236,6 @@ static void uvHandleReq(SSrvConn* pConn) {
assert
(
transIsReq
(
pHead
->
msgType
));
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
p
->
shandle
;
// auth here
// auth should not do in rpc thread
// int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
// if (code != 0) {
// terrno = code;
// return;
//}
pHead
->
code
=
htonl
(
pHead
->
code
);
int32_t
dlen
=
0
;
...
...
@@ -266,7 +258,8 @@ static void uvHandleReq(SSrvConn* pConn) {
transClearBuffer
(
&
pConn
->
readBuf
);
pConn
->
ref
++
;
tDebug
(
"%s received on %p"
,
TMSG_INFO
(
rpcMsg
.
msgType
),
pConn
);
tDebug
(
"%p %s received from %s:%d"
,
pConn
,
TMSG_INFO
(
rpcMsg
.
msgType
),
inet_ntoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
));
(
*
(
pRpc
->
cfp
))(
pRpc
->
parent
,
&
rpcMsg
,
NULL
);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
...
...
@@ -279,12 +272,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
t
Debug
(
"conn %p read summary, total read: %d, current read: %d"
,
conn
,
pBuf
->
len
,
(
int
)
nread
);
t
Trace
(
"conn %p read summary, total read: %d, current read: %d"
,
conn
,
pBuf
->
len
,
(
int
)
nread
);
if
(
readComplete
(
pBuf
))
{
t
Debug
(
"conn %p alread read complete packet"
,
conn
);
t
Trace
(
"conn %p alread read complete packet"
,
conn
);
uvHandleReq
(
conn
);
}
else
{
t
Debug
(
"conn %p read partial packet, continue to read"
,
conn
);
t
Trace
(
"conn %p read partial packet, continue to read"
,
conn
);
}
return
;
}
...
...
@@ -339,6 +332,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later;
tDebug
(
"conn %p prepare to send resp"
,
smsg
->
pConn
);
SRpcMsg
*
pMsg
=
&
smsg
->
msg
;
SSrvConn
*
pConn
=
smsg
->
pConn
;
if
(
pMsg
->
pCont
==
0
)
{
pMsg
->
pCont
=
(
void
*
)
rpcMallocCont
(
0
);
pMsg
->
contLen
=
0
;
...
...
@@ -351,6 +345,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
if
(
transCompressMsg
(
msg
,
len
,
NULL
))
{
// impl later
}
tDebug
(
"%p start to send %s to %s:%d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
inet_ntoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
));
pHead
->
msgLen
=
htonl
(
len
);
wb
->
base
=
msg
;
wb
->
len
=
len
;
...
...
@@ -490,8 +487,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_os_fd_t
fd
;
uv_fileno
((
const
uv_handle_t
*
)
pConn
->
pTcp
,
&
fd
);
tDebug
(
"conn %p created, fd: %d"
,
pConn
,
fd
);
int
namelen
=
sizeof
(
pConn
->
peername
);
if
(
0
!=
uv_tcp_getpeername
(
pConn
->
pTcp
,
&
pConn
->
peername
,
&
name
len
))
{
int
addrlen
=
sizeof
(
pConn
->
addr
);
if
(
0
!=
uv_tcp_getpeername
(
pConn
->
pTcp
,
(
struct
sockaddr
*
)
&
pConn
->
addr
,
&
addr
len
))
{
tError
(
"failed to get peer name"
);
destroyConn
(
pConn
,
true
);
}
else
{
...
...
@@ -732,18 +729,18 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
// pthread_mutex_unlock(&pThrd->msgMtx);
t
Debug
(
"conn %p start to send resp"
,
pConn
);
t
Trace
(
"conn %p start to send resp"
,
pConn
);
transSendAsync
(
pThrd
->
asyncPool
,
&
srvMsg
->
q
);
// uv_async_send(pThrd->workerAsync);
}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
SSrvConn
*
pConn
=
thandle
;
struct
sockaddr
*
pPeerName
=
&
pConn
->
peername
;
//
struct sockaddr* pPeerName = &pConn->peername;
struct
sockaddr_in
caddr
=
*
(
struct
sockaddr_in
*
)(
pPeerName
)
;
pInfo
->
clientIp
=
(
uint32_t
)(
c
addr
.
sin_addr
.
s_addr
);
pInfo
->
clientPort
=
ntohs
(
c
addr
.
sin_port
);
struct
sockaddr_in
addr
=
pConn
->
addr
;
pInfo
->
clientIp
=
(
uint32_t
)(
addr
.
sin_addr
.
s_addr
);
pInfo
->
clientPort
=
ntohs
(
addr
.
sin_port
);
tstrncpy
(
pInfo
->
user
,
pConn
->
user
,
sizeof
(
pInfo
->
user
));
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录