Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c7efb35b
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c7efb35b
编写于
12月 03, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'refact/submit_req' of
https://github.com/taosdata/TDengine
into refact/submit_req
上级
4ecfdfd1
da842b57
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
154 addition
and
215 deletion
+154
-215
include/libs/parser/parser.h
include/libs/parser/parser.h
+3
-4
source/client/src/clientSml.c
source/client/src/clientSml.c
+8
-24
source/libs/parser/inc/parInsertUtil.h
source/libs/parser/inc/parInsertUtil.h
+2
-1
source/libs/parser/src/parInsertSml.c
source/libs/parser/src/parInsertSml.c
+136
-173
source/libs/parser/src/parInsertSql.c
source/libs/parser/src/parInsertSql.c
+0
-2
source/libs/parser/src/parInsertStmt.c
source/libs/parser/src/parInsertStmt.c
+0
-7
source/libs/parser/src/parInsertUtil.c
source/libs/parser/src/parInsertUtil.c
+5
-4
未找到文件。
include/libs/parser/parser.h
浏览文件 @
c7efb35b
...
...
@@ -106,11 +106,10 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void
qDestroyBoundColInfo
(
void
*
pInfo
);
void
*
smlInitHandle
(
SQuery
*
pQuery
);
void
smlDestroyHandle
(
void
*
pHandle
);
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
SQuery
*
smlInitHandle
();
int32_t
smlBindData
(
SQuery
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
const
char
*
sTableName
,
int32_t
sTableNameLen
,
int32_t
ttl
,
char
*
msgBuf
,
int16_t
msgBufLen
);
int32_t
smlBuildOutput
(
void
*
handle
,
SHashObj
*
pVgHash
);
int32_t
smlBuildOutput
(
SQuery
*
handle
,
SHashObj
*
pVgHash
);
int32_t
rewriteToVnodeModifyOpStmt
(
SQuery
*
pQuery
,
SArray
*
pBufArray
);
SArray
*
serializeVgroupsCreateTableBatch
(
SHashObj
*
pVgroupHashmap
);
...
...
source/client/src/clientSml.c
浏览文件 @
c7efb35b
...
...
@@ -170,7 +170,6 @@ typedef struct {
SHashObj
*
childTables
;
SHashObj
*
superTables
;
SHashObj
*
pVgHash
;
void
*
exec
;
STscObj
*
taos
;
SCatalog
*
pCatalog
;
...
...
@@ -712,21 +711,21 @@ static bool smlParseBool(SSmlKv *kvVal) {
const
char
*
pVal
=
kvVal
->
value
;
int32_t
len
=
kvVal
->
length
;
if
((
len
==
1
)
&&
(
pVal
[
0
]
==
't'
||
pVal
[
0
]
==
'T'
))
{
kvVal
->
i
=
true
;
kvVal
->
i
=
TSDB_TRUE
;
return
true
;
}
if
((
len
==
1
)
&&
(
pVal
[
0
]
==
'f'
||
pVal
[
0
]
==
'F'
))
{
kvVal
->
i
=
false
;
kvVal
->
i
=
TSDB_FALSE
;
return
true
;
}
if
((
len
==
4
)
&&
!
strncasecmp
(
pVal
,
"true"
,
len
))
{
kvVal
->
i
=
true
;
kvVal
->
i
=
TSDB_TRUE
;
return
true
;
}
if
((
len
==
5
)
&&
!
strncasecmp
(
pVal
,
"false"
,
len
))
{
kvVal
->
i
=
false
;
kvVal
->
i
=
TSDB_FALSE
;
return
true
;
}
return
false
;
...
...
@@ -1488,7 +1487,6 @@ static void smlDestroyCols(SArray *cols) {
static
void
smlDestroyInfo
(
SSmlHandle
*
info
)
{
if
(
!
info
)
return
;
qDestroyQuery
(
info
->
pQuery
);
smlDestroyHandle
(
info
->
exec
);
// destroy info->childTables
void
**
p1
=
(
void
**
)
taosHashIterate
(
info
->
childTables
,
NULL
);
...
...
@@ -1526,19 +1524,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
}
info
->
id
=
smlGenId
();
info
->
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
info
->
pQuery
)
{
uError
(
"SML:0x%"
PRIx64
" create info->pQuery error"
,
info
->
id
);
goto
cleanup
;
}
info
->
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
info
->
pQuery
->
haveResultSet
=
false
;
info
->
pQuery
->
msgType
=
TDMT_VND_SUBMIT
;
info
->
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
if
(
NULL
==
info
->
pQuery
->
pRoot
)
{
uError
(
"SML:0x%"
PRIx64
" create info->pQuery->pRoot error"
,
info
->
id
);
goto
cleanup
;
}
info
->
pQuery
=
smlInitHandle
();
if
(
pTscObj
)
{
info
->
taos
=
pTscObj
;
...
...
@@ -1561,10 +1547,8 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
info
->
pRequest
=
request
;
info
->
msgBuf
.
buf
=
info
->
pRequest
->
msgBuf
;
info
->
msgBuf
.
len
=
ERROR_MSG_BUF_DEFAULT_SIZE
;
info
->
pRequest
->
stmtType
=
info
->
pQuery
->
pRoot
->
type
;
}
info
->
exec
=
smlInitHandle
(
info
->
pQuery
);
info
->
childTables
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
info
->
superTables
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
info
->
pVgHash
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
...
...
@@ -1577,7 +1561,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
goto
cleanup
;
}
}
if
(
NULL
==
info
->
exec
||
NULL
==
info
->
childTables
||
NULL
==
info
->
superTables
||
NULL
==
info
->
pVgHash
||
if
(
NULL
==
info
->
pQuery
||
NULL
==
info
->
childTables
||
NULL
==
info
->
superTables
||
NULL
==
info
->
pVgHash
||
NULL
==
info
->
dumplicateKey
)
{
uError
(
"SML:0x%"
PRIx64
" create info failed"
,
info
->
id
);
goto
cleanup
;
...
...
@@ -2337,7 +2321,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
(
*
pMeta
)
->
tableMeta
->
vgId
=
vg
.
vgId
;
(
*
pMeta
)
->
tableMeta
->
uid
=
tableData
->
uid
;
// one table merge data block together according uid
code
=
smlBindData
(
info
->
exec
,
tableData
->
tags
,
(
*
pMeta
)
->
cols
,
tableData
->
cols
,
info
->
dataFormat
,
code
=
smlBindData
(
info
->
pQuery
,
tableData
->
tags
,
(
*
pMeta
)
->
cols
,
tableData
->
cols
,
info
->
dataFormat
,
(
*
pMeta
)
->
tableMeta
,
tableData
->
childTableName
,
tableData
->
sTableName
,
tableData
->
sTableNameLen
,
info
->
ttl
,
info
->
msgBuf
.
buf
,
info
->
msgBuf
.
len
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2347,7 +2331,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
oneTable
=
(
SSmlTableInfo
**
)
taosHashIterate
(
info
->
childTables
,
oneTable
);
}
code
=
smlBuildOutput
(
info
->
exec
,
info
->
pVgHash
);
code
=
smlBuildOutput
(
info
->
pQuery
,
info
->
pVgHash
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" smlBuildOutput failed"
,
info
->
id
);
return
code
;
...
...
source/libs/parser/inc/parInsertUtil.h
浏览文件 @
c7efb35b
...
...
@@ -167,6 +167,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
void
insCheckTableDataOrder
(
STableDataCxt
*
pTableCxt
,
TSKEY
tsKey
);
int32_t
insGetTableDataCxt
(
SHashObj
*
pHash
,
void
*
id
,
int32_t
idLen
,
STableMeta
*
pTableMeta
,
SVCreateTbReq
**
pCreateTbReq
,
STableDataCxt
**
pTableCxt
,
bool
colMode
);
int32_t
initTableColSubmitData
(
STableDataCxt
*
pTableCxt
);
int32_t
insMergeTableDataCxt
(
SHashObj
*
pTableHash
,
SArray
**
pVgDataBlocks
);
int32_t
insBuildVgDataBlocks
(
SHashObj
*
pVgroupsHashObj
,
SArray
*
pVgDataBlocks
,
SArray
**
pDataBlocks
);
void
insDestroyTableDataCxtHashMap
(
SHashObj
*
pTableCxtHash
);
...
...
@@ -174,5 +175,5 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
void
insDestroyVgroupDataCxtList
(
SArray
*
pVgCxtList
);
void
insDestroyVgroupDataCxtHashMap
(
SHashObj
*
pVgCxtHash
);
void
insDestroyTableDataCxt
(
STableDataCxt
*
pTableCxt
);
void
destroyBoundColInfo
(
SBoundColInfo
*
pInfo
);
#endif // TDENGINE_PAR_INSERT_UTIL_H
source/libs/parser/src/parInsertSml.c
浏览文件 @
c7efb35b
...
...
@@ -45,95 +45,46 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SmlExecTableHandle
{
SParsedDataColInfo
tags
;
// each table
SVCreateTbReq
createTblReq
;
// each table
}
SmlExecTableHandle
;
typedef
struct
SmlExecHandle
{
SHashObj
*
pBlockHash
;
SmlExecTableHandle
tableExecHandle
;
SQuery
*
pQuery
;
}
SSmlExecHandle
;
static
void
smlDestroyTableHandle
(
void
*
pHandle
)
{
SmlExecTableHandle
*
handle
=
(
SmlExecTableHandle
*
)
pHandle
;
destroyBoundColumnInfo
(
&
handle
->
tags
);
tdDestroySVCreateTbReq
(
&
handle
->
createTblReq
);
}
static
int32_t
smlBoundColumnData
(
SArray
*
cols
,
SParsedDataColInfo
*
pColList
,
SSchema
*
pSchema
,
bool
isTag
)
{
col_id_t
nCols
=
pColList
->
numOfCols
;
pColList
->
numOfBound
=
0
;
pColList
->
boundNullLen
=
0
;
memset
(
pColList
->
boundColumns
,
0
,
sizeof
(
col_id_t
)
*
nCols
);
for
(
col_id_t
i
=
0
;
i
<
nCols
;
++
i
)
{
pColList
->
cols
[
i
].
valStat
=
VAL_STAT_NONE
;
static
int32_t
smlBoundColumnData
(
SArray
*
cols
,
SBoundColInfo
*
pBoundInfo
,
SSchema
*
pSchema
,
bool
isTag
)
{
bool
*
pUseCols
=
taosMemoryCalloc
(
pBoundInfo
->
numOfCols
,
sizeof
(
bool
));
if
(
NULL
==
pUseCols
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
bool
isOrdered
=
true
;
col_id_t
lastColIdx
=
-
1
;
// last column found
pBoundInfo
->
numOfBound
=
0
;
int16_t
lastColIdx
=
-
1
;
// last column found
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
cols
);
++
i
)
{
SSmlKv
*
kv
=
taosArrayGetP
(
cols
,
i
);
SToken
sToken
=
{.
n
=
kv
->
keyLen
,
.
z
=
(
char
*
)
kv
->
key
};
col_id_t
t
=
lastColIdx
+
1
;
col_id_t
index
=
((
t
==
0
&&
!
isTag
)
?
0
:
insFindCol
(
&
sToken
,
t
,
n
Cols
,
pSchema
));
uDebug
(
"SML, index:%d, t:%d, ncols:%d"
,
index
,
t
,
n
Cols
);
col_id_t
index
=
((
t
==
0
&&
!
isTag
)
?
0
:
insFindCol
(
&
sToken
,
t
,
pBoundInfo
->
numOf
Cols
,
pSchema
));
uDebug
(
"SML, index:%d, t:%d, ncols:%d"
,
index
,
t
,
pBoundInfo
->
numOf
Cols
);
if
(
index
<
0
&&
t
>
0
)
{
index
=
insFindCol
(
&
sToken
,
0
,
t
,
pSchema
);
isOrdered
=
false
;
}
if
(
index
<
0
)
{
uError
(
"smlBoundColumnData. index:%d"
,
index
);
return
TSDB_CODE_SML_INVALID_DATA
;
code
=
TSDB_CODE_SML_INVALID_DATA
;
goto
end
;
}
if
(
p
ColList
->
cols
[
index
].
valStat
==
VAL_STAT_HAS
)
{
if
(
p
UseCols
[
index
]
)
{
uError
(
"smlBoundColumnData. already set. index:%d"
,
index
);
return
TSDB_CODE_SML_INVALID_DATA
;
code
=
TSDB_CODE_SML_INVALID_DATA
;
goto
end
;
}
lastColIdx
=
index
;
pColList
->
cols
[
index
].
valStat
=
VAL_STAT_HAS
;
pColList
->
boundColumns
[
pColList
->
numOfBound
]
=
index
;
++
pColList
->
numOfBound
;
switch
(
pSchema
[
t
].
type
)
{
case
TSDB_DATA_TYPE_BINARY
:
pColList
->
boundNullLen
+=
(
sizeof
(
VarDataOffsetT
)
+
VARSTR_HEADER_SIZE
+
CHAR_BYTES
);
break
;
case
TSDB_DATA_TYPE_NCHAR
:
pColList
->
boundNullLen
+=
(
sizeof
(
VarDataOffsetT
)
+
VARSTR_HEADER_SIZE
+
TSDB_NCHAR_SIZE
);
break
;
default:
pColList
->
boundNullLen
+=
TYPE_BYTES
[
pSchema
[
t
].
type
];
break
;
}
pUseCols
[
index
]
=
true
;
pBoundInfo
->
pColIndex
[
pBoundInfo
->
numOfBound
]
=
index
;
++
pBoundInfo
->
numOfBound
;
}
pColList
->
orderStatus
=
isOrdered
?
ORDER_STATUS_ORDERED
:
ORDER_STATUS_DISORDERED
;
if
(
!
isOrdered
)
{
pColList
->
colIdxInfo
=
taosMemoryCalloc
(
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
));
if
(
NULL
==
pColList
->
colIdxInfo
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SBoundIdxInfo
*
pColIdx
=
pColList
->
colIdxInfo
;
for
(
col_id_t
i
=
0
;
i
<
pColList
->
numOfBound
;
++
i
)
{
pColIdx
[
i
].
schemaColIdx
=
pColList
->
boundColumns
[
i
];
pColIdx
[
i
].
boundIdx
=
i
;
}
taosSort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
insSchemaIdxCompar
);
for
(
col_id_t
i
=
0
;
i
<
pColList
->
numOfBound
;
++
i
)
{
pColIdx
[
i
].
finalIdx
=
i
;
}
taosSort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
insBoundIdxCompar
);
}
if
(
pColList
->
numOfCols
>
pColList
->
numOfBound
)
{
memset
(
&
pColList
->
boundColumns
[
pColList
->
numOfBound
],
0
,
sizeof
(
col_id_t
)
*
(
pColList
->
numOfCols
-
pColList
->
numOfBound
));
}
end:
taosMemoryFree
(
pUseCols
);
return
TSDB_CODE_SUCCESS
;
return
code
;
}
/**
...
...
@@ -146,7 +97,7 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
* @param msg
* @return int32_t
*/
static
int32_t
smlBuildTagRow
(
SArray
*
cols
,
S
ParsedData
ColInfo
*
tags
,
SSchema
*
pSchema
,
STag
**
ppTag
,
SArray
**
tagName
,
static
int32_t
smlBuildTagRow
(
SArray
*
cols
,
S
Bound
ColInfo
*
tags
,
SSchema
*
pSchema
,
STag
**
ppTag
,
SArray
**
tagName
,
SMsgBuf
*
msg
)
{
SArray
*
pTagArray
=
taosArrayInit
(
tags
->
numOfBound
,
sizeof
(
STagVal
));
if
(
!
pTagArray
)
{
...
...
@@ -159,7 +110,7 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int
i
=
0
;
i
<
tags
->
numOfBound
;
++
i
)
{
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
i
]];
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
pColIndex
[
i
]];
SSmlKv
*
kv
=
taosArrayGetP
(
cols
,
i
);
taosArrayPush
(
*
tagName
,
pTagSchema
->
name
);
...
...
@@ -207,153 +158,165 @@ end:
return
code
;
}
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
int32_t
smlBindData
(
SQuery
*
query
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
const
char
*
sTableName
,
int32_t
sTableNameLen
,
int32_t
ttl
,
char
*
msgBuf
,
int16_t
msgBufLen
)
{
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SSmlExecHandle
*
smlHandle
=
(
SSmlExecHandle
*
)
handle
;
smlDestroyTableHandle
(
&
smlHandle
->
tableExecHandle
);
// free for each table
SSchema
*
pTagsSchema
=
getTableTagSchema
(
pTableMeta
);
insSetBoundColumnInfo
(
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
,
getNumOfTags
(
pTableMeta
));
int
ret
=
smlBoundColumnData
(
tags
,
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
,
true
);
SBoundColInfo
bindTags
=
{
0
};
SVCreateTbReq
*
pCreateTblReq
=
NULL
;
SArray
*
tagName
=
NULL
;
insInitBoundColsInfo
(
getNumOfTags
(
pTableMeta
),
&
bindTags
);
int
ret
=
smlBoundColumnData
(
tags
,
&
bindTags
,
pTagsSchema
,
true
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
buildInvalidOperationMsg
(
&
pBuf
,
"bound tags error"
);
return
ret
;
goto
end
;
}
STag
*
pTag
=
NULL
;
SArray
*
tagName
=
NULL
;
ret
=
smlBuildTagRow
(
tags
,
&
smlHandle
->
tableExecHandle
.
t
ags
,
pTagsSchema
,
&
pTag
,
&
tagName
,
&
pBuf
);
ret
=
smlBuildTagRow
(
tags
,
&
bindT
ags
,
pTagsSchema
,
&
pTag
,
&
tagName
,
&
pBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
tagName
);
return
ret
;
goto
end
;
}
insBuildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
,
tagName
,
pCreateTblReq
=
taosMemoryCalloc
(
1
,
sizeof
(
SVCreateTbReq
));
if
(
NULL
==
pCreateTblReq
)
{
ret
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
insBuildCreateTbReq
(
pCreateTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
,
tagName
,
pTableMeta
->
tableInfo
.
numOfTags
,
ttl
);
taosArrayDestroy
(
tagName
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
stbName
=
taosMemoryMalloc
(
sTableNameLen
+
1
);
memcpy
(
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
stbName
,
sTableName
,
sTableNameLen
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
stbName
[
sTableNameLen
]
=
0
;
pCreateTblReq
->
ctb
.
stbName
=
taosMemoryCalloc
(
1
,
sTableNameLen
+
1
);
memcpy
(
pCreateTblReq
->
ctb
.
stbName
,
sTableName
,
sTableNameLen
);
STableDataBlocks
*
pDataBlock
=
NULL
;
ret
=
insGetDataBlockFromList
(
smlHandle
->
pBlockHash
,
&
pTableMeta
->
uid
,
sizeof
(
pTableMeta
->
uid
),
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
getTableInfo
(
pTableMeta
).
rowSize
,
pTableMeta
,
&
pDataBlock
,
NULL
,
&
smlHandle
->
tableExecHandle
.
createTblReq
);
STableDataCxt
*
pTableCxt
=
NULL
;
ret
=
insGetTableDataCxt
(((
SVnodeModifOpStmt
*
)(
query
->
pRoot
))
->
pTableBlockHashObj
,
&
pTableMeta
->
uid
,
sizeof
(
pTableMeta
->
uid
),
pTableMeta
,
&
pCreateTblReq
,
&
pTableCxt
,
false
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
buildInvalidOperationMsg
(
&
pBuf
,
"
create data block
error"
);
return
ret
;
buildInvalidOperationMsg
(
&
pBuf
,
"
insGetTableDataCxt
error"
);
goto
end
;
}
SSchema
*
pSchema
=
getTableColumnSchema
(
pTableMeta
);
ret
=
smlBoundColumnData
(
colsSchema
,
&
pDataBlock
->
boundColumnInfo
,
pSchema
,
false
);
ret
=
smlBoundColumnData
(
colsSchema
,
&
pTableCxt
->
boundColsInfo
,
pSchema
,
false
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
buildInvalidOperationMsg
(
&
pBuf
,
"bound cols error"
);
return
ret
;
goto
end
;
}
int32_t
extendedRowSize
=
insGetExtendedRowSize
(
pDataBlock
);
SParsedDataColInfo
*
spd
=
&
pDataBlock
->
boundColumnInfo
;
SRowBuilder
*
pBuilder
=
&
pDataBlock
->
rowBuilder
;
SMemParam
param
=
{.
rb
=
pBuilder
};
insInitRowBuilder
(
&
pDataBlock
->
rowBuilder
,
pDataBlock
->
pTableMeta
->
sversion
,
&
pDataBlock
->
boundColumnInfo
);
ret
=
initTableColSubmitData
(
pTableCxt
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
buildInvalidOperationMsg
(
&
pBuf
,
"initTableColSubmitData error"
);
goto
end
;
}
int32_t
rowNum
=
taosArrayGetSize
(
cols
);
if
(
rowNum
<=
0
)
{
return
buildInvalidOperationMsg
(
&
pBuf
,
"cols size <= 0"
);
}
ret
=
insAllocateMemForSize
(
pDataBlock
,
extendedRowSize
*
rowNum
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
buildInvalidOperationMsg
(
&
pBuf
,
"allocate memory error"
);
return
ret
;
ret
=
buildInvalidOperationMsg
(
&
pBuf
,
"cols size <= 0"
);
goto
end
;
}
for
(
int32_t
r
=
0
;
r
<
rowNum
;
++
r
)
{
STSRow
*
row
=
(
STSRow
*
)(
pDataBlock
->
pData
+
pDataBlock
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
void
*
rowData
=
taosArrayGetP
(
cols
,
r
);
size_t
rowDataSize
=
0
;
if
(
format
)
{
rowDataSize
=
taosArrayGetSize
(
rowData
);
}
// 1. set the parsed value from sql string
for
(
int
c
=
0
,
j
=
0
;
c
<
spd
->
numOfBound
;
++
c
)
{
SSchema
*
pColSchema
=
&
pSchema
[
spd
->
boundColumns
[
c
]];
param
.
schema
=
pColSchema
;
insGetSTSRowAppendInfo
(
pBuilder
->
rowType
,
spd
,
c
,
&
param
.
toffset
,
&
param
.
colIdx
);
for
(
int
c
=
0
;
c
<
pTableCxt
->
boundColsInfo
.
numOfBound
;
++
c
)
{
SSchema
*
pColSchema
=
&
pSchema
[
pTableCxt
->
boundColsInfo
.
pColIndex
[
c
]];
SColVal
*
pVal
=
taosArrayGet
(
pTableCxt
->
pValues
,
pTableCxt
->
boundColsInfo
.
pColIndex
[
c
]);
SSmlKv
*
kv
=
NULL
;
if
(
format
)
{
if
(
j
<
rowDataSize
)
{
kv
=
taosArrayGetP
(
rowData
,
j
);
if
(
rowDataSize
!=
spd
->
numOfBound
&&
j
!=
0
&&
(
kv
->
keyLen
!=
strlen
(
pColSchema
->
name
)
||
strncmp
(
kv
->
key
,
pColSchema
->
name
,
kv
->
keyLen
)
!=
0
))
{
kv
=
NULL
;
}
else
{
j
++
;
}
}
}
else
{
if
(
!
format
){
void
**
p
=
taosHashGet
(
rowData
,
pColSchema
->
name
,
strlen
(
pColSchema
->
name
));
if
(
p
)
kv
=
*
p
;
}
if
(
kv
)
{
int32_t
colLen
=
kv
->
length
;
if
(
pColSchema
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
uDebug
(
"SML:data before:%"
PRId64
", precision:%d"
,
kv
->
i
,
pTableMeta
->
tableInfo
.
precision
);
kv
->
i
=
convertTimePrecision
(
kv
->
i
,
TSDB_TIME_PRECISION_NANO
,
pTableMeta
->
tableInfo
.
precision
);
uDebug
(
"SML:data after:%"
PRId64
", precision:%d"
,
kv
->
i
,
pTableMeta
->
tableInfo
.
precision
);
if
(
kv
==
NULL
)
{
continue
;
}
if
(
pColSchema
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
kv
->
i
=
convertTimePrecision
(
kv
->
i
,
TSDB_TIME_PRECISION_NANO
,
pTableMeta
->
tableInfo
.
precision
);
}
if
(
kv
->
type
==
TSDB_DATA_TYPE_NCHAR
){
int32_t
len
=
0
;
char
*
pUcs4
=
taosMemoryCalloc
(
1
,
pColSchema
->
bytes
-
VARSTR_HEADER_SIZE
);
if
(
NULL
==
pUcs4
)
{
ret
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
if
(
IS_VAR_DATA_TYPE
(
kv
->
type
))
{
insMemRowAppend
(
&
pBuf
,
kv
->
value
,
colLen
,
&
param
);
}
else
{
insMemRowAppend
(
&
pBuf
,
&
(
kv
->
value
),
colLen
,
&
param
);
if
(
!
taosMbsToUcs4
(
kv
->
value
,
kv
->
length
,
(
TdUcs4
*
)
pUcs4
,
pColSchema
->
bytes
-
VARSTR_HEADER_SIZE
,
&
len
))
{
if
(
errno
==
E2BIG
)
{
buildInvalidOperationMsg
(
&
pBuf
,
"value too long"
);
ret
=
TSDB_CODE_PAR_VALUE_TOO_LONG
;
goto
end
;
}
ret
=
buildInvalidOperationMsg
(
&
pBuf
,
strerror
(
errno
));
goto
end
;
}
pVal
->
value
.
pData
=
pUcs4
;
pVal
->
value
.
nData
=
len
;
}
else
if
(
kv
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
pVal
->
value
.
nData
=
kv
->
length
;
pVal
->
value
.
pData
=
(
uint8_t
*
)
kv
->
value
;
}
else
{
pBuilder
->
hasNone
=
true
;
}
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
pColSchema
->
colId
)
{
TSKEY
tsKey
=
TD_ROW_KEY
(
row
);
insCheckTimestamp
(
pDataBlock
,
(
const
char
*
)
&
tsKey
);
memcpy
(
&
pVal
->
value
.
val
,
&
(
kv
->
value
),
kv
->
length
);
}
pVal
->
flag
=
CV_FLAG_VALUE
;
}
// set the null value for the columns that do not assign values
if
((
spd
->
numOfBound
<
spd
->
numOfCols
)
&&
TD_IS_TP_ROW
(
row
))
{
pBuilder
->
hasNone
=
true
;
SRow
**
pRow
=
taosArrayReserve
(
pTableCxt
->
pData
->
aRowP
,
1
);
ret
=
tRowBuild
(
pTableCxt
->
pValues
,
pTableCxt
->
pSchema
,
pRow
);
if
(
TSDB_CODE_SUCCESS
!=
ret
)
{
buildInvalidOperationMsg
(
&
pBuf
,
"tRowBuild error"
);
goto
end
;
}
tdSRowEnd
(
pBuilder
);
pDataBlock
->
size
+=
extendedRowSize
;
insCheckTableDataOrder
(
pTableCxt
,
TD_ROW_KEY
(
*
pRow
));
}
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)(
pDataBlock
->
pData
);
return
insSetBlockInfo
(
pBlocks
,
pDataBlock
,
rowNum
,
&
pBuf
);
end:
destroyBoundColInfo
(
&
bindTags
);
taosMemoryFree
(
pCreateTblReq
);
taosArrayDestroy
(
tagName
);
return
ret
;
}
void
*
smlInitHandle
(
SQuery
*
pQuery
)
{
SSmlExecHandle
*
handle
=
taosMemoryCalloc
(
1
,
sizeof
(
SSmlExecHandle
));
if
(
!
handle
)
return
NULL
;
handle
->
pBlockHash
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
handle
->
pQuery
=
pQuery
;
return
handle
;
}
SQuery
*
smlInitHandle
()
{
SQuery
*
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
pQuery
)
{
uError
(
"create pQuery error"
);
return
NULL
;
}
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
pQuery
->
haveResultSet
=
false
;
pQuery
->
msgType
=
TDMT_VND_SUBMIT
;
SVnodeModifOpStmt
*
stmt
=
(
SVnodeModifOpStmt
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
if
(
NULL
==
stmt
)
{
uError
(
"create SVnodeModifOpStmt error"
);
qDestroyQuery
(
pQuery
);
return
NULL
;
}
stmt
->
pVgroupsHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
stmt
->
pTableBlockHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
stmt
->
freeHashFunc
=
insDestroyTableDataCxtHashMap
;
stmt
->
freeArrayFunc
=
insDestroyVgroupDataCxtList
;
void
smlDestroyHandle
(
void
*
pHandle
)
{
if
(
!
pHandle
)
return
;
SSmlExecHandle
*
handle
=
(
SSmlExecHandle
*
)
pHandle
;
insDestroyBlockHashmap
(
handle
->
pBlockHash
);
smlDestroyTableHandle
(
&
handle
->
tableExecHandle
);
taosMemoryFree
(
handle
);
pQuery
->
pRoot
=
(
SNode
*
)
stmt
;
return
pQuery
;
}
int32_t
smlBuildOutput
(
void
*
handle
,
SHashObj
*
pVgHash
)
{
SSmlExecHandle
*
smlHandle
=
(
SSmlExecHandle
*
)
handle
;
return
qBuildStmtOutput
(
smlHandle
->
pQuery
,
pVgHash
,
smlHandle
->
pBlockHash
);
int32_t
smlBuildOutput
(
SQuery
*
handle
,
SHashObj
*
pVgHash
)
{
SVnodeModifOpStmt
*
pStmt
=
(
SVnodeModifOpStmt
*
)(
handle
)
->
pRoot
;
// merge according to vgId
int32_t
code
=
insMergeTableDataCxt
(
pStmt
->
pTableBlockHashObj
,
&
pStmt
->
pVgDataBlocks
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"insMergeTableDataCxt failed"
);
return
code
;
}
code
=
insBuildVgDataBlocks
(
pVgHash
,
pStmt
->
pVgDataBlocks
,
&
pStmt
->
pDataBlocks
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"insBuildVgDataBlocks failed"
);
return
code
;
}
return
code
;
}
source/libs/parser/src/parInsertSql.c
浏览文件 @
c7efb35b
...
...
@@ -1371,8 +1371,6 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod
return
code
;
}
static
void
destroyBoundColInfo
(
SBoundColInfo
*
pInfo
)
{
taosMemoryFreeClear
(
pInfo
->
pColIndex
);
}
static
void
resetEnvPreTable
(
SInsertParseContext
*
pCxt
,
SVnodeModifOpStmt
*
pStmt
)
{
destroyBoundColInfo
(
&
pCxt
->
tags
);
taosMemoryFreeClear
(
pStmt
->
pTableMeta
);
...
...
source/libs/parser/src/parInsertStmt.c
浏览文件 @
c7efb35b
...
...
@@ -22,13 +22,6 @@
#include "ttime.h"
#include "ttypes.h"
typedef
struct
SKvParam
{
int16_t
pos
;
SArray
*
pTagVals
;
SSchema
*
schema
;
char
buf
[
TSDB_MAX_TAGS_LEN
];
}
SKvParam
;
int32_t
qBuildStmtOutput
(
SQuery
*
pQuery
,
SHashObj
*
pVgHash
,
SHashObj
*
pBlockHash
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SArray
*
pVgDataBlocks
=
NULL
;
...
...
source/libs/parser/src/parInsertUtil.c
浏览文件 @
c7efb35b
...
...
@@ -340,8 +340,8 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
void
**
p1
=
taosHashIterate
(
pDataBlockHash
,
NULL
);
while
(
p1
)
{
S
TableDataBlocks
*
pBlocks
=
*
p1
;
insDestroyDataBlock
(
pBlocks
);
S
BoundColInfo
*
pBlocks
=
*
p1
;
destroyBoundColInfo
(
pBlocks
);
p1
=
taosHashIterate
(
pDataBlockHash
,
p1
);
}
...
...
@@ -1083,8 +1083,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
int32_t
insGetTableDataCxt
(
SHashObj
*
pHash
,
void
*
id
,
int32_t
idLen
,
STableMeta
*
pTableMeta
,
SVCreateTbReq
**
pCreateTbReq
,
STableDataCxt
**
pTableCxt
,
bool
colMode
)
{
*
pTableCxt
=
taosHashGet
(
pHash
,
id
,
idLen
);
if
(
NULL
!=
*
pTableCxt
)
{
STableDataCxt
**
tmp
=
(
STableDataCxt
**
)
taosHashGet
(
pHash
,
id
,
idLen
);
if
(
NULL
!=
tmp
)
{
*
pTableCxt
=
*
tmp
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
createTableDataCxt
(
pTableMeta
,
pCreateTbReq
,
pTableCxt
,
colMode
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录