Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
607f8163
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
607f8163
编写于
7月 10, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat:add logic for get_tmq_meta_json
上级
054b16c2
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
261 addition
and
97 deletion
+261
-97
include/client/taos.h
include/client/taos.h
+3
-1
include/common/tdataformat.h
include/common/tdataformat.h
+2
-1
include/common/tmsg.h
include/common/tmsg.h
+6
-0
include/libs/parser/parser.h
include/libs/parser/parser.h
+3
-3
source/client/src/clientSml.c
source/client/src/clientSml.c
+1
-1
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+6
-5
source/client/src/tmq.c
source/client/src/tmq.c
+179
-54
source/common/src/tmsg.c
source/common/src/tmsg.c
+16
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+7
-5
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+21
-17
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+12
-6
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+2
-1
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+1
-1
source/libs/scalar/test/scalar/scalarTests.cpp
source/libs/scalar/test/scalar/scalarTests.cpp
+2
-2
未找到文件。
include/client/taos.h
浏览文件 @
607f8163
...
...
@@ -265,7 +265,9 @@ typedef struct tmq_raw_data tmq_raw_data;
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
tmq_raw_data
*
tmq_get_raw_meta
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
*
raw_meta
);
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed.
DLL_EXPORT
void
tmq_free_raw_meta
(
tmq_raw_data
*
rawMeta
);
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
void
tmq_free_json_meta
(
char
*
jsonMeta
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_db_name
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
TAOS_RES
*
res
);
...
...
include/common/tdataformat.h
浏览文件 @
607f8163
...
...
@@ -78,7 +78,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
STag
**
ppTag
);
int32_t
tTagToValArray
(
const
STag
*
pTag
,
SArray
**
ppArray
);
void
debugPrintSTag
(
STag
*
pTag
,
const
char
*
tag
,
int32_t
ln
);
// TODO: remove
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
);
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
,
const
char
*
colName
);
// STRUCT =================
struct
STColumn
{
...
...
@@ -147,6 +147,7 @@ struct SColVal {
#pragma pack(push, 1)
struct
STagVal
{
char
colName
[
TSDB_COL_NAME_LEN
];
// only used for tmq_get_meta
union
{
int16_t
cid
;
char
*
pKey
;
...
...
include/common/tmsg.h
浏览文件 @
607f8163
...
...
@@ -1915,6 +1915,8 @@ typedef struct SVCreateStbReq {
SSchemaWrapper
schemaRow
;
SSchemaWrapper
schemaTag
;
SRSmaParam
rsmaParam
;
int32_t
alterOriDataLen
;
void
*
alterOriData
;
}
SVCreateStbReq
;
int
tEncodeSVCreateStbReq
(
SEncoder
*
pCoder
,
const
SVCreateStbReq
*
pReq
);
...
...
@@ -1942,6 +1944,7 @@ typedef struct SVCreateTbReq {
int8_t
type
;
union
{
struct
{
char
*
name
;
tb_uid_t
suid
;
uint8_t
*
pTag
;
}
ctb
;
...
...
@@ -1959,6 +1962,7 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear
(
req
->
comment
);
if
(
req
->
type
==
TSDB_CHILD_TABLE
)
{
taosMemoryFreeClear
(
req
->
ctb
.
pTag
);
taosMemoryFreeClear
(
req
->
ctb
.
name
);
}
else
if
(
req
->
type
==
TSDB_NORMAL_TABLE
)
{
taosMemoryFreeClear
(
req
->
ntb
.
schemaRow
.
pSchema
);
}
...
...
@@ -2042,12 +2046,14 @@ typedef struct {
int32_t
bytes
;
// TSDB_ALTER_TABLE_DROP_COLUMN
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
int8_t
colModType
;
int32_t
colModBytes
;
// TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
char
*
colNewName
;
// TSDB_ALTER_TABLE_UPDATE_TAG_VAL
char
*
tagName
;
int8_t
isNull
;
int8_t
tagType
;
uint32_t
nTagVal
;
uint8_t
*
pTagVal
;
// TSDB_ALTER_TABLE_UPDATE_OPTIONS
...
...
include/libs/parser/parser.h
浏览文件 @
607f8163
...
...
@@ -29,7 +29,7 @@ struct SMetaData;
typedef
struct
SStmtCallback
{
TAOS_STMT
*
pStmt
;
int32_t
(
*
getTbNameFn
)(
TAOS_STMT
*
,
char
**
);
int32_t
(
*
setInfoFn
)(
TAOS_STMT
*
,
STableMeta
*
,
void
*
,
char
*
,
bool
,
SHashObj
*
,
SHashObj
*
);
int32_t
(
*
setInfoFn
)(
TAOS_STMT
*
,
STableMeta
*
,
void
*
,
char
*
,
bool
,
SHashObj
*
,
SHashObj
*
,
const
char
*
);
int32_t
(
*
getExecInfoFn
)(
TAOS_STMT
*
,
SHashObj
**
,
SHashObj
**
);
}
SStmtCallback
;
...
...
@@ -84,7 +84,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
int32_t
rowNum
);
int32_t
qBuildStmtColFields
(
void
*
pDataBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
char
*
tName
,
TAOS_MULTI_BIND
*
bind
,
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
c
onst
char
*
sTableName
,
c
har
*
tName
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
);
void
destroyBoundColumnInfo
(
void
*
pBoundInfo
);
int32_t
qCreateSName
(
SName
*
pName
,
const
char
*
pTableName
,
int32_t
acctId
,
char
*
dbName
,
char
*
msgBuf
,
...
...
@@ -93,7 +93,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void
*
smlInitHandle
(
SQuery
*
pQuery
);
void
smlDestroyHandle
(
void
*
pHandle
);
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
);
char
*
tableName
,
c
onst
char
*
sTableName
,
int32_t
sTableNameLen
,
c
har
*
msgBuf
,
int16_t
msgBufLen
);
int32_t
smlBuildOutput
(
void
*
handle
,
SHashObj
*
pVgHash
);
int32_t
rewriteToVnodeModifyOpStmt
(
SQuery
*
pQuery
,
SArray
*
pBufArray
);
...
...
source/client/src/clientSml.c
浏览文件 @
607f8163
...
...
@@ -2256,7 +2256,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
(
*
pMeta
)
->
tableMeta
->
uid
=
tableData
->
uid
;
// one table merge data block together according uid
code
=
smlBindData
(
info
->
exec
,
tableData
->
tags
,
(
*
pMeta
)
->
cols
,
tableData
->
cols
,
info
->
dataFormat
,
(
*
pMeta
)
->
tableMeta
,
tableData
->
childTableName
,
info
->
msgBuf
.
buf
,
info
->
msgBuf
.
len
);
(
*
pMeta
)
->
tableMeta
,
tableData
->
childTableName
,
tableData
->
sTableName
,
tableData
->
sTableNameLen
,
info
->
msgBuf
.
buf
,
info
->
msgBuf
.
len
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" smlBindData failed"
,
info
->
id
);
return
code
;
...
...
source/client/src/clientStmt.c
浏览文件 @
607f8163
...
...
@@ -128,7 +128,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtUpdateBindInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbFName
)
{
int32_t
stmtUpdateBindInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbFName
,
const
char
*
sTableName
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
strncpy
(
pStmt
->
bInfo
.
tbFName
,
tbFName
,
sizeof
(
pStmt
->
bInfo
.
tbFName
)
-
1
);
...
...
@@ -139,6 +139,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
pStmt
->
bInfo
.
tbType
=
pTableMeta
->
tableType
;
pStmt
->
bInfo
.
boundTags
=
tags
;
pStmt
->
bInfo
.
tagsCached
=
false
;
strcpy
(
pStmt
->
bInfo
.
stbFName
,
sTableName
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -154,10 +155,10 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH
}
int32_t
stmtUpdateInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbFName
,
bool
autoCreateTbl
,
SHashObj
*
pVgHash
,
SHashObj
*
pBlockHash
)
{
SHashObj
*
pVgHash
,
SHashObj
*
pBlockHash
,
const
char
*
sTableName
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_ERR_RET
(
stmtUpdateBindInfo
(
stmt
,
pTableMeta
,
tags
,
tbFName
));
STMT_ERR_RET
(
stmtUpdateBindInfo
(
stmt
,
pTableMeta
,
tags
,
tbFName
,
sTableName
));
STMT_ERR_RET
(
stmtUpdateExecInfo
(
stmt
,
pVgHash
,
pBlockHash
,
autoCreateTbl
));
pStmt
->
sql
.
autoCreateTbl
=
autoCreateTbl
;
...
...
@@ -247,7 +248,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
destroyBoundColumnInfo
(
pStmt
->
bInfo
.
boundTags
);
taosMemoryFreeClear
(
pStmt
->
bInfo
.
boundTags
);
}
memset
(
pStmt
->
bInfo
.
stbFName
,
0
,
TSDB_TABLE_FNAME_LEN
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -568,7 +569,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
STMT_ERR_RET
(
qBindStmtTagsValue
(
*
pDataBlock
,
pStmt
->
bInfo
.
boundTags
,
pStmt
->
bInfo
.
tbSuid
,
pStmt
->
bInfo
.
sname
.
tname
,
STMT_ERR_RET
(
qBindStmtTagsValue
(
*
pDataBlock
,
pStmt
->
bInfo
.
boundTags
,
pStmt
->
bInfo
.
tbSuid
,
pStmt
->
bInfo
.
s
tbFName
,
pStmt
->
bInfo
.
s
name
.
tname
,
tags
,
pStmt
->
exec
.
pRequest
->
msgBuf
,
pStmt
->
exec
.
pRequest
->
msgBufLen
));
return
TSDB_CODE_SUCCESS
;
...
...
source/client/src/tmq.c
浏览文件 @
607f8163
...
...
@@ -1867,10 +1867,10 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
cJSON
*
type
=
cJSON_CreateString
(
"create"
);
cJSON_AddItemToObject
(
json
,
"type"
,
type
);
char
uid
[
32
]
=
{
0
};
sprintf
(
uid
,
"%"
PRIi64
,
id
);
cJSON
*
id_
=
cJSON_CreateString
(
uid
);
cJSON_AddItemToObject
(
json
,
"id"
,
id_
);
//
char uid[32] = {0};
//
sprintf(uid, "%"PRIi64, id);
//
cJSON* id_ = cJSON_CreateString(uid);
//
cJSON_AddItemToObject(json, "id", id_);
cJSON
*
tableName
=
cJSON_CreateString
(
name
);
cJSON_AddItemToObject
(
json
,
"tableName"
,
tableName
);
cJSON
*
tableType
=
cJSON_CreateString
(
t
==
TSDB_NORMAL_TABLE
?
"normal"
:
"super"
);
...
...
@@ -1925,6 +1925,98 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
return
string
;
}
static
char
*
buildAlterSTableJson
(
void
*
alterData
,
int32_t
alterDataLen
){
SMAlterStbReq
req
=
{
0
};
cJSON
*
json
=
NULL
;
char
*
string
=
NULL
;
if
(
tDeserializeSMAlterStbReq
(
alterData
,
alterDataLen
,
&
req
)
!=
0
)
{
goto
end
;
}
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
end
;
}
cJSON
*
type
=
cJSON_CreateString
(
"alter"
);
cJSON_AddItemToObject
(
json
,
"type"
,
type
);
// cJSON* uid = cJSON_CreateNumber(id);
// cJSON_AddItemToObject(json, "uid", uid);
SName
name
=
{
0
};
tNameFromString
(
&
name
,
req
.
name
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
cJSON
*
tableName
=
cJSON_CreateString
(
name
.
tname
);
cJSON_AddItemToObject
(
json
,
"tableName"
,
tableName
);
cJSON
*
tableType
=
cJSON_CreateString
(
"super"
);
cJSON_AddItemToObject
(
json
,
"tableType"
,
tableType
);
cJSON
*
alterType
=
cJSON_CreateNumber
(
req
.
alterType
);
cJSON_AddItemToObject
(
json
,
"alterType"
,
alterType
);
switch
(
req
.
alterType
)
{
case
TSDB_ALTER_TABLE_ADD_TAG
:
case
TSDB_ALTER_TABLE_ADD_COLUMN
:
{
TAOS_FIELD
*
field
=
taosArrayGet
(
req
.
pFields
,
0
);
cJSON
*
colName
=
cJSON_CreateString
(
field
->
name
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
cJSON
*
colType
=
cJSON_CreateNumber
(
field
->
type
);
cJSON_AddItemToObject
(
json
,
"colType"
,
colType
);
if
(
field
->
type
==
TSDB_DATA_TYPE_BINARY
){
int32_t
length
=
field
->
bytes
-
VARSTR_HEADER_SIZE
;
cJSON
*
cbytes
=
cJSON_CreateNumber
(
length
);
cJSON_AddItemToObject
(
json
,
"colLength"
,
cbytes
);
}
else
if
(
field
->
type
==
TSDB_DATA_TYPE_NCHAR
){
int32_t
length
=
(
field
->
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
;
cJSON
*
cbytes
=
cJSON_CreateNumber
(
length
);
cJSON_AddItemToObject
(
json
,
"colLength"
,
cbytes
);
}
break
;
}
case
TSDB_ALTER_TABLE_DROP_TAG
:
case
TSDB_ALTER_TABLE_DROP_COLUMN
:{
TAOS_FIELD
*
field
=
taosArrayGet
(
req
.
pFields
,
0
);
cJSON
*
colName
=
cJSON_CreateString
(
field
->
name
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
break
;
}
case
TSDB_ALTER_TABLE_UPDATE_TAG_BYTES
:
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:{
TAOS_FIELD
*
field
=
taosArrayGet
(
req
.
pFields
,
0
);
cJSON
*
colName
=
cJSON_CreateString
(
field
->
name
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
cJSON
*
colType
=
cJSON_CreateNumber
(
field
->
type
);
cJSON_AddItemToObject
(
json
,
"colType"
,
colType
);
if
(
field
->
type
==
TSDB_DATA_TYPE_BINARY
){
int32_t
length
=
field
->
bytes
-
VARSTR_HEADER_SIZE
;
cJSON
*
cbytes
=
cJSON_CreateNumber
(
length
);
cJSON_AddItemToObject
(
json
,
"colLength"
,
cbytes
);
}
else
if
(
field
->
type
==
TSDB_DATA_TYPE_NCHAR
){
int32_t
length
=
(
field
->
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
;
cJSON
*
cbytes
=
cJSON_CreateNumber
(
length
);
cJSON_AddItemToObject
(
json
,
"colLength"
,
cbytes
);
}
break
;
}
case
TSDB_ALTER_TABLE_UPDATE_TAG_NAME
:
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:{
TAOS_FIELD
*
oldField
=
taosArrayGet
(
req
.
pFields
,
0
);
TAOS_FIELD
*
newField
=
taosArrayGet
(
req
.
pFields
,
1
);
cJSON
*
colName
=
cJSON_CreateString
(
oldField
->
name
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
cJSON
*
colNewName
=
cJSON_CreateString
(
newField
->
name
);
cJSON_AddItemToObject
(
json
,
"colNewName"
,
colNewName
);
break
;
}
default:
break
;
}
string
=
cJSON_PrintUnformatted
(
json
);
end:
cJSON_Delete
(
json
);
tFreeSMAltertbReq
(
&
req
);
return
string
;
}
static
char
*
processCreateStb
(
SMqMetaRsp
*
metaRsp
){
SVCreateStbReq
req
=
{
0
};
SDecoder
coder
;
...
...
@@ -1947,53 +2039,74 @@ _err:
return
string
;
}
static
char
*
buildCreateCTableJson
(
STag
*
pTag
,
int64_t
sid
,
char
*
name
,
int64_t
id
){
static
char
*
processAlterStb
(
SMqMetaRsp
*
metaRsp
){
SVCreateStbReq
req
=
{
0
};
SDecoder
coder
;
char
*
string
=
NULL
;
// decode and process req
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
int32_t
len
=
metaRsp
->
metaRspLen
-
sizeof
(
SMsgHead
);
tDecoderInit
(
&
coder
,
data
,
len
);
if
(
tDecodeSVCreateStbReq
(
&
coder
,
&
req
)
<
0
)
{
goto
_err
;
}
string
=
buildAlterSTableJson
(
req
.
alterOriData
,
req
.
alterOriDataLen
);
tDecoderClear
(
&
coder
);
return
string
;
_err:
tDecoderClear
(
&
coder
);
return
string
;
}
static
char
*
buildCreateCTableJson
(
STag
*
pTag
,
char
*
sname
,
char
*
name
,
int64_t
id
){
char
*
string
=
NULL
;
SArray
*
pTagVals
=
NULL
;
cJSON
*
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
return
string
;
}
cJSON
*
type
=
cJSON_CreateString
(
"create"
);
cJSON_AddItemToObject
(
json
,
"type"
,
type
);
char
cid
[
32
]
=
{
0
};
sprintf
(
cid
,
"%"
PRIi64
,
id
);
cJSON
*
cid_
=
cJSON_CreateString
(
cid
);
cJSON_AddItemToObject
(
json
,
"id"
,
cid_
);
//
char cid[32] = {0};
//
sprintf(cid, "%"PRIi64, id);
//
cJSON* cid_ = cJSON_CreateString(cid);
//
cJSON_AddItemToObject(json, "id", cid_);
cJSON
*
tableName
=
cJSON_CreateString
(
name
);
cJSON_AddItemToObject
(
json
,
"tableName"
,
tableName
);
cJSON
*
tableType
=
cJSON_CreateString
(
"child"
);
cJSON_AddItemToObject
(
json
,
"tableType"
,
tableType
);
char
sid_
[
32
]
=
{
0
};
sprintf
(
sid_
,
"%"
PRIi64
,
sid
);
cJSON
*
using
=
cJSON_CreateString
(
sid_
);
cJSON
*
using
=
cJSON_CreateString
(
sname
);
cJSON_AddItemToObject
(
json
,
"using"
,
using
);
// cJSON* version = cJSON_CreateNumber(1);
// cJSON_AddItemToObject(json, "version", version);
cJSON
*
tags
=
cJSON_CreateArray
();
int32_t
code
=
tTagToValArray
(
pTag
,
&
pTagVals
);
if
(
code
)
{
goto
end
;
}
if
(
tTagIsJson
(
pTag
))
{
// todo
if
(
tTagIsJson
(
pTag
))
{
STag
*
p
=
(
STag
*
)
pTag
;
if
(
p
->
nTag
==
0
){
goto
end
;
}
char
*
pJson
=
parseTagDatatoJson
(
pTag
);
cJSON
*
tag
=
cJSON_CreateObject
();
cJSON
*
tname
=
cJSON_CreateString
(
"unknown"
);
// todo
STagVal
*
pTagVal
=
taosArrayGet
(
pTagVals
,
0
);
cJSON
*
tname
=
cJSON_CreateString
(
pTagVal
->
colName
);
cJSON_AddItemToObject
(
tag
,
"name"
,
tname
);
cJSON
*
ttype
=
cJSON_CreateNumber
(
TSDB_DATA_TYPE_JSON
);
cJSON_AddItemToObject
(
tag
,
"type"
,
ttype
);
cJSON
*
tvalue
=
cJSON_CreateString
(
pJson
);
cJSON_AddItemToObject
(
tag
,
"value"
,
tvalue
);
cJSON_AddItemToArray
(
tags
,
tag
);
cJSON_AddItemToObject
(
json
,
"tags"
,
tags
);
string
=
cJSON_PrintUnformatted
(
json
);
goto
end
;
}
SArray
*
pTagVals
=
NULL
;
int32_t
code
=
tTagToValArray
(
pTag
,
&
pTagVals
);
if
(
code
)
{
taosMemoryFree
(
pJson
);
goto
end
;
}
...
...
@@ -2001,8 +2114,7 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
STagVal
*
pTagVal
=
(
STagVal
*
)
taosArrayGet
(
pTagVals
,
i
);
cJSON
*
tag
=
cJSON_CreateObject
();
// cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
cJSON
*
tname
=
cJSON_CreateString
(
"unkonwn"
);
// todo
cJSON
*
tname
=
cJSON_CreateString
(
pTagVal
->
colName
);
cJSON_AddItemToObject
(
tag
,
"name"
,
tname
);
cJSON
*
ttype
=
cJSON_CreateNumber
(
pTagVal
->
type
);
cJSON_AddItemToObject
(
tag
,
"type"
,
ttype
);
...
...
@@ -2021,12 +2133,12 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
cJSON_AddItemToObject
(
tag
,
"value"
,
tvalue
);
cJSON_AddItemToArray
(
tags
,
tag
);
}
end:
cJSON_AddItemToObject
(
json
,
"tags"
,
tags
);
string
=
cJSON_PrintUnformatted
(
json
);
end:
cJSON_Delete
(
json
);
taosArrayDestroy
(
pTagVals
);
return
string
;
}
...
...
@@ -2047,7 +2159,7 @@ static char *processCreateTable(SMqMetaRsp *metaRsp){
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
if
(
pCreateReq
->
type
==
TSDB_CHILD_TABLE
){
string
=
buildCreateCTableJson
((
STag
*
)
pCreateReq
->
ctb
.
pTag
,
pCreateReq
->
ctb
.
suid
,
pCreateReq
->
name
,
pCreateReq
->
uid
);
string
=
buildCreateCTableJson
((
STag
*
)
pCreateReq
->
ctb
.
pTag
,
pCreateReq
->
ctb
.
name
,
pCreateReq
->
name
,
pCreateReq
->
uid
);
}
else
if
(
pCreateReq
->
type
==
TSDB_NORMAL_TABLE
){
string
=
buildCreateTableJson
(
&
pCreateReq
->
ntb
.
schemaRow
,
NULL
,
pCreateReq
->
name
,
pCreateReq
->
uid
,
TSDB_NORMAL_TABLE
);
}
...
...
@@ -2085,11 +2197,11 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
cJSON_AddItemToObject
(
json
,
"tableName"
,
tableName
);
cJSON
*
tableType
=
cJSON_CreateString
(
vAlterTbReq
.
action
==
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
?
"child"
:
"normal"
);
cJSON_AddItemToObject
(
json
,
"tableType"
,
tableType
);
cJSON
*
alterType
=
cJSON_CreateNumber
(
vAlterTbReq
.
action
);
cJSON_AddItemToObject
(
json
,
"alterType"
,
alterType
);
switch
(
vAlterTbReq
.
action
)
{
case
TSDB_ALTER_TABLE_ADD_COLUMN
:
{
cJSON
*
alterType
=
cJSON_CreateNumber
(
TSDB_ALTER_TABLE_ADD_COLUMN
);
cJSON_AddItemToObject
(
json
,
"alterType"
,
alterType
);
cJSON
*
colName
=
cJSON_CreateString
(
vAlterTbReq
.
colName
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
cJSON
*
colType
=
cJSON_CreateNumber
(
vAlterTbReq
.
type
);
...
...
@@ -2107,33 +2219,27 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
break
;
}
case
TSDB_ALTER_TABLE_DROP_COLUMN
:{
cJSON
*
alterType
=
cJSON_CreateNumber
(
TSDB_ALTER_TABLE_DROP_COLUMN
);
cJSON_AddItemToObject
(
json
,
"alterType"
,
alterType
);
cJSON
*
colName
=
cJSON_CreateString
(
vAlterTbReq
.
colName
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
break
;
}
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:{
cJSON
*
alterType
=
cJSON_CreateNumber
(
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
);
cJSON_AddItemToObject
(
json
,
"alterType"
,
alterType
);
cJSON
*
colName
=
cJSON_CreateString
(
vAlterTbReq
.
colName
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
cJSON
*
colType
=
cJSON_CreateNumber
(
vAlterTbReq
.
t
ype
);
cJSON
*
colType
=
cJSON_CreateNumber
(
vAlterTbReq
.
colModT
ype
);
cJSON_AddItemToObject
(
json
,
"colType"
,
colType
);
if
(
vAlterTbReq
.
t
ype
==
TSDB_DATA_TYPE_BINARY
){
int32_t
length
=
vAlterTbReq
.
b
ytes
-
VARSTR_HEADER_SIZE
;
if
(
vAlterTbReq
.
colModT
ype
==
TSDB_DATA_TYPE_BINARY
){
int32_t
length
=
vAlterTbReq
.
colModB
ytes
-
VARSTR_HEADER_SIZE
;
cJSON
*
cbytes
=
cJSON_CreateNumber
(
length
);
cJSON_AddItemToObject
(
json
,
"colLength"
,
cbytes
);
}
else
if
(
vAlterTbReq
.
t
ype
==
TSDB_DATA_TYPE_NCHAR
){
int32_t
length
=
(
vAlterTbReq
.
b
ytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
;
}
else
if
(
vAlterTbReq
.
colModT
ype
==
TSDB_DATA_TYPE_NCHAR
){
int32_t
length
=
(
vAlterTbReq
.
colModB
ytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
;
cJSON
*
cbytes
=
cJSON_CreateNumber
(
length
);
cJSON_AddItemToObject
(
json
,
"colLength"
,
cbytes
);
}
break
;
}
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:{
cJSON
*
alterType
=
cJSON_CreateNumber
(
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
);
cJSON_AddItemToObject
(
json
,
"alterType"
,
alterType
);
cJSON
*
colName
=
cJSON_CreateString
(
vAlterTbReq
.
colName
);
cJSON_AddItemToObject
(
json
,
"colName"
,
colName
);
cJSON
*
colNewName
=
cJSON_CreateString
(
vAlterTbReq
.
colNewName
);
...
...
@@ -2141,12 +2247,25 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
break
;
}
case
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
:{
cJSON
*
alterType
=
cJSON_CreateNumber
(
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
);
cJSON_AddItemToObject
(
json
,
"alterType"
,
alterType
);
cJSON
*
tagName
=
cJSON_CreateString
(
vAlterTbReq
.
tagName
);
cJSON_AddItemToObject
(
json
,
"colName"
,
tagName
);
cJSON
*
colValue
=
cJSON_CreateString
(
"invalid, todo"
);
// todo
cJSON_AddItemToObject
(
json
,
"colValue"
,
colValue
);
if
(
!
vAlterTbReq
.
isNull
){
char
*
buf
=
NULL
;
if
(
vAlterTbReq
.
tagType
==
TSDB_DATA_TYPE_JSON
)
{
ASSERT
(
tTagIsJson
(
vAlterTbReq
.
pTagVal
)
==
true
);
buf
=
parseTagDatatoJson
(
vAlterTbReq
.
pTagVal
);
}
else
{
buf
=
taosMemoryCalloc
(
vAlterTbReq
.
nTagVal
+
1
,
1
);
dataConverToStr
(
buf
,
vAlterTbReq
.
tagType
,
vAlterTbReq
.
pTagVal
,
vAlterTbReq
.
nTagVal
,
NULL
);
}
cJSON
*
colValue
=
cJSON_CreateString
(
buf
);
cJSON_AddItemToObject
(
json
,
"colValue"
,
colValue
);
taosMemoryFree
(
buf
);
}
cJSON
*
isNull
=
cJSON_CreateBool
(
vAlterTbReq
.
isNull
);
cJSON_AddItemToObject
(
json
,
"colValueNull"
,
isNull
);
break
;
...
...
@@ -2180,10 +2299,6 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){
}
cJSON
*
type
=
cJSON_CreateString
(
"drop"
);
cJSON_AddItemToObject
(
json
,
"type"
,
type
);
char
uid
[
32
]
=
{
0
};
sprintf
(
uid
,
"%"
PRIi64
,
req
.
suid
);
cJSON
*
id
=
cJSON_CreateString
(
uid
);
cJSON_AddItemToObject
(
json
,
"id"
,
id
);
cJSON
*
tableName
=
cJSON_CreateString
(
req
.
name
);
cJSON_AddItemToObject
(
json
,
"tableName"
,
tableName
);
cJSON
*
tableType
=
cJSON_CreateString
(
"super"
);
...
...
@@ -2224,7 +2339,7 @@ static char *processDropTable(SMqMetaRsp *metaRsp){
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
SVDropTbReq
*
pDropTbReq
=
req
.
pReqs
+
iReq
;
cJSON
*
tableName
=
cJSON_CreateString
(
pDropTbReq
->
name
);
// todo
cJSON
*
tableName
=
cJSON_CreateString
(
pDropTbReq
->
name
);
cJSON_AddItemToArray
(
tableNameList
,
tableName
);
}
cJSON_AddItemToObject
(
json
,
"tableNameList"
,
tableNameList
);
...
...
@@ -2245,7 +2360,7 @@ char *tmq_get_json_meta(TAOS_RES *res){
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_CREATE_STB
){
return
processCreateStb
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_ALTER_STB
){
return
process
Create
Stb
(
&
pMetaRspObj
->
metaRsp
);
return
process
Alter
Stb
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_DROP_STB
){
return
processDropSTable
(
&
pMetaRspObj
->
metaRsp
);
}
else
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_CREATE_TABLE
){
...
...
@@ -2258,6 +2373,10 @@ char *tmq_get_json_meta(TAOS_RES *res){
return
NULL
;
}
void
tmq_free_json_meta
(
char
*
jsonMeta
){
taosMemoryFreeClear
(
jsonMeta
);
}
static
int32_t
taosCreateStb
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
){
SVCreateStbReq
req
=
{
0
};
SDecoder
coder
;
...
...
@@ -2310,6 +2429,7 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
pReq
.
commentLen
=
-
1
;
pReq
.
suid
=
req
.
suid
;
pReq
.
source
=
1
;
pReq
.
igExists
=
true
;
SName
tableName
;
tNameExtractFullName
(
toName
(
pTscObj
->
acctId
,
pRequest
->
pDb
,
req
.
name
,
&
tableName
),
pReq
.
name
);
...
...
@@ -2747,6 +2867,7 @@ end:
}
int32_t
taos_write_raw_meta
(
TAOS
*
taos
,
tmq_raw_data
*
raw_meta
){
return
0
;
if
(
!
taos
||
!
raw_meta
)
{
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -2767,6 +2888,10 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){
return
TSDB_CODE_INVALID_PARA
;
}
void
tmq_free_raw_meta
(
tmq_raw_data
*
rawMeta
){
taosMemoryFreeClear
(
rawMeta
);
}
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
tmqCommitInner2
(
tmq
,
msg
,
0
,
1
,
cb
,
param
);
}
...
...
source/common/src/tmsg.c
浏览文件 @
607f8163
...
...
@@ -4803,6 +4803,11 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
if
(
tEncodeSRSmaParam
(
pCoder
,
&
pReq
->
rsmaParam
)
<
0
)
return
-
1
;
}
if
(
tEncodeI32
(
pCoder
,
pReq
->
alterOriDataLen
)
<
0
)
return
-
1
;
if
(
pReq
->
alterOriDataLen
>
0
)
{
if
(
tEncodeBinary
(
pCoder
,
pReq
->
alterOriData
,
pReq
->
alterOriDataLen
)
<
0
)
return
-
1
;
}
tEndEncode
(
pCoder
);
return
0
;
}
...
...
@@ -4819,6 +4824,11 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
if
(
tDecodeSRSmaParam
(
pCoder
,
&
pReq
->
rsmaParam
)
<
0
)
return
-
1
;
}
if
(
tDecodeI32
(
pCoder
,
&
pReq
->
alterOriDataLen
)
<
0
)
return
-
1
;
if
(
pReq
->
alterOriDataLen
>
0
)
{
if
(
tDecodeBinary
(
pCoder
,
(
uint8_t
**
)
&
pReq
->
alterOriData
,
NULL
)
<
0
)
return
-
1
;
}
tEndDecode
(
pCoder
);
return
0
;
}
...
...
@@ -4862,6 +4872,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
}
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
tEncodeCStr
(
pCoder
,
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tEncodeTag
(
pCoder
,
(
const
STag
*
)
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
}
else
if
(
pReq
->
type
==
TSDB_NORMAL_TABLE
)
{
...
...
@@ -4891,6 +4902,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
}
if
(
pReq
->
type
==
TSDB_CHILD_TABLE
)
{
if
(
tDecodeCStr
(
pCoder
,
&
pReq
->
ctb
.
name
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
ctb
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeTag
(
pCoder
,
(
STag
**
)
&
pReq
->
ctb
.
pTag
)
<
0
)
return
-
1
;
}
else
if
(
pReq
->
type
==
TSDB_NORMAL_TABLE
)
{
...
...
@@ -5224,6 +5236,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:
if
(
tEncodeCStr
(
pEncoder
,
pReq
->
colName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pReq
->
colModType
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pReq
->
colModBytes
)
<
0
)
return
-
1
;
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:
...
...
@@ -5233,6 +5246,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
case
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
:
if
(
tEncodeCStr
(
pEncoder
,
pReq
->
tagName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pReq
->
isNull
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pReq
->
tagType
)
<
0
)
return
-
1
;
if
(
!
pReq
->
isNull
)
{
if
(
tEncodeBinary
(
pEncoder
,
pReq
->
pTagVal
,
pReq
->
nTagVal
)
<
0
)
return
-
1
;
}
...
...
@@ -5272,6 +5286,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
:
if
(
tDecodeCStr
(
pDecoder
,
&
pReq
->
colName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
colModType
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pReq
->
colModBytes
)
<
0
)
return
-
1
;
break
;
case
TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
:
...
...
@@ -5281,6 +5296,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
case
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
:
if
(
tDecodeCStr
(
pDecoder
,
&
pReq
->
tagName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
isNull
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pReq
->
tagType
)
<
0
)
return
-
1
;
if
(
!
pReq
->
isNull
)
{
if
(
tDecodeBinary
(
pDecoder
,
&
pReq
->
pTagVal
,
&
pReq
->
nTagVal
)
<
0
)
return
-
1
;
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
607f8163
...
...
@@ -409,7 +409,7 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *
return
0
;
}
static
void
*
mndBuildVCreateStbReq
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SStbObj
*
pStb
,
int32_t
*
pContLen
)
{
static
void
*
mndBuildVCreateStbReq
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SStbObj
*
pStb
,
int32_t
*
pContLen
,
void
*
alterOriData
,
int32_t
alterOriDataLen
)
{
SEncoder
encoder
=
{
0
};
int32_t
contLen
;
SName
name
=
{
0
};
...
...
@@ -422,6 +422,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req
.
name
=
(
char
*
)
tNameGetTableName
(
&
name
);
req
.
suid
=
pStb
->
uid
;
req
.
rollup
=
pStb
->
ast1Len
>
0
?
1
:
0
;
req
.
alterOriData
=
alterOriData
;
req
.
alterOriDataLen
=
alterOriDataLen
;
// todo
req
.
schemaRow
.
nCols
=
pStb
->
numOfColumns
;
req
.
schemaRow
.
version
=
pStb
->
colVer
;
...
...
@@ -626,7 +628,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
continue
;
}
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
);
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
,
NULL
,
0
);
if
(
pReq
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
...
...
@@ -1278,7 +1280,7 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
return
0
;
}
static
int32_t
mndSetAlterStbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
static
int32_t
mndSetAlterStbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SStbObj
*
pStb
,
void
*
alterOriData
,
int32_t
alterOriDataLen
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
...
...
@@ -1292,7 +1294,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
continue
;
}
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
);
void
*
pReq
=
mndBuildVCreateStbReq
(
pMnode
,
pVgroup
,
pStb
,
&
contLen
,
alterOriData
,
alterOriDataLen
);
if
(
pReq
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
...
...
@@ -1575,7 +1577,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
if
(
mndSetAlterStbRedoLogs
(
pMnode
,
pTrans
,
pDb
,
&
stbObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbCommitLogs
(
pMnode
,
pTrans
,
pDb
,
&
stbObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
stbObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterStbRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
stbObj
,
pReq
->
pCont
,
pReq
->
contLen
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
607f8163
...
...
@@ -71,6 +71,7 @@ typedef struct SInsertParseContext {
SVnodeModifOpStmt
*
pOutput
;
SStmtCallback
*
pStmtCb
;
SParseMetaCache
*
pMetaCache
;
char
sTableName
[
TSDB_TABLE_NAME_LEN
];
}
SInsertParseContext
;
typedef
struct
SInsertParseSyntaxCxt
{
...
...
@@ -811,10 +812,11 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return
TSDB_CODE_SUCCESS
;
}
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
)
{
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
,
const
char
*
sname
)
{
pTbReq
->
type
=
TD_CHILD_TABLE
;
pTbReq
->
name
=
strdup
(
tname
);
pTbReq
->
ctb
.
suid
=
suid
;
if
(
sname
)
pTbReq
->
ctb
.
name
=
strdup
(
sname
);
pTbReq
->
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
pTbReq
->
commentLen
=
-
1
;
...
...
@@ -835,6 +837,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16
return
TSDB_CODE_SUCCESS
;
}
strcpy
(
val
->
colName
,
pSchema
->
name
);
val
->
cid
=
pSchema
->
colId
;
val
->
type
=
pSchema
->
type
;
...
...
@@ -1051,7 +1054,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
if
(
isNullStr
(
&
sToken
))
{
code
=
tTagNew
(
pTagVals
,
1
,
true
,
&
pTag
);
}
else
{
code
=
parseJsontoTagData
(
sToken
.
z
,
pTagVals
,
&
pTag
,
&
pCxt
->
msg
);
code
=
parseJsontoTagData
(
sToken
.
z
,
pTagVals
,
&
pTag
,
&
pCxt
->
msg
,
pTagSchema
->
name
);
}
taosMemoryFree
(
tmpTokenBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1081,7 +1084,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
goto
end
;
}
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
);
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
,
pCxt
->
sTableName
);
end:
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagVals
);
++
i
)
{
...
...
@@ -1166,6 +1169,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
createSName
(
&
sname
,
&
sToken
,
pCxt
->
pComCxt
->
acctId
,
pCxt
->
pComCxt
->
db
,
&
pCxt
->
msg
);
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
&
sname
,
dbFName
);
strcpy
(
pCxt
->
sTableName
,
sname
.
tname
);
CHECK_CODE
(
getSTableMeta
(
pCxt
,
&
sname
,
dbFName
));
if
(
TSDB_SUPER_TABLE
!=
pCxt
->
pTableMeta
->
tableType
)
{
...
...
@@ -1402,15 +1406,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
return
TSDB_CODE_SUCCESS
;
}
void
destroyCreateSubTbReq
(
SVCreateTbReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
name
);
taosMemoryFreeClear
(
pReq
->
ctb
.
pTag
);
}
static
void
destroyInsertParseContextForTable
(
SInsertParseContext
*
pCxt
)
{
taosMemoryFreeClear
(
pCxt
->
pTableMeta
);
destroyBoundColumnInfo
(
&
pCxt
->
tags
);
destroyCreateSub
TbReq
(
&
pCxt
->
createTblReq
);
tdDestroySVCreate
TbReq
(
&
pCxt
->
createTblReq
);
}
static
void
destroySubTableHashElem
(
void
*
p
)
{
taosMemoryFree
(
*
(
STableMeta
**
)
p
);
}
...
...
@@ -1556,7 +1555,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
}
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pCxt
->
pTableMeta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
,
pCxt
->
sTableName
);
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
pCxt
->
pVgroupsHashObj
=
NULL
;
...
...
@@ -1865,7 +1864,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
char
*
tName
,
TAOS_MULTI_BIND
*
bind
,
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
c
onst
char
*
sTableName
,
c
har
*
tName
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
...
...
@@ -1904,13 +1903,14 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
isJson
=
true
;
char
*
tmp
=
taosMemoryCalloc
(
1
,
colLen
+
1
);
memcpy
(
tmp
,
bind
[
c
].
buffer
,
colLen
);
code
=
parseJsontoTagData
(
tmp
,
pTagArray
,
&
pTag
,
&
pBuf
);
code
=
parseJsontoTagData
(
tmp
,
pTagArray
,
&
pTag
,
&
pBuf
,
pTagSchema
->
name
);
taosMemoryFree
(
tmp
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
}
else
{
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
strcpy
(
val
.
colName
,
pTagSchema
->
name
);
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
val
.
pData
=
(
uint8_t
*
)
bind
[
c
].
buffer
;
val
.
nData
=
colLen
;
...
...
@@ -1947,9 +1947,9 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
}
SVCreateTbReq
tbReq
=
{
0
};
buildCreateTbReq
(
&
tbReq
,
tName
,
pTag
,
suid
);
buildCreateTbReq
(
&
tbReq
,
tName
,
pTag
,
suid
,
sTableName
);
code
=
buildCreateTbMsg
(
pDataBlock
,
&
tbReq
);
destroyCreateSub
TbReq
(
&
tbReq
);
tdDestroySVCreate
TbReq
(
&
tbReq
);
end:
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagArray
);
++
i
)
{
...
...
@@ -2213,7 +2213,7 @@ typedef struct SmlExecHandle {
static
void
smlDestroyTableHandle
(
void
*
pHandle
)
{
SmlExecTableHandle
*
handle
=
(
SmlExecTableHandle
*
)
pHandle
;
destroyBoundColumnInfo
(
&
handle
->
tags
);
destroyCreateSub
TbReq
(
&
handle
->
createTblReq
);
tdDestroySVCreate
TbReq
(
&
handle
->
createTblReq
);
}
static
int32_t
smlBoundColumnData
(
SArray
*
cols
,
SParsedDataColInfo
*
pColList
,
SSchema
*
pSchema
)
{
...
...
@@ -2311,6 +2311,7 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p
SSmlKv
*
kv
=
taosArrayGetP
(
cols
,
i
);
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
strcpy
(
val
.
colName
,
pTagSchema
->
name
);
if
(
pTagSchema
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
val
.
pData
=
(
uint8_t
*
)
kv
->
value
;
val
.
nData
=
kv
->
length
;
...
...
@@ -2354,7 +2355,7 @@ end:
}
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
)
{
char
*
tableName
,
c
onst
char
*
sTableName
,
int32_t
sTableNameLen
,
c
har
*
msgBuf
,
int16_t
msgBufLen
)
{
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SSmlExecHandle
*
smlHandle
=
(
SSmlExecHandle
*
)
handle
;
...
...
@@ -2372,7 +2373,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
return
ret
;
}
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
);
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
=
taosMemoryMalloc
(
sTableNameLen
+
1
);
memcpy
(
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
,
sTableName
,
sTableNameLen
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
[
sTableNameLen
]
=
0
;
STableDataBlocks
*
pDataBlock
=
NULL
;
ret
=
getDataBlockFromList
(
smlHandle
->
pBlockHash
,
&
pTableMeta
->
uid
,
sizeof
(
pTableMeta
->
uid
),
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
607f8163
...
...
@@ -5369,7 +5369,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
}
static
void
addCreateTbReqIntoVgroup
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
,
SCreateSubTableClause
*
pStmt
,
const
STag
*
pTag
,
uint64_t
suid
,
SVgroupInfo
*
pVgInfo
)
{
const
STag
*
pTag
,
uint64_t
suid
,
const
char
*
sTableNmae
,
SVgroupInfo
*
pVgInfo
)
{
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
// strcpy(name.dbname, pStmt->dbName);
...
...
@@ -5386,6 +5386,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
req
.
commentLen
=
-
1
;
}
req
.
ctb
.
suid
=
suid
;
req
.
ctb
.
name
=
strdup
(
sTableNmae
);
req
.
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
if
(
pStmt
->
ignoreExists
)
{
req
.
flags
|=
TD_CREATE_IF_NOT_EXISTS
;
...
...
@@ -5471,13 +5472,14 @@ static int32_t buildJsonTagVal(STranslateContext* pCxt, SSchema* pTagSchema, SVa
return
buildSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
"json string too long than 4095"
,
pVal
->
literal
);
}
return
parseJsontoTagData
(
pVal
->
literal
,
pTagArray
,
ppTag
,
&
pCxt
->
msgBuf
);
return
parseJsontoTagData
(
pVal
->
literal
,
pTagArray
,
ppTag
,
&
pCxt
->
msgBuf
,
pTagSchema
->
name
);
}
static
int32_t
buildNormalTagVal
(
STranslateContext
*
pCxt
,
SSchema
*
pTagSchema
,
SValueNode
*
pVal
,
SArray
*
pTagArray
)
{
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
)
{
void
*
nodeVal
=
nodesGetValueFromNode
(
pVal
);
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
strcpy
(
val
.
colName
,
pTagSchema
->
name
);
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
val
.
pData
=
varDataVal
(
nodeVal
);
val
.
nData
=
varDataLen
(
nodeVal
);
...
...
@@ -5571,6 +5573,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
}
else
if
(
pVal
->
node
.
resType
.
type
!=
TSDB_DATA_TYPE_NULL
&&
!
pVal
->
isNull
)
{
char
*
tmpVal
=
nodesGetValueFromNode
(
pVal
);
STagVal
val
=
{.
cid
=
pTagSchema
->
colId
,
.
type
=
pTagSchema
->
type
};
strcpy
(
val
.
colName
,
pTagSchema
->
name
);
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
val
.
pData
=
varDataVal
(
tmpVal
);
val
.
nData
=
varDataLen
(
tmpVal
);
...
...
@@ -5627,7 +5630,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
code
=
getTableHashVgroup
(
pCxt
,
pStmt
->
dbName
,
pStmt
->
tableName
,
&
info
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
addCreateTbReqIntoVgroup
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
,
pStmt
,
pTag
,
pSuperTableMeta
->
uid
,
&
info
);
addCreateTbReqIntoVgroup
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
,
pStmt
,
pTag
,
pSuperTableMeta
->
uid
,
pStmt
->
useTableName
,
&
info
);
}
taosMemoryFreeClear
(
pSuperTableMeta
);
...
...
@@ -5845,8 +5848,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
}
pReq
->
isNull
=
(
TSDB_DATA_TYPE_NULL
==
pStmt
->
pVal
->
node
.
resType
.
type
);
pReq
->
tagType
=
targetDt
.
type
;
if
(
targetDt
.
type
==
TSDB_DATA_TYPE_JSON
)
{
pReq
->
isNull
=
0
;
if
(
pStmt
->
pVal
->
literal
&&
strlen
(
pStmt
->
pVal
->
literal
)
>
(
TSDB_MAX_JSON_TAG_LEN
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
"json string too long than 4095"
,
pStmt
->
pVal
->
literal
);
...
...
@@ -5855,7 +5858,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
int32_t
code
=
TSDB_CODE_SUCCESS
;
STag
*
pTag
=
NULL
;
do
{
code
=
parseJsontoTagData
(
pStmt
->
pVal
->
literal
,
pTagVals
,
&
pTag
,
&
pCxt
->
msgBuf
);
code
=
parseJsontoTagData
(
pStmt
->
pVal
->
literal
,
pTagVals
,
&
pTag
,
&
pCxt
->
msgBuf
,
pReq
->
tagName
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
...
...
@@ -5870,6 +5873,9 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
pTag
->
nTag
==
0
){
pReq
->
isNull
=
true
;
}
pReq
->
nTagVal
=
pTag
->
len
;
pReq
->
pTagVal
=
(
uint8_t
*
)
pTag
;
pStmt
->
pVal
->
datum
.
p
=
(
char
*
)
pTag
;
// for free
...
...
@@ -5927,7 +5933,7 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
static
int32_t
buildUpdateColReq
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
STableMeta
*
pTableMeta
,
SVAlterTbReq
*
pReq
)
{
pReq
->
colModBytes
=
calcTypeBytes
(
pStmt
->
dataType
);
pReq
->
colModType
=
pStmt
->
dataType
.
type
;
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
if
(
NULL
==
pSchema
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
607f8163
...
...
@@ -341,7 +341,7 @@ static bool isValidateTag(char* input) {
return
true
;
}
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
)
{
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
,
const
char
*
colName
)
{
int32_t
retCode
=
TSDB_CODE_SUCCESS
;
cJSON
*
root
=
NULL
;
SHashObj
*
keyHash
=
NULL
;
...
...
@@ -389,6 +389,7 @@ int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, voi
continue
;
}
STagVal
val
=
{
0
};
strcpy
(
val
.
colName
,
colName
);
val
.
pKey
=
jsonKey
;
taosHashPut
(
keyHash
,
jsonKey
,
keyLen
,
&
keyLen
,
CHAR_BYTES
);
// add key to hash to remove dumplicate, value is useless
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
607f8163
...
...
@@ -1150,7 +1150,7 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
}
memcpy
(
tmp
,
varDataVal
(
input
),
varDataLen
(
input
));
tmp
[
varDataLen
(
input
)]
=
0
;
if
(
parseJsontoTagData
(
tmp
,
pTagVals
,
&
pTag
,
NULL
)){
if
(
parseJsontoTagData
(
tmp
,
pTagVals
,
&
pTag
,
NULL
,
""
)){
tTagNew
(
pTagVals
,
1
,
true
,
&
pTag
);
}
}
...
...
source/libs/scalar/test/scalar/scalarTests.cpp
浏览文件 @
607f8163
...
...
@@ -1114,7 +1114,7 @@ TEST(columnTest, json_column_arith_op) {
memcpy
(
rightv
,
rightvTmp
,
strlen
(
rightvTmp
));
SArray
*
tags
=
taosArrayInit
(
1
,
sizeof
(
STagVal
));
STag
*
row
=
NULL
;
parseJsontoTagData
(
rightv
,
tags
,
&
row
,
NULL
);
parseJsontoTagData
(
rightv
,
tags
,
&
row
,
NULL
,
""
);
const
int32_t
len
=
8
;
EOperatorType
op
[
len
]
=
{
OP_TYPE_ADD
,
OP_TYPE_SUB
,
OP_TYPE_MULTI
,
OP_TYPE_DIV
,
...
...
@@ -1262,7 +1262,7 @@ TEST(columnTest, json_column_logic_op) {
memcpy
(
rightv
,
rightvTmp
,
strlen
(
rightvTmp
));
SArray
*
tags
=
taosArrayInit
(
1
,
sizeof
(
STagVal
));
STag
*
row
=
NULL
;
parseJsontoTagData
(
rightv
,
tags
,
&
row
,
NULL
);
parseJsontoTagData
(
rightv
,
tags
,
&
row
,
NULL
,
""
);
const
int32_t
len0
=
6
;
const
int32_t
len
=
9
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录