Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
131f87ff
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
131f87ff
编写于
7月 14, 2022
作者:
wmmhello
提交者:
GitHub
7月 14, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14778 from taosdata/feature/TD-13041
feat: get meta and write meta from tmq
上级
5ecf59d8
e9accd26
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
660 addition
and
311 deletion
+660
-311
examples/c/tmq.c
examples/c/tmq.c
+58
-10
include/client/taos.h
include/client/taos.h
+3
-1
include/common/tcommon.h
include/common/tcommon.h
+0
-1
include/common/tdataformat.h
include/common/tdataformat.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+12
-4
include/libs/parser/parser.h
include/libs/parser/parser.h
+3
-3
include/util/tencode.h
include/util/tencode.h
+2
-2
source/client/src/clientSml.c
source/client/src/clientSml.c
+1
-1
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+6
-5
source/client/src/tmq.c
source/client/src/tmq.c
+350
-212
source/common/src/tmsg.c
source/common/src/tmsg.c
+33
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+101
-37
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+1
-0
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+5
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+1
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+8
-4
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+45
-17
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+28
-13
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+1
-0
未找到文件。
examples/c/tmq.c
浏览文件 @
131f87ff
...
@@ -30,21 +30,35 @@ static void msg_process(TAOS_RES* msg) {
...
@@ -30,21 +30,35 @@ static void msg_process(TAOS_RES* msg) {
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
tmq_raw_data
*
raw
=
tmq_get_raw_meta
(
msg
);
tmq_raw_data
*
raw
=
tmq_get_raw_meta
(
msg
);
if
(
raw
){
if
(
raw
){
TAOS
*
pConn
=
taos_connect
(
"192.168.1.86"
,
"root"
,
"taosdata"
,
"abc1"
,
0
);
TAOS
*
pConn
=
taos_connect
(
"192.168.1.86"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
if
(
pConn
==
NULL
)
{
return
;
return
;
}
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 5"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
;
}
taos_free_result
(
pRes
);
int32_t
ret
=
taos_write_raw_meta
(
pConn
,
raw
);
int32_t
ret
=
taos_write_raw_meta
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
free
(
raw
);
taos_close
(
pConn
);
taos_close
(
pConn
);
}
}
tmq_free_raw_meta
(
raw
);
char
*
result
=
tmq_get_json_meta
(
msg
);
char
*
result
=
tmq_get_json_meta
(
msg
);
if
(
result
){
if
(
result
){
printf
(
"meta result: %s
\n
"
,
result
);
printf
(
"meta result: %s
\n
"
,
result
);
free
(
result
);
}
}
printf
(
"meta:%p
\n
"
,
raw
);
tmq_free_json_meta
(
result
);
return
;
return
;
}
}
while
(
1
)
{
while
(
1
)
{
...
@@ -68,7 +82,7 @@ int32_t init_env() {
...
@@ -68,7 +82,7 @@ int32_t init_env() {
return
-
1
;
return
-
1
;
}
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
1
"
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
5
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
...
@@ -82,15 +96,14 @@ int32_t init_env() {
...
@@ -82,15 +96,14 @@ int32_t init_env() {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
pRes
=
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)"
);
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000)"
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000
,
\"
ttt
\"
, true
)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
...
@@ -104,13 +117,20 @@ int32_t init_env() {
...
@@ -104,13 +117,20 @@ int32_t init_env() {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1 tags(2000)"
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1
(t1)
tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct2 using st1(t1) tags(NULL)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(now, 3, 4, 'b')"
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(now, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
@@ -118,7 +138,7 @@ int32_t init_env() {
...
@@ -118,7 +138,7 @@ int32_t init_env() {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1 tags(3000)"
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1
(t1)
tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
...
@@ -202,6 +222,13 @@ int32_t init_env() {
...
@@ -202,6 +222,13 @@ int32_t init_env() {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 comment 'hello'"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 drop column c1"
);
pRes
=
taos_query
(
pConn
,
"alter table n1 drop column c1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
@@ -230,6 +257,27 @@ int32_t init_env() {
...
@@ -230,6 +257,27 @@ int32_t init_env() {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt2 using jt tags('')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
return
0
;
}
}
...
...
include/client/taos.h
浏览文件 @
131f87ff
...
@@ -265,7 +265,9 @@ typedef struct tmq_raw_data tmq_raw_data;
...
@@ -265,7 +265,9 @@ typedef struct tmq_raw_data tmq_raw_data;
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
tmq_raw_data
*
tmq_get_raw_meta
(
TAOS_RES
*
res
);
DLL_EXPORT
tmq_raw_data
*
tmq_get_raw_meta
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
*
raw_meta
);
DLL_EXPORT
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
*
raw_meta
);
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed.
DLL_EXPORT
void
tmq_free_raw_meta
(
tmq_raw_data
*
rawMeta
);
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
void
tmq_free_json_meta
(
char
*
jsonMeta
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_db_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_db_name
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
TAOS_RES
*
res
);
...
...
include/common/tcommon.h
浏览文件 @
131f87ff
...
@@ -40,7 +40,6 @@ enum {
...
@@ -40,7 +40,6 @@ enum {
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
)
// clang-format on
// clang-format on
...
...
include/common/tdataformat.h
浏览文件 @
131f87ff
...
@@ -171,6 +171,7 @@ struct SColVal {
...
@@ -171,6 +171,7 @@ struct SColVal {
#pragma pack(push, 1)
#pragma pack(push, 1)
struct
STagVal
{
struct
STagVal
{
// char colName[TSDB_COL_NAME_LEN]; // only used for tmq_get_meta
union
{
union
{
int16_t
cid
;
int16_t
cid
;
char
*
pKey
;
char
*
pKey
;
...
...
include/common/tmsg.h
浏览文件 @
131f87ff
...
@@ -1939,6 +1939,8 @@ typedef struct SVCreateStbReq {
...
@@ -1939,6 +1939,8 @@ typedef struct SVCreateStbReq {
SSchemaWrapper
schemaRow
;
SSchemaWrapper
schemaRow
;
SSchemaWrapper
schemaTag
;
SSchemaWrapper
schemaTag
;
SRSmaParam
rsmaParam
;
SRSmaParam
rsmaParam
;
int32_t
alterOriDataLen
;
void
*
alterOriData
;
}
SVCreateStbReq
;
}
SVCreateStbReq
;
int
tEncodeSVCreateStbReq
(
SEncoder
*
pCoder
,
const
SVCreateStbReq
*
pReq
);
int
tEncodeSVCreateStbReq
(
SEncoder
*
pCoder
,
const
SVCreateStbReq
*
pReq
);
...
@@ -1966,7 +1968,9 @@ typedef struct SVCreateTbReq {
...
@@ -1966,7 +1968,9 @@ typedef struct SVCreateTbReq {
int8_t
type
;
int8_t
type
;
union
{
union
{
struct
{
struct
{
char
*
name
;
// super table name
tb_uid_t
suid
;
tb_uid_t
suid
;
SArray
*
tagName
;
uint8_t
*
pTag
;
uint8_t
*
pTag
;
}
ctb
;
}
ctb
;
struct
{
struct
{
...
@@ -1983,6 +1987,9 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
...
@@ -1983,6 +1987,9 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear
(
req
->
comment
);
taosMemoryFreeClear
(
req
->
comment
);
if
(
req
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
req
->
type
==
TSDB_CHILD_TABLE
)
{
taosMemoryFreeClear
(
req
->
ctb
.
pTag
);
taosMemoryFreeClear
(
req
->
ctb
.
pTag
);
taosMemoryFreeClear
(
req
->
ctb
.
name
);
taosArrayDestroy
(
req
->
ctb
.
tagName
);
req
->
ctb
.
tagName
=
NULL
;
}
else
if
(
req
->
type
==
TSDB_NORMAL_TABLE
)
{
}
else
if
(
req
->
type
==
TSDB_NORMAL_TABLE
)
{
taosMemoryFreeClear
(
req
->
ntb
.
schemaRow
.
pSchema
);
taosMemoryFreeClear
(
req
->
ntb
.
schemaRow
.
pSchema
);
}
}
...
@@ -2066,12 +2073,14 @@ typedef struct {
...
@@ -2066,12 +2073,14 @@ typedef struct {
int32_t
bytes
;
int32_t
bytes
;
// TSDB_ALTER_TABLE_DROP_COLUMN
// TSDB_ALTER_TABLE_DROP_COLUMN
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
int8_t
colModType
;
int32_t
colModBytes
;
int32_t
colModBytes
;
// TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
// TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
char
*
colNewName
;
char
*
colNewName
;
// TSDB_ALTER_TABLE_UPDATE_TAG_VAL
// TSDB_ALTER_TABLE_UPDATE_TAG_VAL
char
*
tagName
;
char
*
tagName
;
int8_t
isNull
;
int8_t
isNull
;
int8_t
tagType
;
uint32_t
nTagVal
;
uint32_t
nTagVal
;
uint8_t
*
pTagVal
;
uint8_t
*
pTagVal
;
// TSDB_ALTER_TABLE_UPDATE_OPTIONS
// TSDB_ALTER_TABLE_UPDATE_OPTIONS
...
@@ -2858,8 +2867,8 @@ typedef struct {
...
@@ -2858,8 +2867,8 @@ typedef struct {
static
FORCE_INLINE
int32_t
tEncodeSMqMetaRsp
(
void
**
buf
,
const
SMqMetaRsp
*
pRsp
)
{
static
FORCE_INLINE
int32_t
tEncodeSMqMetaRsp
(
void
**
buf
,
const
SMqMetaRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
//
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
//
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
tlen
+=
taosEncodeFixedI16
(
buf
,
pRsp
->
resMsgType
);
tlen
+=
taosEncodeFixedI16
(
buf
,
pRsp
->
resMsgType
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
metaRspLen
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
metaRspLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
);
...
@@ -2867,8 +2876,7 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
...
@@ -2867,8 +2876,7 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
}
}
static
FORCE_INLINE
void
*
tDecodeSMqMetaRsp
(
const
void
*
buf
,
SMqMetaRsp
*
pRsp
)
{
static
FORCE_INLINE
void
*
tDecodeSMqMetaRsp
(
const
void
*
buf
,
SMqMetaRsp
*
pRsp
)
{
// buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
// buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf
=
taosDecodeFixedI16
(
buf
,
&
pRsp
->
resMsgType
);
buf
=
taosDecodeFixedI16
(
buf
,
&
pRsp
->
resMsgType
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
metaRspLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
metaRspLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
);
...
...
include/libs/parser/parser.h
浏览文件 @
131f87ff
...
@@ -29,7 +29,7 @@ struct SMetaData;
...
@@ -29,7 +29,7 @@ struct SMetaData;
typedef
struct
SStmtCallback
{
typedef
struct
SStmtCallback
{
TAOS_STMT
*
pStmt
;
TAOS_STMT
*
pStmt
;
int32_t
(
*
getTbNameFn
)(
TAOS_STMT
*
,
char
**
);
int32_t
(
*
getTbNameFn
)(
TAOS_STMT
*
,
char
**
);
int32_t
(
*
setInfoFn
)(
TAOS_STMT
*
,
STableMeta
*
,
void
*
,
char
*
,
bool
,
SHashObj
*
,
SHashObj
*
);
int32_t
(
*
setInfoFn
)(
TAOS_STMT
*
,
STableMeta
*
,
void
*
,
char
*
,
bool
,
SHashObj
*
,
SHashObj
*
,
const
char
*
);
int32_t
(
*
getExecInfoFn
)(
TAOS_STMT
*
,
SHashObj
**
,
SHashObj
**
);
int32_t
(
*
getExecInfoFn
)(
TAOS_STMT
*
,
SHashObj
**
,
SHashObj
**
);
}
SStmtCallback
;
}
SStmtCallback
;
...
@@ -84,7 +84,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
...
@@ -84,7 +84,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
int32_t
rowNum
);
int32_t
rowNum
);
int32_t
qBuildStmtColFields
(
void
*
pDataBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
int32_t
qBuildStmtColFields
(
void
*
pDataBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
char
*
tName
,
TAOS_MULTI_BIND
*
bind
,
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
c
onst
char
*
sTableName
,
c
har
*
tName
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
);
char
*
msgBuf
,
int32_t
msgBufLen
);
void
destroyBoundColumnInfo
(
void
*
pBoundInfo
);
void
destroyBoundColumnInfo
(
void
*
pBoundInfo
);
int32_t
qCreateSName
(
SName
*
pName
,
const
char
*
pTableName
,
int32_t
acctId
,
char
*
dbName
,
char
*
msgBuf
,
int32_t
qCreateSName
(
SName
*
pName
,
const
char
*
pTableName
,
int32_t
acctId
,
char
*
dbName
,
char
*
msgBuf
,
...
@@ -93,7 +93,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
...
@@ -93,7 +93,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void
*
smlInitHandle
(
SQuery
*
pQuery
);
void
*
smlInitHandle
(
SQuery
*
pQuery
);
void
smlDestroyHandle
(
void
*
pHandle
);
void
smlDestroyHandle
(
void
*
pHandle
);
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
);
char
*
tableName
,
c
onst
char
*
sTableName
,
int32_t
sTableNameLen
,
c
har
*
msgBuf
,
int16_t
msgBufLen
);
int32_t
smlBuildOutput
(
void
*
handle
,
SHashObj
*
pVgHash
);
int32_t
smlBuildOutput
(
void
*
handle
,
SHashObj
*
pVgHash
);
int32_t
rewriteToVnodeModifyOpStmt
(
SQuery
*
pQuery
,
SArray
*
pBufArray
);
int32_t
rewriteToVnodeModifyOpStmt
(
SQuery
*
pQuery
,
SArray
*
pBufArray
);
...
...
include/util/tencode.h
浏览文件 @
131f87ff
...
@@ -440,7 +440,7 @@ static FORCE_INLINE bool tDecodeIsEnd(SDecoder* pCoder) { return (pCoder->size =
...
@@ -440,7 +440,7 @@ static FORCE_INLINE bool tDecodeIsEnd(SDecoder* pCoder) { return (pCoder->size =
static
FORCE_INLINE
void
*
tEncoderMalloc
(
SEncoder
*
pCoder
,
int32_t
size
)
{
static
FORCE_INLINE
void
*
tEncoderMalloc
(
SEncoder
*
pCoder
,
int32_t
size
)
{
void
*
p
=
NULL
;
void
*
p
=
NULL
;
SCoderMem
*
pMem
=
(
SCoderMem
*
)
taosMemory
Malloc
(
sizeof
(
*
pMem
)
+
size
);
SCoderMem
*
pMem
=
(
SCoderMem
*
)
taosMemory
Calloc
(
1
,
sizeof
(
*
pMem
)
+
size
);
if
(
pMem
)
{
if
(
pMem
)
{
pMem
->
next
=
pCoder
->
mList
;
pMem
->
next
=
pCoder
->
mList
;
pCoder
->
mList
=
pMem
;
pCoder
->
mList
=
pMem
;
...
@@ -451,7 +451,7 @@ static FORCE_INLINE void* tEncoderMalloc(SEncoder* pCoder, int32_t size) {
...
@@ -451,7 +451,7 @@ static FORCE_INLINE void* tEncoderMalloc(SEncoder* pCoder, int32_t size) {
static
FORCE_INLINE
void
*
tDecoderMalloc
(
SDecoder
*
pCoder
,
int32_t
size
)
{
static
FORCE_INLINE
void
*
tDecoderMalloc
(
SDecoder
*
pCoder
,
int32_t
size
)
{
void
*
p
=
NULL
;
void
*
p
=
NULL
;
SCoderMem
*
pMem
=
(
SCoderMem
*
)
taosMemory
Malloc
(
sizeof
(
*
pMem
)
+
size
);
SCoderMem
*
pMem
=
(
SCoderMem
*
)
taosMemory
Calloc
(
1
,
sizeof
(
*
pMem
)
+
size
);
if
(
pMem
)
{
if
(
pMem
)
{
pMem
->
next
=
pCoder
->
mList
;
pMem
->
next
=
pCoder
->
mList
;
pCoder
->
mList
=
pMem
;
pCoder
->
mList
=
pMem
;
...
...
source/client/src/clientSml.c
浏览文件 @
131f87ff
...
@@ -2256,7 +2256,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
...
@@ -2256,7 +2256,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
(
*
pMeta
)
->
tableMeta
->
uid
=
tableData
->
uid
;
// one table merge data block together according uid
(
*
pMeta
)
->
tableMeta
->
uid
=
tableData
->
uid
;
// one table merge data block together according uid
code
=
smlBindData
(
info
->
exec
,
tableData
->
tags
,
(
*
pMeta
)
->
cols
,
tableData
->
cols
,
info
->
dataFormat
,
code
=
smlBindData
(
info
->
exec
,
tableData
->
tags
,
(
*
pMeta
)
->
cols
,
tableData
->
cols
,
info
->
dataFormat
,
(
*
pMeta
)
->
tableMeta
,
tableData
->
childTableName
,
info
->
msgBuf
.
buf
,
info
->
msgBuf
.
len
);
(
*
pMeta
)
->
tableMeta
,
tableData
->
childTableName
,
tableData
->
sTableName
,
tableData
->
sTableNameLen
,
info
->
msgBuf
.
buf
,
info
->
msgBuf
.
len
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" smlBindData failed"
,
info
->
id
);
uError
(
"SML:0x%"
PRIx64
" smlBindData failed"
,
info
->
id
);
return
code
;
return
code
;
...
...
source/client/src/clientStmt.c
浏览文件 @
131f87ff
...
@@ -136,7 +136,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
...
@@ -136,7 +136,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
stmtUpdateBindInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbFName
)
{
int32_t
stmtUpdateBindInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbFName
,
const
char
*
sTableName
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
strncpy
(
pStmt
->
bInfo
.
tbFName
,
tbFName
,
sizeof
(
pStmt
->
bInfo
.
tbFName
)
-
1
);
strncpy
(
pStmt
->
bInfo
.
tbFName
,
tbFName
,
sizeof
(
pStmt
->
bInfo
.
tbFName
)
-
1
);
...
@@ -147,6 +147,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
...
@@ -147,6 +147,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
pStmt
->
bInfo
.
tbType
=
pTableMeta
->
tableType
;
pStmt
->
bInfo
.
tbType
=
pTableMeta
->
tableType
;
pStmt
->
bInfo
.
boundTags
=
tags
;
pStmt
->
bInfo
.
boundTags
=
tags
;
pStmt
->
bInfo
.
tagsCached
=
false
;
pStmt
->
bInfo
.
tagsCached
=
false
;
strcpy
(
pStmt
->
bInfo
.
stbFName
,
sTableName
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -162,10 +163,10 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH
...
@@ -162,10 +163,10 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH
}
}
int32_t
stmtUpdateInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbFName
,
bool
autoCreateTbl
,
int32_t
stmtUpdateInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbFName
,
bool
autoCreateTbl
,
SHashObj
*
pVgHash
,
SHashObj
*
pBlockHash
)
{
SHashObj
*
pVgHash
,
SHashObj
*
pBlockHash
,
const
char
*
sTableName
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_ERR_RET
(
stmtUpdateBindInfo
(
stmt
,
pTableMeta
,
tags
,
tbFName
));
STMT_ERR_RET
(
stmtUpdateBindInfo
(
stmt
,
pTableMeta
,
tags
,
tbFName
,
sTableName
));
STMT_ERR_RET
(
stmtUpdateExecInfo
(
stmt
,
pVgHash
,
pBlockHash
,
autoCreateTbl
));
STMT_ERR_RET
(
stmtUpdateExecInfo
(
stmt
,
pVgHash
,
pBlockHash
,
autoCreateTbl
));
pStmt
->
sql
.
autoCreateTbl
=
autoCreateTbl
;
pStmt
->
sql
.
autoCreateTbl
=
autoCreateTbl
;
...
@@ -253,7 +254,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
...
@@ -253,7 +254,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
destroyBoundColumnInfo
(
pStmt
->
bInfo
.
boundTags
);
destroyBoundColumnInfo
(
pStmt
->
bInfo
.
boundTags
);
taosMemoryFreeClear
(
pStmt
->
bInfo
.
boundTags
);
taosMemoryFreeClear
(
pStmt
->
bInfo
.
boundTags
);
}
}
memset
(
pStmt
->
bInfo
.
stbFName
,
0
,
TSDB_TABLE_FNAME_LEN
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -592,7 +593,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
...
@@ -592,7 +593,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
}
}
tscDebug
(
"start to bind stmt tag values"
);
tscDebug
(
"start to bind stmt tag values"
);
STMT_ERR_RET
(
qBindStmtTagsValue
(
*
pDataBlock
,
pStmt
->
bInfo
.
boundTags
,
pStmt
->
bInfo
.
tbSuid
,
pStmt
->
bInfo
.
sname
.
tname
,
STMT_ERR_RET
(
qBindStmtTagsValue
(
*
pDataBlock
,
pStmt
->
bInfo
.
boundTags
,
pStmt
->
bInfo
.
tbSuid
,
pStmt
->
bInfo
.
s
tbFName
,
pStmt
->
bInfo
.
s
name
.
tname
,
tags
,
pStmt
->
exec
.
pRequest
->
msgBuf
,
pStmt
->
exec
.
pRequest
->
msgBufLen
));
tags
,
pStmt
->
exec
.
pRequest
->
msgBuf
,
pStmt
->
exec
.
pRequest
->
msgBufLen
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
...
source/client/src/tmq.c
浏览文件 @
131f87ff
此差异已折叠。
点击以展开。
source/common/src/tmsg.c
浏览文件 @
131f87ff
...
@@ -4871,6 +4871,11 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
...
@@ -4871,6 +4871,11 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
if
(
tEncodeSRSmaParam
(
pCoder
,
&
pReq
->
rsmaParam
)
<
0
)
return
-
1
;
if
(
tEncodeSRSmaParam
(
pCoder
,
&
pReq
->
rsmaParam
)
<
0
)
return
-
1
;
}
}
if
(
tEncodeI32
(
pCoder
,
pReq
->
alterOriDataLen
)
<
0
)
return
-
1
;
if
(
pReq
->
alterOriDataLen
>
0
)
{
if
(
tEncodeBinary
(
pCoder
,
pReq
->
alterOriData
,
pReq
->
alterOriDataLen
)
<
0
)
return
-
1
;
}
tEndEncode
(
pCoder
);
tEndEncode
(
pCoder
);
return
0
;
return
0
;
}
}
...
@@ -4887,6 +4892,11 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
...
@@ -4887,6 +4892,11 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
if
(
tDecodeSRSmaParam
(
pCoder
,
&
pReq
->
rsmaParam
)
<
0
)
return
-
1
;
if
(
tDecodeSRSmaParam
(
pCoder
,
&
pReq
->
rsmaParam
)
<
0
)
return
-
1
;
}
}
if
(
tDecodeI32
(
pCoder
,
&
pReq
->
alterOriDataLen
)
<
0
)
return
-
1
;
if
(
pReq
->
alterOriDataLen
>
0
)
{
if
(
tDecodeBinary
(
pCoder
,
(
uint8_t
**
)
&
pReq
->
alterOriData
,
NULL
)
<
0
)
return
-
1
;
}
tEndDecode
(
pCoder
);
tEndDecode
(
pCoder
);
return
0
;
return
0
;
}
}
...
@@ -4930,8 +4940,15 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
...
@@ -4930,8 +4940,15 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
}
}
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
tEncodeCStr
(
pCoder
,
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tEncodeTag
(
pCoder
,
(
const
STag
*
)
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
if
(
tEncodeTag
(
pCoder
,
(
const
STag
*
)
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
int32_t
len
=
taosArrayGetSize
(
pReq
->
ctb
.
tagName
);
if
(
tEncodeI32
(
pCoder
,
len
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
len
;
i
++
){
char
*
name
=
taosArrayGet
(
pReq
->
ctb
.
tagName
,
i
);
if
(
tEncodeCStr
(
pCoder
,
name
)
<
0
)
return
-
1
;
}
}
else
if
(
pReq
->
type
==
TSDB_NORMAL_TABLE
)
{
}
else
if
(
pReq
->
type
==
TSDB_NORMAL_TABLE
)
{
if
(
tEncodeSSchemaWrapper
(
pCoder
,
&
pReq
->
ntb
.
schemaRow
)
<
0
)
return
-
1
;
if
(
tEncodeSSchemaWrapper
(
pCoder
,
&
pReq
->
ntb
.
schemaRow
)
<
0
)
return
-
1
;
}
else
{
}
else
{
...
@@ -4959,8 +4976,20 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
...
@@ -4959,8 +4976,20 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
}
}
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
tDecodeCStr
(
pCoder
,
&
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeTag
(
pCoder
,
(
STag
**
)
&
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
if
(
tDecodeTag
(
pCoder
,
(
STag
**
)
&
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
int32_t
len
=
0
;
if
(
tDecodeI32
(
pCoder
,
&
len
)
<
0
)
return
-
1
;
pReq
->
ctb
.
tagName
=
taosArrayInit
(
len
,
TSDB_COL_NAME_LEN
);
if
(
pReq
->
ctb
.
tagName
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
len
;
i
++
){
char
name
[
TSDB_COL_NAME_LEN
]
=
{
0
};
char
*
tmp
=
NULL
;
if
(
tDecodeCStr
(
pCoder
,
&
tmp
)
<
0
)
return
-
1
;
strcpy
(
name
,
tmp
);
taosArrayPush
(
pReq
->
ctb
.
tagName
,
name
);
}
}
else
if
(
pReq
->
type
==
TSDB_NORMAL_TABLE
)
{
}
else
if
(
pReq
->
type
==
TSDB_NORMAL_TABLE
)
{
if
(
tDecodeSSchemaWrapperEx
(
pCoder
,
&
pReq
->
ntb
.
schemaRow
)
<
0
)
return
-
1
;
if
(
tDecodeSSchemaWrapperEx
(
pCoder
,
&
pReq
->
ntb
.
schemaRow
)
<
0
)
return
-
1
;
}
else
{
}
else
{
...
@@ -5292,6 +5321,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
...
@@ -5292,6 +5321,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
break
;
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:
if
(
tEncodeCStr
(
pEncoder
,
pReq
->
colName
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pReq
->
colName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pReq
->
colModType
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pReq
->
colModBytes
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pReq
->
colModBytes
)
<
0
)
return
-
1
;
break
;
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:
...
@@ -5301,6 +5331,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
...
@@ -5301,6 +5331,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
case
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
:
case
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
:
if
(
tEncodeCStr
(
pEncoder
,
pReq
->
tagName
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pReq
->
tagName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pReq
->
isNull
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pReq
->
isNull
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pReq
->
tagType
)
<
0
)
return
-
1
;
if
(
!
pReq
->
isNull
)
{
if
(
!
pReq
->
isNull
)
{
if
(
tEncodeBinary
(
pEncoder
,
pReq
->
pTagVal
,
pReq
->
nTagVal
)
<
0
)
return
-
1
;
if
(
tEncodeBinary
(
pEncoder
,
pReq
->
pTagVal
,
pReq
->
nTagVal
)
<
0
)
return
-
1
;
}
}
...
@@ -5340,6 +5371,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
...
@@ -5340,6 +5371,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
break
;
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:
if
(
tDecodeCStr
(
pDecoder
,
&
pReq
->
colName
)
<
0
)
return
-
1
;
if
(
tDecodeCStr
(
pDecoder
,
&
pReq
->
colName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
colModType
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pReq
->
colModBytes
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pReq
->
colModBytes
)
<
0
)
return
-
1
;
break
;
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:
...
@@ -5349,6 +5381,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
...
@@ -5349,6 +5381,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
case
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
:
case
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
:
if
(
tDecodeCStr
(
pDecoder
,
&
pReq
->
tagName
)
<
0
)
return
-
1
;
if
(
tDecodeCStr
(
pDecoder
,
&
pReq
->
tagName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
isNull
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
isNull
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
tagType
)
<
0
)
return
-
1
;
if
(
!
pReq
->
isNull
)
{
if
(
!
pReq
->
isNull
)
{
if
(
tDecodeBinary
(
pDecoder
,
&
pReq
->
pTagVal
,
&
pReq
->
nTagVal
)
<
0
)
return
-
1
;
if
(
tDecodeBinary
(
pDecoder
,
&
pReq
->
pTagVal
,
&
pReq
->
nTagVal
)
<
0
)
return
-
1
;
}
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
131f87ff
...
@@ -45,7 +45,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
...
@@ -45,7 +45,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static
int32_t
mndRetrieveStb
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
int32_t
mndRetrieveStb
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextStb
(
SMnode
*
pMnode
,
void
*
pIter
);
static
void
mndCancelGetNextStb
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessTableCfgReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessTableCfgReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndAlterStbImp
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SStbObj
*
pStb
,
bool
needRsp
);
static
int32_t
mndAlterStbImp
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SStbObj
*
pStb
,
bool
needRsp
,
void
*
alterOriData
,
int32_t
alterOriDataLen
);
int32_t
mndInitStb
(
SMnode
*
pMnode
)
{
int32_t
mndInitStb
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
SSdbTable
table
=
{
...
@@ -409,7 +409,7 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *
...
@@ -409,7 +409,7 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *
return
0
;
return
0
;
}
}
static
void
*
mndBuildVCreateStbReq
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SStbObj
*
pStb
,
int32_t
*
pContLen
)
{
static
void
*
mndBuildVCreateStbReq
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SStbObj
*
pStb
,
int32_t
*
pContLen
,
void
*
alterOriData
,
int32_t
alterOriDataLen
)
{
SEncoder
encoder
=
{
0
};
SEncoder
encoder
=
{
0
};
int32_t
contLen
;
int32_t
contLen
;
SName
name
=
{
0
};
SName
name
=
{
0
};
...
@@ -422,6 +422,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
...
@@ -422,6 +422,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req
.
name
=
(
char
*
)
tNameGetTableName
(
&
name
);
req
.
name
=
(
char
*
)
tNameGetTableName
(
&
name
);
req
.
suid
=
pStb
->
uid
;
req
.
suid
=
pStb
->
uid
;
req
.
rollup
=
pStb
->
ast1Len
>
0
?
1
:
0
;
req
.
rollup
=
pStb
->
ast1Len
>
0
?
1
:
0
;
req
.
alterOriData
=
alterOriData
;
req
.
alterOriDataLen
=
alterOriDataLen
;
// todo
// todo
req
.
schemaRow
.
nCols
=
pStb
->
numOfColumns
;
req
.
schemaRow
.
nCols
=
pStb
->
numOfColumns
;
req
.
schemaRow
.
version
=
pStb
->
colVer
;
req
.
schemaRow
.
version
=
pStb
->
colVer
;
...
@@ -626,7 +628,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
...
@@ -626,7 +628,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
continue
;
continue
;
}
}
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
);
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
,
NULL
,
0
);
if
(
pReq
==
NULL
)
{
if
(
pReq
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
...
@@ -706,10 +708,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
...
@@ -706,10 +708,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
memcpy
(
pDst
->
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
memcpy
(
pDst
->
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
pDst
->
createdTime
=
taosGetTimestampMs
();
pDst
->
createdTime
=
taosGetTimestampMs
();
pDst
->
updateTime
=
pDst
->
createdTime
;
pDst
->
updateTime
=
pDst
->
createdTime
;
pDst
->
uid
=
(
pCreate
->
source
==
1
)
?
pCreate
->
suid
:
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
pDst
->
uid
=
(
pCreate
->
source
==
TD_REQ_FROM_TAOX
)
?
pCreate
->
suid
:
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
pDst
->
dbUid
=
pDb
->
uid
;
pDst
->
dbUid
=
pDb
->
uid
;
pDst
->
tagVer
=
(
pCreate
->
source
!=
TD_REQ_FROM_APP
)
?
pCreate
->
tagVer
:
1
;
pDst
->
tagVer
=
1
;
pDst
->
colVer
=
(
pCreate
->
source
!=
TD_REQ_FROM_APP
)
?
pCreate
->
colVer
:
1
;
pDst
->
colVer
=
1
;
pDst
->
smaVer
=
1
;
pDst
->
smaVer
=
1
;
pDst
->
nextColId
=
1
;
pDst
->
nextColId
=
1
;
pDst
->
maxdelay
[
0
]
=
pCreate
->
delay1
;
pDst
->
maxdelay
[
0
]
=
pCreate
->
delay1
;
...
@@ -849,6 +851,75 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
...
@@ -849,6 +851,75 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
return
0
;
return
0
;
}
}
static
int32_t
mndFindSuperTableTagIndex
(
const
SStbObj
*
pStb
,
const
char
*
tagName
)
{
for
(
int32_t
tag
=
0
;
tag
<
pStb
->
numOfTags
;
tag
++
)
{
if
(
strcasecmp
(
pStb
->
pTags
[
tag
].
name
,
tagName
)
==
0
)
{
return
tag
;
}
}
return
-
1
;
}
static
int32_t
mndFindSuperTableColumnIndex
(
const
SStbObj
*
pStb
,
const
char
*
colName
)
{
for
(
int32_t
col
=
0
;
col
<
pStb
->
numOfColumns
;
col
++
)
{
if
(
strcasecmp
(
pStb
->
pColumns
[
col
].
name
,
colName
)
==
0
)
{
return
col
;
}
}
return
-
1
;
}
static
int32_t
mndBuildStbFromAlter
(
SStbObj
*
pStb
,
SStbObj
*
pDst
,
SMCreateStbReq
*
createReq
)
{
taosRLockLatch
(
&
pStb
->
lock
);
memcpy
(
pDst
,
pStb
,
sizeof
(
SStbObj
));
taosRUnLockLatch
(
&
pStb
->
lock
);
pDst
->
updateTime
=
taosGetTimestampMs
();
pDst
->
numOfColumns
=
createReq
->
numOfColumns
;
pDst
->
numOfTags
=
createReq
->
numOfTags
;
pDst
->
pColumns
=
taosMemoryCalloc
(
1
,
pDst
->
numOfColumns
*
sizeof
(
SSchema
));
pDst
->
pTags
=
taosMemoryCalloc
(
1
,
pDst
->
numOfTags
*
sizeof
(
SSchema
));
if
(
pDst
->
pColumns
==
NULL
||
pDst
->
pTags
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pDst
->
numOfColumns
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
createReq
->
pColumns
,
i
);
SSchema
*
pSchema
=
&
pDst
->
pColumns
[
i
];
pSchema
->
type
=
pField
->
type
;
pSchema
->
bytes
=
pField
->
bytes
;
pSchema
->
flags
=
pField
->
flags
;
memcpy
(
pSchema
->
name
,
pField
->
name
,
TSDB_COL_NAME_LEN
);
int32_t
cIndex
=
mndFindSuperTableColumnIndex
(
pStb
,
pField
->
name
);
if
(
cIndex
>=
0
){
pSchema
->
colId
=
pStb
->
pColumns
[
cIndex
].
colId
;
}
else
{
pSchema
->
colId
=
pDst
->
nextColId
++
;
}
}
for
(
int32_t
i
=
0
;
i
<
pDst
->
numOfTags
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
createReq
->
pTags
,
i
);
SSchema
*
pSchema
=
&
pDst
->
pTags
[
i
];
pSchema
->
type
=
pField
->
type
;
pSchema
->
bytes
=
pField
->
bytes
;
memcpy
(
pSchema
->
name
,
pField
->
name
,
TSDB_COL_NAME_LEN
);
int32_t
cIndex
=
mndFindSuperTableTagIndex
(
pStb
,
pField
->
name
);
if
(
cIndex
>=
0
){
pSchema
->
colId
=
pStb
->
pTags
[
cIndex
].
colId
;
}
else
{
pSchema
->
colId
=
pDst
->
nextColId
++
;
}
}
pDst
->
tagVer
=
createReq
->
tagVer
;
pDst
->
colVer
=
createReq
->
colVer
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mndProcessCreateStbReq
(
SRpcMsg
*
pReq
)
{
static
int32_t
mndProcessCreateStbReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
...
@@ -881,9 +952,9 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
...
@@ -881,9 +952,9 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
terrno
=
TSDB_CODE_MND_STABLE_UID_NOT_MATCH
;
terrno
=
TSDB_CODE_MND_STABLE_UID_NOT_MATCH
;
goto
_OVER
;
goto
_OVER
;
}
else
if
(
createReq
.
tagVer
>
0
||
createReq
.
colVer
>
0
)
{
}
else
if
(
createReq
.
tagVer
>
0
||
createReq
.
colVer
>
0
)
{
int32_t
tagDelta
=
pStb
->
tagVer
-
createReq
.
tagVer
;
int32_t
tagDelta
=
createReq
.
tagVer
-
pStb
->
tagVer
;
int32_t
colDelta
=
pStb
->
colVer
-
createReq
.
colVer
;
int32_t
colDelta
=
createReq
.
colVer
-
pStb
->
colVer
;
int32_t
verDelta
=
tagDelta
+
ver
Delta
;
int32_t
verDelta
=
tagDelta
+
col
Delta
;
mInfo
(
"stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d"
,
mInfo
(
"stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d"
,
createReq
.
name
,
createReq
.
tagVer
,
createReq
.
colVer
,
pStb
->
tagVer
,
pStb
->
colVer
);
createReq
.
name
,
createReq
.
tagVer
,
createReq
.
colVer
,
pStb
->
tagVer
,
pStb
->
colVer
);
if
(
tagDelta
<=
0
&&
colDelta
<=
0
)
{
if
(
tagDelta
<=
0
&&
colDelta
<=
0
)
{
...
@@ -910,6 +981,10 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
...
@@ -910,6 +981,10 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
}
}
}
else
if
(
terrno
!=
TSDB_CODE_MND_STB_NOT_EXIST
)
{
}
else
if
(
terrno
!=
TSDB_CODE_MND_STB_NOT_EXIST
)
{
goto
_OVER
;
goto
_OVER
;
}
else
if
(
createReq
.
source
==
TD_REQ_FROM_TAOX
&&
(
createReq
.
tagVer
!=
1
||
createReq
.
colVer
!=
1
)){
mInfo
(
"stb:%s, alter table does not need to be done, because table is deleted"
,
createReq
.
name
);
code
=
0
;
goto
_OVER
;
}
}
pDb
=
mndAcquireDbByStb
(
pMnode
,
createReq
.
name
);
pDb
=
mndAcquireDbByStb
(
pMnode
,
createReq
.
name
);
...
@@ -934,7 +1009,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
...
@@ -934,7 +1009,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
if
(
isAlter
)
{
if
(
isAlter
)
{
bool
needRsp
=
false
;
bool
needRsp
=
false
;
code
=
mndAlterStbImp
(
pMnode
,
pReq
,
pDb
,
pStb
,
needRsp
);
SStbObj
pDst
=
{
0
};
if
(
mndBuildStbFromAlter
(
pStb
,
&
pDst
,
&
createReq
)
!=
0
)
{
taosMemoryFreeClear
(
pDst
.
pTags
);
taosMemoryFreeClear
(
pDst
.
pColumns
);
goto
_OVER
;
}
code
=
mndAlterStbImp
(
pMnode
,
pReq
,
pDb
,
&
pDst
,
needRsp
,
NULL
,
0
);
taosMemoryFreeClear
(
pDst
.
pTags
);
taosMemoryFreeClear
(
pDst
.
pColumns
);
}
else
{
}
else
{
code
=
mndCreateStb
(
pMnode
,
pReq
,
&
createReq
,
pDb
);
code
=
mndCreateStb
(
pMnode
,
pReq
,
&
createReq
,
pDb
);
}
}
...
@@ -972,26 +1056,6 @@ static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
...
@@ -972,26 +1056,6 @@ static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
return
0
;
return
0
;
}
}
static
int32_t
mndFindSuperTableTagIndex
(
const
SStbObj
*
pStb
,
const
char
*
tagName
)
{
for
(
int32_t
tag
=
0
;
tag
<
pStb
->
numOfTags
;
tag
++
)
{
if
(
strcasecmp
(
pStb
->
pTags
[
tag
].
name
,
tagName
)
==
0
)
{
return
tag
;
}
}
return
-
1
;
}
static
int32_t
mndFindSuperTableColumnIndex
(
const
SStbObj
*
pStb
,
const
char
*
colName
)
{
for
(
int32_t
col
=
0
;
col
<
pStb
->
numOfColumns
;
col
++
)
{
if
(
strcasecmp
(
pStb
->
pColumns
[
col
].
name
,
colName
)
==
0
)
{
return
col
;
}
}
return
-
1
;
}
static
int32_t
mndAllocStbSchemas
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
)
{
static
int32_t
mndAllocStbSchemas
(
const
SStbObj
*
pOld
,
SStbObj
*
pNew
)
{
pNew
->
pTags
=
taosMemoryCalloc
(
pNew
->
numOfTags
,
sizeof
(
SSchema
));
pNew
->
pTags
=
taosMemoryCalloc
(
pNew
->
numOfTags
,
sizeof
(
SSchema
));
pNew
->
pColumns
=
taosMemoryCalloc
(
pNew
->
numOfColumns
,
sizeof
(
SSchema
));
pNew
->
pColumns
=
taosMemoryCalloc
(
pNew
->
numOfColumns
,
sizeof
(
SSchema
));
...
@@ -1315,7 +1379,7 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
...
@@ -1315,7 +1379,7 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
return
0
;
return
0
;
}
}
static
int32_t
mndSetAlterStbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
static
int32_t
mndSetAlterStbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SStbObj
*
pStb
,
void
*
alterOriData
,
int32_t
alterOriDataLen
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
...
@@ -1329,7 +1393,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
...
@@ -1329,7 +1393,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
continue
;
continue
;
}
}
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
);
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
,
alterOriData
,
alterOriDataLen
);
if
(
pReq
==
NULL
)
{
if
(
pReq
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
...
@@ -1542,7 +1606,7 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
...
@@ -1542,7 +1606,7 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
return
0
;
return
0
;
}
}
static
int32_t
mndAlterStbImp
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SStbObj
*
pStb
,
bool
needRsp
)
{
static
int32_t
mndAlterStbImp
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SStbObj
*
pStb
,
bool
needRsp
,
void
*
alterOriData
,
int32_t
alterOriDataLen
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB_INSIDE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB_INSIDE
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
...
@@ -1559,7 +1623,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
...
@@ -1559,7 +1623,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
if
(
mndSetAlterStbRedoLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbRedoLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbCommitLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbCommitLogs
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbRedoActions
(
pMnode
,
pTrans
,
pDb
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbRedoActions
(
pMnode
,
pTrans
,
pDb
,
pStb
,
alterOriData
,
alterOriDataLen
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
code
=
0
;
...
@@ -1620,7 +1684,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
...
@@ -1620,7 +1684,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
}
}
if
(
code
!=
0
)
goto
_OVER
;
if
(
code
!=
0
)
goto
_OVER
;
code
=
mndAlterStbImp
(
pMnode
,
pReq
,
pDb
,
&
stbObj
,
needRsp
);
code
=
mndAlterStbImp
(
pMnode
,
pReq
,
pDb
,
&
stbObj
,
needRsp
,
pReq
->
pCont
,
pReq
->
contLen
);
_OVER:
_OVER:
taosMemoryFreeClear
(
stbObj
.
pTags
);
taosMemoryFreeClear
(
stbObj
.
pTags
);
...
@@ -1785,8 +1849,8 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
...
@@ -1785,8 +1849,8 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
}
}
}
}
if
(
dropReq
.
source
!=
TD_REQ_FROM_APP
&&
pStb
->
uid
!=
dropReq
.
suid
)
{
if
(
dropReq
.
source
==
TD_REQ_FROM_TAOX
&&
pStb
->
uid
!=
dropReq
.
suid
)
{
terrno
=
TSDB_CODE_MND_STB_NOT_EXIST
;
code
=
0
;
goto
_OVER
;
goto
_OVER
;
}
}
...
...
source/dnode/mnode/sdb/inc/sdb.h
浏览文件 @
131f87ff
...
@@ -163,6 +163,7 @@ typedef struct SSdbRow {
...
@@ -163,6 +163,7 @@ typedef struct SSdbRow {
ESdbType
type
;
ESdbType
type
;
ESdbStatus
status
;
ESdbStatus
status
;
int32_t
refCount
;
int32_t
refCount
;
int64_t
forAlign
;
char
pObj
[];
char
pObj
[];
}
SSdbRow
;
}
SSdbRow
;
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
131f87ff
...
@@ -1125,6 +1125,11 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -1125,6 +1125,11 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
skmDbKey
.
uid
=
pME
->
uid
;
skmDbKey
.
uid
=
pME
->
uid
;
skmDbKey
.
sver
=
pSW
->
version
;
skmDbKey
.
sver
=
pSW
->
version
;
// if receive tmq meta message is: create stable1 then delete stable1 then create stable1 with multi vgroups
if
(
tdbTbGet
(
pMeta
->
pSkmDb
,
&
skmDbKey
,
sizeof
(
skmDbKey
),
NULL
,
NULL
)
==
0
)
{
return
rcode
;
}
// encode schema
// encode schema
int32_t
ret
=
0
;
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSSchemaWrapper
,
pSW
,
vLen
,
ret
);
tEncodeSize
(
tEncodeSSchemaWrapper
,
pSW
,
vLen
,
ret
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
131f87ff
...
@@ -401,7 +401,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -401,7 +401,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%d"
,
pHead
->
version
,
pHead
->
msgType
);
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%d"
,
pHead
->
version
,
pHead
->
msgType
);
SMqMetaRsp
metaRsp
=
{
0
};
SMqMetaRsp
metaRsp
=
{
0
};
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.rspOffset = fetchVer;*/
metaRsp
.
rspOffset
=
fetchVer
;
/*metaRsp.rspOffsetNew.version = fetchVer;*/
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog
(
&
metaRsp
.
reqOffsetNew
,
pReq
->
reqOffset
.
version
);
tqOffsetResetToLog
(
&
metaRsp
.
reqOffsetNew
,
pReq
->
reqOffset
.
version
);
tqOffsetResetToLog
(
&
metaRsp
.
rspOffsetNew
,
fetchVer
);
tqOffsetResetToLog
(
&
metaRsp
.
rspOffsetNew
,
fetchVer
);
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
131f87ff
...
@@ -50,6 +50,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
...
@@ -50,6 +50,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
SVCreateTbReq
createTbReq
=
{
0
};
SVCreateTbReq
createTbReq
=
{
0
};
createTbReq
.
name
=
buildCtbNameByGroupId
(
stbFullName
,
pDataBlock
->
info
.
groupId
);
createTbReq
.
name
=
buildCtbNameByGroupId
(
stbFullName
,
pDataBlock
->
info
.
groupId
);
createTbReq
.
ctb
.
name
=
strdup
(
stbFullName
);
createTbReq
.
flags
=
0
;
createTbReq
.
flags
=
0
;
createTbReq
.
type
=
TSDB_CHILD_TABLE
;
createTbReq
.
type
=
TSDB_CHILD_TABLE
;
createTbReq
.
ctb
.
suid
=
suid
;
createTbReq
.
ctb
.
suid
=
suid
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
131f87ff
...
@@ -494,8 +494,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
...
@@ -494,8 +494,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
taosArrayPush
(
rsp
.
pArray
,
&
cRsp
);
taosArrayPush
(
rsp
.
pArray
,
&
cRsp
);
}
}
tDecoderClear
(
&
decoder
);
tqUpdateTbUidList
(
pVnode
->
pTq
,
tbUids
,
true
);
tqUpdateTbUidList
(
pVnode
->
pTq
,
tbUids
,
true
);
tdUpdateTbUidList
(
pVnode
->
pSma
,
pStore
);
tdUpdateTbUidList
(
pVnode
->
pSma
,
pStore
);
tdUidStoreFree
(
pStore
);
tdUidStoreFree
(
pStore
);
...
@@ -512,9 +510,12 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
...
@@ -512,9 +510,12 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
}
}
tEncoderInit
(
&
encoder
,
pRsp
->
pCont
,
pRsp
->
contLen
);
tEncoderInit
(
&
encoder
,
pRsp
->
pCont
,
pRsp
->
contLen
);
tEncodeSVCreateTbBatchRsp
(
&
encoder
,
&
rsp
);
tEncodeSVCreateTbBatchRsp
(
&
encoder
,
&
rsp
);
tEncoderClear
(
&
encoder
);
_exit:
_exit:
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
taosArrayDestroy
(
pCreateReq
->
ctb
.
tagName
);
}
taosArrayDestroy
(
rsp
.
pArray
);
taosArrayDestroy
(
rsp
.
pArray
);
taosArrayDestroy
(
tbUids
);
taosArrayDestroy
(
tbUids
);
tDecoderClear
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
@@ -611,7 +612,7 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pRe
...
@@ -611,7 +612,7 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pRe
// process
// process
if
(
metaAlterTable
(
pVnode
->
pMeta
,
version
,
&
vAlterTbReq
,
&
vMetaRsp
)
<
0
)
{
if
(
metaAlterTable
(
pVnode
->
pMeta
,
version
,
&
vAlterTbReq
,
&
vMetaRsp
)
<
0
)
{
vAlterTbRsp
.
code
=
TSDB_CODE_INVALID_MSG
;
vAlterTbRsp
.
code
=
terrno
;
tDecoderClear
(
&
dc
);
tDecoderClear
(
&
dc
);
rcode
=
-
1
;
rcode
=
-
1
;
goto
_exit
;
goto
_exit
;
...
@@ -795,6 +796,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -795,6 +796,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
if
(
tDecodeSVCreateTbReq
(
&
decoder
,
&
createTbReq
)
<
0
)
{
if
(
tDecodeSVCreateTbReq
(
&
decoder
,
&
createTbReq
)
<
0
)
{
pRsp
->
code
=
TSDB_CODE_INVALID_MSG
;
pRsp
->
code
=
TSDB_CODE_INVALID_MSG
;
tDecoderClear
(
&
decoder
);
tDecoderClear
(
&
decoder
);
taosArrayDestroy
(
createTbReq
.
ctb
.
tagName
);
goto
_exit
;
goto
_exit
;
}
}
...
@@ -802,6 +804,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -802,6 +804,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
if
(
terrno
!=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
)
{
if
(
terrno
!=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
)
{
submitBlkRsp
.
code
=
terrno
;
submitBlkRsp
.
code
=
terrno
;
tDecoderClear
(
&
decoder
);
tDecoderClear
(
&
decoder
);
taosArrayDestroy
(
createTbReq
.
ctb
.
tagName
);
goto
_exit
;
goto
_exit
;
}
}
}
}
...
@@ -822,6 +825,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -822,6 +825,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
vnodeDebugPrintSingleSubmitMsg
(
pVnode
->
pMeta
,
pBlock
,
&
msgIter
,
"real uid"
);
vnodeDebugPrintSingleSubmitMsg
(
pVnode
->
pMeta
,
pBlock
,
&
msgIter
,
"real uid"
);
#endif
#endif
tDecoderClear
(
&
decoder
);
tDecoderClear
(
&
decoder
);
taosArrayDestroy
(
createTbReq
.
ctb
.
tagName
);
}
else
{
}
else
{
submitBlkRsp
.
tblFName
=
taosMemoryMalloc
(
TSDB_TABLE_FNAME_LEN
);
submitBlkRsp
.
tblFName
=
taosMemoryMalloc
(
TSDB_TABLE_FNAME_LEN
);
sprintf
(
submitBlkRsp
.
tblFName
,
"%s."
,
pVnode
->
config
.
dbname
);
sprintf
(
submitBlkRsp
.
tblFName
,
"%s."
,
pVnode
->
config
.
dbname
);
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
131f87ff
...
@@ -71,6 +71,7 @@ typedef struct SInsertParseContext {
...
@@ -71,6 +71,7 @@ typedef struct SInsertParseContext {
SVnodeModifOpStmt
*
pOutput
;
SVnodeModifOpStmt
*
pOutput
;
SStmtCallback
*
pStmtCb
;
SStmtCallback
*
pStmtCb
;
SParseMetaCache
*
pMetaCache
;
SParseMetaCache
*
pMetaCache
;
char
sTableName
[
TSDB_TABLE_NAME_LEN
];
}
SInsertParseContext
;
}
SInsertParseContext
;
typedef
struct
SInsertParseSyntaxCxt
{
typedef
struct
SInsertParseSyntaxCxt
{
...
@@ -734,11 +735,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
...
@@ -734,11 +735,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
)
{
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
,
const
char
*
sname
,
SArray
*
tagName
)
{
pTbReq
->
type
=
TD_CHILD_TABLE
;
pTbReq
->
type
=
TD_CHILD_TABLE
;
pTbReq
->
name
=
strdup
(
tname
);
pTbReq
->
name
=
strdup
(
tname
);
pTbReq
->
ctb
.
suid
=
suid
;
pTbReq
->
ctb
.
suid
=
suid
;
if
(
sname
)
pTbReq
->
ctb
.
name
=
strdup
(
sname
);
pTbReq
->
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
pTbReq
->
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
pTbReq
->
ctb
.
tagName
=
taosArrayDup
(
tagName
);
pTbReq
->
commentLen
=
-
1
;
pTbReq
->
commentLen
=
-
1
;
return
;
return
;
...
@@ -758,6 +761,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16
...
@@ -758,6 +761,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
// strcpy(val->colName, pSchema->name);
val
->
cid
=
pSchema
->
colId
;
val
->
cid
=
pSchema
->
colId
;
val
->
type
=
pSchema
->
type
;
val
->
type
=
pSchema
->
type
;
...
@@ -936,6 +940,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16
...
@@ -936,6 +940,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16
static
int32_t
parseTagsClause
(
SInsertParseContext
*
pCxt
,
SSchema
*
pSchema
,
uint8_t
precision
,
const
char
*
tName
)
{
static
int32_t
parseTagsClause
(
SInsertParseContext
*
pCxt
,
SSchema
*
pSchema
,
uint8_t
precision
,
const
char
*
tName
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SArray
*
pTagVals
=
taosArrayInit
(
pCxt
->
tags
.
numOfBound
,
sizeof
(
STagVal
));
SArray
*
pTagVals
=
taosArrayInit
(
pCxt
->
tags
.
numOfBound
,
sizeof
(
STagVal
));
SArray
*
tagName
=
taosArrayInit
(
8
,
TSDB_COL_NAME_LEN
);
SToken
sToken
;
SToken
sToken
;
bool
isParseBindParam
=
false
;
bool
isParseBindParam
=
false
;
bool
isJson
=
false
;
bool
isJson
=
false
;
...
@@ -965,6 +970,10 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
...
@@ -965,6 +970,10 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
taosMemoryFree
(
tmpTokenBuf
);
taosMemoryFree
(
tmpTokenBuf
);
goto
end
;
goto
end
;
}
}
if
(
!
isNullStr
(
&
sToken
))
{
taosArrayPush
(
tagName
,
pTagSchema
->
name
);
}
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
sToken
.
n
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
if
(
sToken
.
n
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
code
=
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"json string too long than 4095"
,
sToken
.
z
);
code
=
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"json string too long than 4095"
,
sToken
.
z
);
...
@@ -1004,7 +1013,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
...
@@ -1004,7 +1013,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
goto
end
;
goto
end
;
}
}
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
);
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
,
pCxt
->
sTableName
,
tagName
);
end:
end:
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagVals
);
++
i
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagVals
);
++
i
)
{
...
@@ -1014,6 +1023,7 @@ end:
...
@@ -1014,6 +1023,7 @@ end:
}
}
}
}
taosArrayDestroy
(
pTagVals
);
taosArrayDestroy
(
pTagVals
);
taosArrayDestroy
(
tagName
);
return
code
;
return
code
;
}
}
...
@@ -1089,6 +1099,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
...
@@ -1089,6 +1099,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
createSName
(
&
sname
,
&
sToken
,
pCxt
->
pComCxt
->
acctId
,
pCxt
->
pComCxt
->
db
,
&
pCxt
->
msg
);
createSName
(
&
sname
,
&
sToken
,
pCxt
->
pComCxt
->
acctId
,
pCxt
->
pComCxt
->
db
,
&
pCxt
->
msg
);
char
dbFName
[
TSDB_DB_FNAME_LEN
];
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
&
sname
,
dbFName
);
tNameGetFullDbName
(
&
sname
,
dbFName
);
strcpy
(
pCxt
->
sTableName
,
sname
.
tname
);
CHECK_CODE
(
getSTableMeta
(
pCxt
,
&
sname
,
dbFName
));
CHECK_CODE
(
getSTableMeta
(
pCxt
,
&
sname
,
dbFName
));
if
(
TSDB_SUPER_TABLE
!=
pCxt
->
pTableMeta
->
tableType
)
{
if
(
TSDB_SUPER_TABLE
!=
pCxt
->
pTableMeta
->
tableType
)
{
...
@@ -1325,15 +1336,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
...
@@ -1325,15 +1336,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
destroyCreateSubTbReq
(
SVCreateTbReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
name
);
taosMemoryFreeClear
(
pReq
->
ctb
.
pTag
);
}
static
void
destroyInsertParseContextForTable
(
SInsertParseContext
*
pCxt
)
{
static
void
destroyInsertParseContextForTable
(
SInsertParseContext
*
pCxt
)
{
taosMemoryFreeClear
(
pCxt
->
pTableMeta
);
taosMemoryFreeClear
(
pCxt
->
pTableMeta
);
destroyBoundColumnInfo
(
&
pCxt
->
tags
);
destroyBoundColumnInfo
(
&
pCxt
->
tags
);
destroyCreateSub
TbReq
(
&
pCxt
->
createTblReq
);
tdDestroySVCreate
TbReq
(
&
pCxt
->
createTblReq
);
}
}
static
void
destroySubTableHashElem
(
void
*
p
)
{
taosMemoryFree
(
*
(
STableMeta
**
)
p
);
}
static
void
destroySubTableHashElem
(
void
*
p
)
{
taosMemoryFree
(
*
(
STableMeta
**
)
p
);
}
...
@@ -1479,7 +1485,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
...
@@ -1479,7 +1485,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
}
}
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pCxt
->
pTableMeta
,
tags
,
tbFName
,
autoCreateTbl
,
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pCxt
->
pTableMeta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
,
pCxt
->
sTableName
);
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
pCxt
->
pVgroupsHashObj
=
NULL
;
pCxt
->
pVgroupsHashObj
=
NULL
;
...
@@ -1508,6 +1514,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
...
@@ -1508,6 +1514,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
.
pSql
=
(
char
*
)
pContext
->
pSql
,
.
pSql
=
(
char
*
)
pContext
->
pSql
,
.
msg
=
{.
buf
=
pContext
->
pMsg
,
.
len
=
pContext
->
msgLen
},
.
msg
=
{.
buf
=
pContext
->
pMsg
,
.
len
=
pContext
->
msgLen
},
.
pTableMeta
=
NULL
,
.
pTableMeta
=
NULL
,
.
createTblReq
=
{
0
},
.
pSubTableHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_VARCHAR
),
true
,
HASH_NO_LOCK
),
.
pSubTableHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_VARCHAR
),
true
,
HASH_NO_LOCK
),
.
pTableNameHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_VARCHAR
),
true
,
HASH_NO_LOCK
),
.
pTableNameHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_VARCHAR
),
true
,
HASH_NO_LOCK
),
.
pDbFNameHashObj
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_VARCHAR
),
true
,
HASH_NO_LOCK
),
.
pDbFNameHashObj
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_VARCHAR
),
true
,
HASH_NO_LOCK
),
...
@@ -1788,7 +1795,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
...
@@ -1788,7 +1795,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
char
*
tName
,
TAOS_MULTI_BIND
*
bind
,
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
c
onst
char
*
sTableName
,
c
har
*
tName
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
char
*
msgBuf
,
int32_t
msgBufLen
)
{
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
...
@@ -1802,6 +1809,11 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
...
@@ -1802,6 +1809,11 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
return
buildInvalidOperationMsg
(
&
pBuf
,
"out of memory"
);
return
buildInvalidOperationMsg
(
&
pBuf
,
"out of memory"
);
}
}
SArray
*
tagName
=
taosArrayInit
(
8
,
TSDB_COL_NAME_LEN
);
if
(
!
tagName
)
{
return
buildInvalidOperationMsg
(
&
pBuf
,
"out of memory"
);
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSchema
*
pSchema
=
getTableTagSchema
(
pDataBlock
->
pTableMeta
);
SSchema
*
pSchema
=
getTableTagSchema
(
pDataBlock
->
pTableMeta
);
...
@@ -1818,6 +1830,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
...
@@ -1818,6 +1830,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
colLen
=
bind
[
c
].
length
[
0
];
colLen
=
bind
[
c
].
length
[
0
];
}
}
taosArrayPush
(
tagName
,
pTagSchema
->
name
);
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
colLen
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
if
(
colLen
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
code
=
buildSyntaxErrMsg
(
&
pBuf
,
"json string too long than 4095"
,
bind
[
c
].
buffer
);
code
=
buildSyntaxErrMsg
(
&
pBuf
,
"json string too long than 4095"
,
bind
[
c
].
buffer
);
...
@@ -1834,6 +1847,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
...
@@ -1834,6 +1847,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
}
}
}
else
{
}
else
{
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
// strcpy(val.colName, pTagSchema->name);
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
val
.
pData
=
(
uint8_t
*
)
bind
[
c
].
buffer
;
val
.
pData
=
(
uint8_t
*
)
bind
[
c
].
buffer
;
val
.
nData
=
colLen
;
val
.
nData
=
colLen
;
...
@@ -1870,9 +1884,9 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
...
@@ -1870,9 +1884,9 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
}
}
SVCreateTbReq
tbReq
=
{
0
};
SVCreateTbReq
tbReq
=
{
0
};
buildCreateTbReq
(
&
tbReq
,
tName
,
pTag
,
suid
);
buildCreateTbReq
(
&
tbReq
,
tName
,
pTag
,
suid
,
sTableName
,
tagName
);
code
=
buildCreateTbMsg
(
pDataBlock
,
&
tbReq
);
code
=
buildCreateTbMsg
(
pDataBlock
,
&
tbReq
);
destroyCreateSub
TbReq
(
&
tbReq
);
tdDestroySVCreate
TbReq
(
&
tbReq
);
end:
end:
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagArray
);
++
i
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagArray
);
++
i
)
{
...
@@ -1882,6 +1896,7 @@ end:
...
@@ -1882,6 +1896,7 @@ end:
}
}
}
}
taosArrayDestroy
(
pTagArray
);
taosArrayDestroy
(
pTagArray
);
taosArrayDestroy
(
tagName
);
return
code
;
return
code
;
}
}
...
@@ -2136,7 +2151,7 @@ typedef struct SmlExecHandle {
...
@@ -2136,7 +2151,7 @@ typedef struct SmlExecHandle {
static
void
smlDestroyTableHandle
(
void
*
pHandle
)
{
static
void
smlDestroyTableHandle
(
void
*
pHandle
)
{
SmlExecTableHandle
*
handle
=
(
SmlExecTableHandle
*
)
pHandle
;
SmlExecTableHandle
*
handle
=
(
SmlExecTableHandle
*
)
pHandle
;
destroyBoundColumnInfo
(
&
handle
->
tags
);
destroyBoundColumnInfo
(
&
handle
->
tags
);
destroyCreateSub
TbReq
(
&
handle
->
createTblReq
);
tdDestroySVCreate
TbReq
(
&
handle
->
createTblReq
);
}
}
static
int32_t
smlBoundColumnData
(
SArray
*
cols
,
SParsedDataColInfo
*
pColList
,
SSchema
*
pSchema
)
{
static
int32_t
smlBoundColumnData
(
SArray
*
cols
,
SParsedDataColInfo
*
pColList
,
SSchema
*
pSchema
)
{
...
@@ -2222,18 +2237,24 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
...
@@ -2222,18 +2237,24 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
* @param msg
* @param msg
* @return int32_t
* @return int32_t
*/
*/
static
int32_t
smlBuildTagRow
(
SArray
*
cols
,
SParsedDataColInfo
*
tags
,
SSchema
*
pSchema
,
STag
**
ppTag
,
SMsgBuf
*
msg
)
{
static
int32_t
smlBuildTagRow
(
SArray
*
cols
,
SParsedDataColInfo
*
tags
,
SSchema
*
pSchema
,
STag
**
ppTag
,
S
Array
**
tagName
,
S
MsgBuf
*
msg
)
{
SArray
*
pTagArray
=
taosArrayInit
(
tags
->
numOfBound
,
sizeof
(
STagVal
));
SArray
*
pTagArray
=
taosArrayInit
(
tags
->
numOfBound
,
sizeof
(
STagVal
));
if
(
!
pTagArray
)
{
if
(
!
pTagArray
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
*
tagName
=
taosArrayInit
(
8
,
TSDB_COL_NAME_LEN
);
if
(
!*
tagName
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
tags
->
numOfBound
;
++
i
)
{
for
(
int
i
=
0
;
i
<
tags
->
numOfBound
;
++
i
)
{
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
i
]];
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
i
]];
SSmlKv
*
kv
=
taosArrayGetP
(
cols
,
i
);
SSmlKv
*
kv
=
taosArrayGetP
(
cols
,
i
);
taosArrayPush
(
*
tagName
,
pTagSchema
->
name
);
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
// strcpy(val.colName, pTagSchema->name);
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
val
.
pData
=
(
uint8_t
*
)
kv
->
value
;
val
.
pData
=
(
uint8_t
*
)
kv
->
value
;
val
.
nData
=
kv
->
length
;
val
.
nData
=
kv
->
length
;
...
@@ -2277,7 +2298,7 @@ end:
...
@@ -2277,7 +2298,7 @@ end:
}
}
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
)
{
char
*
tableName
,
c
onst
char
*
sTableName
,
int32_t
sTableNameLen
,
c
har
*
msgBuf
,
int16_t
msgBufLen
)
{
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SSmlExecHandle
*
smlHandle
=
(
SSmlExecHandle
*
)
handle
;
SSmlExecHandle
*
smlHandle
=
(
SSmlExecHandle
*
)
handle
;
...
@@ -2290,12 +2311,19 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
...
@@ -2290,12 +2311,19 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
return
ret
;
return
ret
;
}
}
STag
*
pTag
=
NULL
;
STag
*
pTag
=
NULL
;
ret
=
smlBuildTagRow
(
tags
,
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
,
&
pTag
,
&
pBuf
);
SArray
*
tagName
=
NULL
;
ret
=
smlBuildTagRow
(
tags
,
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
,
&
pTag
,
&
tagName
,
&
pBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
tagName
);
return
ret
;
return
ret
;
}
}
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
);
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
,
tagName
);
taosArrayDestroy
(
tagName
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
=
taosMemoryMalloc
(
sTableNameLen
+
1
);
memcpy
(
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
,
sTableName
,
sTableNameLen
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
[
sTableNameLen
]
=
0
;
STableDataBlocks
*
pDataBlock
=
NULL
;
STableDataBlocks
*
pDataBlock
=
NULL
;
ret
=
getDataBlockFromList
(
smlHandle
->
pBlockHash
,
&
pTableMeta
->
uid
,
sizeof
(
pTableMeta
->
uid
),
ret
=
getDataBlockFromList
(
smlHandle
->
pBlockHash
,
&
pTableMeta
->
uid
,
sizeof
(
pTableMeta
->
uid
),
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
131f87ff
...
@@ -3754,6 +3754,9 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
...
@@ -3754,6 +3754,9 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq
->
delay2
=
pStmt
->
pOptions
->
maxDelay2
;
pReq
->
delay2
=
pStmt
->
pOptions
->
maxDelay2
;
pReq
->
watermark1
=
pStmt
->
pOptions
->
watermark1
;
pReq
->
watermark1
=
pStmt
->
pOptions
->
watermark1
;
pReq
->
watermark2
=
pStmt
->
pOptions
->
watermark2
;
pReq
->
watermark2
=
pStmt
->
pOptions
->
watermark2
;
pReq
->
colVer
=
1
;
pReq
->
tagVer
=
1
;
pReq
->
source
=
TD_REQ_FROM_APP
;
columnDefNodeToField
(
pStmt
->
pCols
,
&
pReq
->
pColumns
);
columnDefNodeToField
(
pStmt
->
pCols
,
&
pReq
->
pColumns
);
columnDefNodeToField
(
pStmt
->
pTags
,
&
pReq
->
pTags
);
columnDefNodeToField
(
pStmt
->
pTags
,
&
pReq
->
pTags
);
pReq
->
numOfColumns
=
LIST_LENGTH
(
pStmt
->
pCols
);
pReq
->
numOfColumns
=
LIST_LENGTH
(
pStmt
->
pCols
);
...
@@ -5337,6 +5340,8 @@ static void destroyCreateTbReqBatch(void* data) {
...
@@ -5337,6 +5340,8 @@ static void destroyCreateTbReqBatch(void* data) {
taosMemoryFreeClear
(
pTableReq
->
ntb
.
schemaRow
.
pSchema
);
taosMemoryFreeClear
(
pTableReq
->
ntb
.
schemaRow
.
pSchema
);
}
else
if
(
pTableReq
->
type
==
TSDB_CHILD_TABLE
)
{
}
else
if
(
pTableReq
->
type
==
TSDB_CHILD_TABLE
)
{
taosMemoryFreeClear
(
pTableReq
->
ctb
.
pTag
);
taosMemoryFreeClear
(
pTableReq
->
ctb
.
pTag
);
taosMemoryFreeClear
(
pTableReq
->
ctb
.
name
);
taosArrayDestroy
(
pTableReq
->
ctb
.
tagName
);
}
}
}
}
...
@@ -5408,11 +5413,11 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
...
@@ -5408,11 +5413,11 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
}
}
static
void
addCreateTbReqIntoVgroup
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
,
SCreateSubTableClause
*
pStmt
,
static
void
addCreateTbReqIntoVgroup
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
,
SCreateSubTableClause
*
pStmt
,
const
STag
*
pTag
,
uint64_t
suid
,
SVgroupInfo
*
pVgInfo
)
{
const
STag
*
pTag
,
uint64_t
suid
,
const
char
*
sTableNmae
,
SVgroupInfo
*
pVgInfo
,
SArray
*
tagName
)
{
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
// strcpy(name.dbname, pStmt->dbName);
// strcpy(name.dbname, pStmt->dbName);
// tNameGetFullDbName(&name, dbFName);
// tNameGetFullDbName(&name, dbFName);
struct
SVCreateTbReq
req
=
{
0
};
struct
SVCreateTbReq
req
=
{
0
};
req
.
type
=
TD_CHILD_TABLE
;
req
.
type
=
TD_CHILD_TABLE
;
...
@@ -5425,7 +5430,9 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
...
@@ -5425,7 +5430,9 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
req
.
commentLen
=
-
1
;
req
.
commentLen
=
-
1
;
}
}
req
.
ctb
.
suid
=
suid
;
req
.
ctb
.
suid
=
suid
;
req
.
ctb
.
name
=
strdup
(
sTableNmae
);
req
.
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
req
.
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
req
.
ctb
.
tagName
=
taosArrayDup
(
tagName
);
if
(
pStmt
->
ignoreExists
)
{
if
(
pStmt
->
ignoreExists
)
{
req
.
flags
|=
TD_CREATE_IF_NOT_EXISTS
;
req
.
flags
|=
TD_CREATE_IF_NOT_EXISTS
;
}
}
...
@@ -5517,6 +5524,7 @@ static int32_t buildNormalTagVal(STranslateContext* pCxt, SSchema* pTagSchema, S
...
@@ -5517,6 +5524,7 @@ static int32_t buildNormalTagVal(STranslateContext* pCxt, SSchema* pTagSchema, S
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
)
{
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
)
{
void
*
nodeVal
=
nodesGetValueFromNode
(
pVal
);
void
*
nodeVal
=
nodesGetValueFromNode
(
pVal
);
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
// strcpy(val.colName, pTagSchema->name);
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
val
.
pData
=
varDataVal
(
nodeVal
);
val
.
pData
=
varDataVal
(
nodeVal
);
val
.
nData
=
varDataLen
(
nodeVal
);
val
.
nData
=
varDataLen
(
nodeVal
);
...
@@ -5529,7 +5537,7 @@ static int32_t buildNormalTagVal(STranslateContext* pCxt, SSchema* pTagSchema, S
...
@@ -5529,7 +5537,7 @@ static int32_t buildNormalTagVal(STranslateContext* pCxt, SSchema* pTagSchema, S
}
}
static
int32_t
buildKVRowForBindTags
(
STranslateContext
*
pCxt
,
SCreateSubTableClause
*
pStmt
,
STableMeta
*
pSuperTableMeta
,
static
int32_t
buildKVRowForBindTags
(
STranslateContext
*
pCxt
,
SCreateSubTableClause
*
pStmt
,
STableMeta
*
pSuperTableMeta
,
STag
**
ppTag
)
{
STag
**
ppTag
,
SArray
*
tagName
)
{
int32_t
numOfTags
=
getNumOfTags
(
pSuperTableMeta
);
int32_t
numOfTags
=
getNumOfTags
(
pSuperTableMeta
);
if
(
LIST_LENGTH
(
pStmt
->
pValsOfTags
)
!=
LIST_LENGTH
(
pStmt
->
pSpecificTags
)
||
if
(
LIST_LENGTH
(
pStmt
->
pValsOfTags
)
!=
LIST_LENGTH
(
pStmt
->
pSpecificTags
)
||
numOfTags
<
LIST_LENGTH
(
pStmt
->
pValsOfTags
))
{
numOfTags
<
LIST_LENGTH
(
pStmt
->
pValsOfTags
))
{
...
@@ -5560,8 +5568,10 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
...
@@ -5560,8 +5568,10 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
isJson
=
true
;
isJson
=
true
;
code
=
buildJsonTagVal
(
pCxt
,
pSchema
,
pVal
,
pTagArray
,
ppTag
);
code
=
buildJsonTagVal
(
pCxt
,
pSchema
,
pVal
,
pTagArray
,
ppTag
);
taosArrayPush
(
tagName
,
pCol
->
colName
);
}
else
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
)
{
}
else
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
)
{
code
=
buildNormalTagVal
(
pCxt
,
pSchema
,
pVal
,
pTagArray
);
code
=
buildNormalTagVal
(
pCxt
,
pSchema
,
pVal
,
pTagArray
);
taosArrayPush
(
tagName
,
pCol
->
colName
);
}
}
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
@@ -5582,7 +5592,7 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
...
@@ -5582,7 +5592,7 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
}
}
static
int32_t
buildKVRowForAllTags
(
STranslateContext
*
pCxt
,
SCreateSubTableClause
*
pStmt
,
STableMeta
*
pSuperTableMeta
,
static
int32_t
buildKVRowForAllTags
(
STranslateContext
*
pCxt
,
SCreateSubTableClause
*
pStmt
,
STableMeta
*
pSuperTableMeta
,
STag
**
ppTag
)
{
STag
**
ppTag
,
SArray
*
tagName
)
{
if
(
getNumOfTags
(
pSuperTableMeta
)
!=
LIST_LENGTH
(
pStmt
->
pValsOfTags
))
{
if
(
getNumOfTags
(
pSuperTableMeta
)
!=
LIST_LENGTH
(
pStmt
->
pValsOfTags
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_TAGS_NOT_MATCHED
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_TAGS_NOT_MATCHED
);
}
}
...
@@ -5607,9 +5617,11 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
...
@@ -5607,9 +5617,11 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_JSON
)
{
isJson
=
true
;
isJson
=
true
;
code
=
buildJsonTagVal
(
pCxt
,
pTagSchema
,
pVal
,
pTagArray
,
ppTag
);
code
=
buildJsonTagVal
(
pCxt
,
pTagSchema
,
pVal
,
pTagArray
,
ppTag
);
taosArrayPush
(
tagName
,
pTagSchema
->
name
);
}
else
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
&&
!
pVal
->
isNull
)
{
}
else
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
&&
!
pVal
->
isNull
)
{
char
*
tmpVal
=
nodesGetValueFromNode
(
pVal
);
char
*
tmpVal
=
nodesGetValueFromNode
(
pVal
);
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
// strcpy(val.colName, pTagSchema->name);
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
val
.
pData
=
varDataVal
(
tmpVal
);
val
.
pData
=
varDataVal
(
tmpVal
);
val
.
nData
=
varDataLen
(
tmpVal
);
val
.
nData
=
varDataLen
(
tmpVal
);
...
@@ -5617,6 +5629,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
...
@@ -5617,6 +5629,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
memcpy
(
&
val
.
i64
,
tmpVal
,
pTagSchema
->
bytes
);
memcpy
(
&
val
.
i64
,
tmpVal
,
pTagSchema
->
bytes
);
}
}
taosArrayPush
(
pTagArray
,
&
val
);
taosArrayPush
(
pTagArray
,
&
val
);
taosArrayPush
(
tagName
,
pTagSchema
->
name
);
}
}
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
@@ -5652,12 +5665,13 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
...
@@ -5652,12 +5665,13 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
}
}
STag
*
pTag
=
NULL
;
STag
*
pTag
=
NULL
;
SArray
*
tagName
=
taosArrayInit
(
8
,
TSDB_COL_NAME_LEN
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
NULL
!=
pStmt
->
pSpecificTags
)
{
if
(
NULL
!=
pStmt
->
pSpecificTags
)
{
code
=
buildKVRowForBindTags
(
pCxt
,
pStmt
,
pSuperTableMeta
,
&
pTag
);
code
=
buildKVRowForBindTags
(
pCxt
,
pStmt
,
pSuperTableMeta
,
&
pTag
,
tagName
);
}
else
{
}
else
{
code
=
buildKVRowForAllTags
(
pCxt
,
pStmt
,
pSuperTableMeta
,
&
pTag
);
code
=
buildKVRowForAllTags
(
pCxt
,
pStmt
,
pSuperTableMeta
,
&
pTag
,
tagName
);
}
}
}
}
...
@@ -5666,9 +5680,10 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
...
@@ -5666,9 +5680,10 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code
=
getTableHashVgroup
(
pCxt
,
pStmt
->
dbName
,
pStmt
->
tableName
,
&
info
);
code
=
getTableHashVgroup
(
pCxt
,
pStmt
->
dbName
,
pStmt
->
tableName
,
&
info
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
addCreateTbReqIntoVgroup
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
,
pStmt
,
pTag
,
pSuperTableMeta
->
uid
,
&
info
);
addCreateTbReqIntoVgroup
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
,
pStmt
,
pTag
,
pSuperTableMeta
->
uid
,
pStmt
->
useTableName
,
&
info
,
tagName
);
}
}
taosArrayDestroy
(
tagName
);
taosMemoryFreeClear
(
pSuperTableMeta
);
taosMemoryFreeClear
(
pSuperTableMeta
);
return
code
;
return
code
;
}
}
...
@@ -5883,9 +5898,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
...
@@ -5883,9 +5898,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
return
pCxt
->
errCode
;
return
pCxt
->
errCode
;
}
}
pReq
->
isNull
=
(
TSDB_DATA_TYPE_NULL
==
pStmt
->
pVal
->
node
.
resType
.
type
)
;
pReq
->
tagType
=
targetDt
.
type
;
if
(
targetDt
.
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
targetDt
.
type
==
TSDB_DATA_TYPE_JSON
)
{
pReq
->
isNull
=
0
;
if
(
pStmt
->
pVal
->
literal
&&
if
(
pStmt
->
pVal
->
literal
&&
strlen
(
pStmt
->
pVal
->
literal
)
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
strlen
(
pStmt
->
pVal
->
literal
)
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
"json string too long than 4095"
,
pStmt
->
pVal
->
literal
);
return
buildSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
"json string too long than 4095"
,
pStmt
->
pVal
->
literal
);
...
@@ -5913,6 +5927,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
...
@@ -5913,6 +5927,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
pReq
->
pTagVal
=
(
uint8_t
*
)
pTag
;
pReq
->
pTagVal
=
(
uint8_t
*
)
pTag
;
pStmt
->
pVal
->
datum
.
p
=
(
char
*
)
pTag
;
// for free
pStmt
->
pVal
->
datum
.
p
=
(
char
*
)
pTag
;
// for free
}
else
{
}
else
{
pReq
->
isNull
=
(
TSDB_DATA_TYPE_NULL
==
pStmt
->
pVal
->
node
.
resType
.
type
);
pReq
->
nTagVal
=
pStmt
->
pVal
->
node
.
resType
.
bytes
;
pReq
->
nTagVal
=
pStmt
->
pVal
->
node
.
resType
.
bytes
;
pReq
->
pTagVal
=
nodesGetValueFromNode
(
pStmt
->
pVal
);
pReq
->
pTagVal
=
nodesGetValueFromNode
(
pStmt
->
pVal
);
...
@@ -5966,7 +5981,7 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
...
@@ -5966,7 +5981,7 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
static
int32_t
buildUpdateColReq
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
STableMeta
*
pTableMeta
,
static
int32_t
buildUpdateColReq
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
STableMeta
*
pTableMeta
,
SVAlterTbReq
*
pReq
)
{
SVAlterTbReq
*
pReq
)
{
pReq
->
colModBytes
=
calcTypeBytes
(
pStmt
->
dataType
);
pReq
->
colModBytes
=
calcTypeBytes
(
pStmt
->
dataType
);
pReq
->
colModType
=
pStmt
->
dataType
.
type
;
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
if
(
NULL
==
pSchema
)
{
if
(
NULL
==
pSchema
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
131f87ff
...
@@ -389,6 +389,7 @@ int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, voi
...
@@ -389,6 +389,7 @@ int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, voi
continue
;
continue
;
}
}
STagVal
val
=
{
0
};
STagVal
val
=
{
0
};
// strcpy(val.colName, colName);
val
.
pKey
=
jsonKey
;
val
.
pKey
=
jsonKey
;
taosHashPut
(
keyHash
,
jsonKey
,
keyLen
,
&
keyLen
,
taosHashPut
(
keyHash
,
jsonKey
,
keyLen
,
&
keyLen
,
CHAR_BYTES
);
// add key to hash to remove dumplicate, value is useless
CHAR_BYTES
);
// add key to hash to remove dumplicate, value is useless
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录