Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c8fe5bc8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c8fe5bc8
编写于
4月 18, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
stmt
上级
93ca1168
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
345 addition
and
315 deletion
+345
-315
include/client/taos.h
include/client/taos.h
+6
-30
include/libs/parser/parser.h
include/libs/parser/parser.h
+14
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+5
-1
source/client/inc/clientStmt.h
source/client/inc/clientStmt.h
+7
-3
source/client/src/clientMain.c
source/client/src/clientMain.c
+11
-13
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+54
-97
source/client/src/tmq.c
source/client/src/tmq.c
+1
-1
source/libs/parser/inc/parInsertData.h
source/libs/parser/inc/parInsertData.h
+2
-1
source/libs/parser/inc/parInt.h
source/libs/parser/inc/parInt.h
+0
-16
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+201
-18
source/libs/parser/src/parInsertData.c
source/libs/parser/src/parInsertData.c
+44
-135
未找到文件。
include/client/taos.h
浏览文件 @
c8fe5bc8
...
...
@@ -92,38 +92,14 @@ typedef struct taosField {
typedef
void
(
*
__taos_async_fn_t
)(
void
*
param
,
TAOS_RES
*
,
int
code
);
typedef
struct
TAOS_BIND
{
int
buffer_type
;
void
*
buffer
;
uintptr_t
buffer_length
;
// unused
uintptr_t
*
length
;
int
*
is_null
;
int
is_unsigned
;
// unused
int
*
error
;
// unused
union
{
int64_t
ts
;
int8_t
b
;
int8_t
v1
;
int16_t
v2
;
int32_t
v4
;
int64_t
v8
;
float
f4
;
double
f8
;
unsigned
char
*
bin
;
char
*
nchar
;
}
u
;
unsigned
int
allocated
;
}
TAOS_BIND
;
typedef
struct
TAOS_MULTI_BIND
{
typedef
struct
TAOS_BIND_v2
{
int
buffer_type
;
void
*
buffer
;
uintptr_t
buffer_length
;
int32_t
*
length
;
char
*
is_null
;
int
num
;
}
TAOS_
MULTI_BIND
;
}
TAOS_
BIND_v2
;
typedef
enum
{
SET_CONF_RET_SUCC
=
0
,
...
...
@@ -154,16 +130,16 @@ const char *taos_data_type(int type);
DLL_EXPORT
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
);
DLL_EXPORT
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
DLL_EXPORT
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
_v2
*
tags
);
DLL_EXPORT
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_set_sub_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
DLL_EXPORT
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
DLL_EXPORT
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
DLL_EXPORT
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
MULTI_BIND
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
MULTI_BIND
*
bind
,
int
colIdx
);
DLL_EXPORT
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
_v2
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
BIND_v2
*
bind
);
DLL_EXPORT
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
BIND_v2
*
bind
,
int
colIdx
);
DLL_EXPORT
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
);
...
...
include/libs/parser/parser.h
浏览文件 @
c8fe5bc8
...
...
@@ -80,6 +80,20 @@ void qDestroyQuery(SQuery* pQueryNode);
int32_t
qExtractResultSchema
(
const
SNode
*
pRoot
,
int32_t
*
numOfCols
,
SSchema
**
pSchema
);
int32_t
qBuildStmtOutput
(
SQuery
*
pQuery
,
SHashObj
*
pVgHash
,
SHashObj
*
pBlockHash
);
void
qResetStmtDataBlock
(
void
*
pBlock
,
bool
freeData
);
int32_t
qCloneStmtDataBlock
(
void
**
pDst
,
void
*
pSrc
);
void
qFreeStmtDataBlock
(
void
*
pDataBlock
);
int32_t
qRebuildStmtDataBlock
(
void
**
pDst
,
void
*
pSrc
);
void
qDestroyStmtDataBlock
(
void
*
pBlock
);
int32_t
qBindStmtColsValue
(
void
*
pDataBlock
,
TAOS_BIND_v2
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
);
int32_t
qBuildStmtColFields
(
void
*
pDataBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
);
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
);
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
SName
*
pName
,
TAOS_BIND_v2
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
);
void
destroyBoundColumnInfo
(
void
*
pBoundInfo
);
int32_t
qCreateSName
(
SName
*
pName
,
const
char
*
pTableName
,
int32_t
acctId
,
char
*
dbName
,
char
*
msgBuf
,
int32_t
msgBufLen
);
#ifdef __cplusplus
}
#endif
...
...
source/client/inc/clientInt.h
浏览文件 @
c8fe5bc8
...
...
@@ -280,7 +280,8 @@ void initMsgHandleFp();
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
bool
topicQuery
,
SQuery
**
pQuery
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
bool
topicQuery
,
SQuery
**
pQuery
,
SStmtCallback
*
pStmtCb
);
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
);
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
);
...
...
@@ -311,6 +312,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq
void
hbMgrInitMqHbRspHandle
();
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
int32_t
code
,
bool
keepQuery
);
#ifdef __cplusplus
}
#endif
...
...
source/client/inc/clientStmt.h
浏览文件 @
c8fe5bc8
...
...
@@ -19,6 +19,9 @@
#ifdef __cplusplus
extern
"C"
{
#endif
#include "catalog.h"
typedef
void
STableDataBlocks
;
typedef
enum
{
STMT_TYPE_INSERT
=
1
,
...
...
@@ -107,15 +110,16 @@ typedef struct STscStmt {
TAOS_STMT
*
stmtInit
(
TAOS
*
taos
);
int
stmtClose
(
TAOS_STMT
*
stmt
);
int
stmtExec
(
TAOS_STMT
*
stmt
);
char
*
stmtErrstr
(
TAOS_STMT
*
stmt
);
c
onst
c
har
*
stmtErrstr
(
TAOS_STMT
*
stmt
);
int
stmtAffectedRows
(
TAOS_STMT
*
stmt
);
int
stmtPrepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
int
stmtSetTbNameTags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
);
int
stmtSetTbName
(
TAOS_STMT
*
stmt
,
const
char
*
tbName
);
int
stmtSetTbTags
(
TAOS_STMT
*
stmt
,
TAOS_BIND_v2
*
tags
);
int
stmtIsInsert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
int
stmtGetParamNum
(
TAOS_STMT
*
stmt
,
int
*
nums
);
int
stmtAddBatch
(
TAOS_STMT
*
stmt
);
TAOS_RES
*
stmtUseResult
(
TAOS_STMT
*
stmt
);
int
stmtBindBatch
(
TAOS_STMT
*
stmt
,
TAOS_
MULTI_BIND
*
bind
);
int
stmtBindBatch
(
TAOS_STMT
*
stmt
,
TAOS_
BIND_v2
*
bind
);
#ifdef __cplusplus
...
...
source/client/src/clientMain.c
浏览文件 @
c8fe5bc8
...
...
@@ -579,7 +579,7 @@ int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
return
stmtPrepare
(
stmt
,
sql
,
length
);
}
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
)
{
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
_v2
*
tags
)
{
if
(
stmt
==
NULL
||
name
==
NULL
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
...
...
@@ -608,25 +608,23 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return
stmtSetTbName
(
stmt
,
name
);
}
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
)
{
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
_v2
*
bind
)
{
if
(
stmt
==
NULL
||
bind
==
NULL
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
TAOS_MULTI_BIND
mbind
=
{
0
};
mbind
.
buffer_type
=
bind
->
buffer_type
;
mbind
.
buffer
=
bind
->
buffer
;
mbind
.
buffer_length
=
bind
->
buffer_length
;
mbind
.
length
=
bind
->
length
;
mbind
.
is_null
=
bind
->
is_null
;
mbind
.
num
=
1
;
if
(
bind
->
num
>
1
)
{
tscError
(
"invalid bind number %d for %s"
,
bind
->
num
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
stmtBindBatch
(
stmt
,
&
m
bind
);
return
stmtBindBatch
(
stmt
,
bind
);
}
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
MULTI_BIND
*
bind
)
{
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
BIND_v2
*
bind
)
{
if
(
stmt
==
NULL
||
bind
==
NULL
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
...
...
@@ -642,7 +640,7 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return
stmtBindBatch
(
stmt
,
bind
);
}
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
MULTI_BIND
*
bind
,
int
colIdx
)
{
int
taos_stmt_bind_single_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_
BIND_v2
*
bind
,
int
colIdx
)
{
return
stmtBindBatch
(
stmt
,
bind
);
/* TODO */
}
...
...
@@ -703,7 +701,7 @@ char *taos_stmt_errstr(TAOS_STMT *stmt) {
return
NULL
;
}
return
stmtErrstr
(
stmt
);
return
(
char
*
)
stmtErrstr
(
stmt
);
}
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
)
{
...
...
source/client/src/clientStmt.c
浏览文件 @
c8fe5bc8
...
...
@@ -48,76 +48,6 @@ int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHa
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtParseSql
(
STscStmt
*
pStmt
)
{
SStmtCallback
stmtCb
=
{
.
pStmt
=
pStmt
,
.
getTbNameFn
=
stmtGetTbName
,
.
setBindInfoFn
=
stmtSetBindInfo
,
.
setExecInfoFn
=
stmtSetExecInfo
,
.
getExecInfoFn
=
stmtGetExecInfo
,
};
STMT_ERR_RET
(
parseSql
(
pStmt
->
exec
.
pRequest
,
false
,
&
pStmt
->
sql
.
pQuery
,
&
stmtCb
));
pStmt
->
bind
.
needParse
=
false
;
switch
(
nodeType
(
pStmt
->
sql
.
pQuery
->
pRoot
))
{
case
QUERY_NODE_VNODE_MODIF_STMT
:
if
(
0
==
pStmt
->
sql
.
type
)
{
pStmt
->
sql
.
type
=
STMT_TYPE_INSERT
;
}
break
;
case
QUERY_NODE_SELECT_STMT
:
pStmt
->
sql
.
type
=
STMT_TYPE_QUERY
;
break
;
default:
tscError
(
"not supported stmt type %d"
,
nodeType
(
pStmt
->
sql
.
pQuery
->
pRoot
));
STMT_ERR_RET
(
TSDB_CODE_TSC_STMT_CLAUSE_ERROR
);
}
STMT_ERR_RET
(
stmtCacheBlock
(
pStmt
));
return
TSDB_CODE_SUCCESS
;
}
void
stmtResetDataBlock
(
STableDataBlocks
*
pBlock
)
{
pBlock
->
pData
=
NULL
;
pBlock
->
ordered
=
true
;
pBlock
->
prevTS
=
INT64_MIN
;
pBlock
->
size
=
sizeof
(
SSubmitBlk
);
pBlock
->
tsSource
=
-
1
;
pBlock
->
numOfTables
=
1
;
pBlock
->
nAllocSize
=
TSDB_PAYLOAD_SIZE
;
pBlock
->
headerSize
=
pBlock
->
size
;
memset
(
&
pBlock
->
rowBuilder
,
0
,
sizeof
(
pBlock
->
rowBuilder
));
}
int32_t
stmtCloneDataBlock
(
STableDataBlocks
**
pDst
,
STableDataBlocks
*
pSrc
)
{
*
pDst
=
(
STableDataBlocks
*
)
taosMemoryMalloc
(
sizeof
(
STableDataBlocks
));
if
(
NULL
==
*
pDst
)
{
STMT_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
memcpy
(
*
pDst
,
pSrc
,
sizeof
(
STableDataBlocks
));
(
*
pDst
)
->
cloned
=
true
;
stmtResetDataBlock
(
*
pDst
);
return
TSDB_CODE_SUCCESS
;
}
void
stmtFreeDataBlock
(
STableDataBlocks
*
pDataBlock
)
{
if
(
pDataBlock
==
NULL
)
{
return
;
}
taosMemoryFreeClear
(
pDataBlock
->
pData
);
taosMemoryFreeClear
(
pDataBlock
);
}
int32_t
stmtCacheBlock
(
STscStmt
*
pStmt
)
{
if
(
pStmt
->
sql
.
type
!=
STMT_TYPE_MULTI_INSERT
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -138,7 +68,7 @@ int32_t stmtCacheBlock(STscStmt *pStmt) {
STableDataBlocks
**
pSrc
=
taosHashGet
(
pStmt
->
exec
.
pBlockHash
,
&
uid
,
sizeof
(
uid
));
STableDataBlocks
*
pDst
=
NULL
;
STMT_ERR_RET
(
stmtClone
DataBlock
(
&
pDst
,
*
pSrc
));
STMT_ERR_RET
(
qCloneStmt
DataBlock
(
&
pDst
,
*
pSrc
));
SStmtTableCache
cache
=
{
.
pDataBlock
=
pDst
,
...
...
@@ -154,6 +84,37 @@ int32_t stmtCacheBlock(STscStmt *pStmt) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtParseSql
(
STscStmt
*
pStmt
)
{
SStmtCallback
stmtCb
=
{
.
pStmt
=
pStmt
,
.
getTbNameFn
=
stmtGetTbName
,
.
setBindInfoFn
=
stmtSetBindInfo
,
.
setExecInfoFn
=
stmtSetExecInfo
,
.
getExecInfoFn
=
stmtGetExecInfo
,
};
STMT_ERR_RET
(
parseSql
(
pStmt
->
exec
.
pRequest
,
false
,
&
pStmt
->
sql
.
pQuery
,
&
stmtCb
));
pStmt
->
bind
.
needParse
=
false
;
switch
(
nodeType
(
pStmt
->
sql
.
pQuery
->
pRoot
))
{
case
QUERY_NODE_VNODE_MODIF_STMT
:
if
(
0
==
pStmt
->
sql
.
type
)
{
pStmt
->
sql
.
type
=
STMT_TYPE_INSERT
;
}
break
;
case
QUERY_NODE_SELECT_STMT
:
pStmt
->
sql
.
type
=
STMT_TYPE_QUERY
;
break
;
default:
tscError
(
"not supported stmt type %d"
,
nodeType
(
pStmt
->
sql
.
pQuery
->
pRoot
));
STMT_ERR_RET
(
TSDB_CODE_TSC_STMT_CLAUSE_ERROR
);
}
STMT_ERR_RET
(
stmtCacheBlock
(
pStmt
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtCleanBindInfo
(
STscStmt
*
pStmt
)
{
pStmt
->
bind
.
tbUid
=
0
;
pStmt
->
bind
.
tbSuid
=
0
;
...
...
@@ -163,6 +124,8 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
taosMemoryFreeClear
(
pStmt
->
bind
.
tbName
);
destroyBoundColumnInfo
(
pStmt
->
bind
.
boundTags
);
taosMemoryFreeClear
(
pStmt
->
bind
.
boundTags
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtCleanExecInfo
(
STscStmt
*
pStmt
,
bool
keepTable
)
{
...
...
@@ -172,16 +135,17 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) {
void
*
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
NULL
);
while
(
pIter
)
{
STableDataBlocks
*
pBlocks
=
*
(
STableDataBlocks
**
)
pIter
;
if
(
keepTable
&&
(
*
(
uint64_t
*
)
taosHashGetKey
(
pIter
,
NULL
)
==
pStmt
->
bind
.
tbUid
))
{
taosMemoryFreeClear
(
pBlocks
->
pData
);
stmtResetDataBlock
(
pBlocks
);
uint64_t
*
key
=
taosHashGetKey
(
pIter
,
NULL
);
if
(
keepTable
&&
(
*
key
==
pStmt
->
bind
.
tbUid
))
{
qResetStmtDataBlock
(
pBlocks
,
true
);
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
pIter
);
continue
;
}
stmtFreeDataBlock
(
pBlocks
);
qFreeStmtDataBlock
(
pBlocks
);
taosHashRemove
(
pStmt
->
exec
.
pBlockHash
,
key
,
sizeof
(
*
key
));
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
pIter
);
}
...
...
@@ -206,8 +170,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
while
(
pIter
)
{
SStmtTableCache
*
pCache
=
*
(
SStmtTableCache
**
)
pIter
;
pCache
->
pDataBlock
->
cloned
=
false
;
destroyDataBlock
(
pCache
->
pDataBlock
);
qDestroyStmtDataBlock
(
pCache
->
pDataBlock
);
destroyBoundColumnInfo
(
pCache
->
boundTags
);
pIter
=
taosHashIterate
(
pStmt
->
sql
.
pTableCache
,
pIter
);
...
...
@@ -269,13 +232,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt
->
bind
.
boundTags
=
pCache
->
boundTags
;
STableDataBlocks
*
pNewBlock
=
NULL
;
STMT_ERR_RET
(
stmtCloneDataBlock
(
&
pNewBlock
,
pCache
->
pDataBlock
));
pNewBlock
->
pData
=
taosMemoryMalloc
(
pNewBlock
->
nAllocSize
);
if
(
NULL
==
pNewBlock
->
pData
)
{
stmtFreeDataBlock
(
pNewBlock
);
STMT_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
STMT_ERR_RET
(
qRebuildStmtDataBlock
(
&
pNewBlock
,
pCache
->
pDataBlock
));
if
(
taosHashPut
(
pStmt
->
exec
.
pBlockHash
,
&
pStmt
->
bind
.
tbUid
,
sizeof
(
pStmt
->
bind
.
tbUid
),
&
pNewBlock
,
POINTER_BYTES
))
{
STMT_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
...
...
@@ -319,7 +276,7 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
STMT_ERR_RET
(
stmtCleanSQLInfo
(
pStmt
));
}
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_PREPARE
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_PREPARE
,
TSDB_CODE_TSC_STMT_API_ERROR
);
pStmt
->
sql
.
sqlStr
=
strndup
(
sql
,
length
);
pStmt
->
sql
.
sqlLen
=
length
;
...
...
@@ -331,7 +288,7 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
int
stmtSetTbName
(
TAOS_STMT
*
stmt
,
const
char
*
tbName
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_SETTBNAME
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_SETTBNAME
,
TSDB_CODE_TSC_STMT_API_ERROR
);
taosMemoryFree
(
pStmt
->
bind
.
tbName
);
...
...
@@ -348,10 +305,10 @@ int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
return
TSDB_CODE_SUCCESS
;
}
int
stmtSetTbTags
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
tags
)
{
int
stmtSetTbTags
(
TAOS_STMT
*
stmt
,
TAOS_BIND
_v2
*
tags
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_SETTBNAME
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_SETTBNAME
,
TSDB_CODE_TSC_STMT_API_ERROR
);
if
(
pStmt
->
bind
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
...
...
@@ -372,7 +329,7 @@ int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND *tags) {
int32_t
stmtFetchTagFields
(
TAOS_STMT
*
stmt
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_FETCH_TAG_FIELDS
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_FETCH_TAG_FIELDS
,
TSDB_CODE_TSC_STMT_API_ERROR
);
if
(
pStmt
->
bind
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
...
...
@@ -394,10 +351,10 @@ int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fiel
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtFetchColFields
(
TAOS_STMT
*
stmt
,
int32_t
*
fieldNum
,
TAOS_FIELD
*
fields
)
{
int32_t
stmtFetchColFields
(
TAOS_STMT
*
stmt
,
int32_t
*
fieldNum
,
TAOS_FIELD
*
*
fields
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_FETCH_COL_FIELDS
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_FETCH_COL_FIELDS
,
TSDB_CODE_TSC_STMT_API_ERROR
);
if
(
pStmt
->
bind
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
...
...
@@ -419,10 +376,10 @@ int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* field
return
TSDB_CODE_SUCCESS
;
}
int
stmtBindBatch
(
TAOS_STMT
*
stmt
,
TAOS_
MULTI_BIND
*
bind
)
{
int
stmtBindBatch
(
TAOS_STMT
*
stmt
,
TAOS_
BIND_v2
*
bind
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_BIND
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_BIND
,
TSDB_CODE_TSC_STMT_API_ERROR
);
if
(
pStmt
->
bind
.
needParse
&&
pStmt
->
sql
.
runTimes
&&
pStmt
->
sql
.
type
>
0
&&
STMT_TYPE_MULTI_INSERT
!=
pStmt
->
sql
.
type
)
{
pStmt
->
bind
.
needParse
=
false
;
...
...
@@ -451,7 +408,7 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
int
stmtAddBatch
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_ADD_BATCH
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_ADD_BATCH
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_ERR_RET
(
stmtCacheBlock
(
pStmt
));
...
...
@@ -462,7 +419,7 @@ int stmtExec(TAOS_STMT *stmt) {
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
int32_t
code
=
0
;
STMT_SWITCH_STATUS
(
s
tmt
,
STMT_EXECUTE
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_SWITCH_STATUS
(
pS
tmt
,
STMT_EXECUTE
,
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_ERR_RET
(
qBuildStmtOutput
(
pStmt
->
sql
.
pQuery
,
pStmt
->
exec
.
pVgHash
,
pStmt
->
exec
.
pBlockHash
));
...
...
@@ -484,7 +441,7 @@ int stmtClose(TAOS_STMT *stmt) {
return
TSDB_CODE_SUCCESS
;
}
char
*
stmtErrstr
(
TAOS_STMT
*
stmt
)
{
c
onst
c
har
*
stmtErrstr
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
stmt
==
NULL
)
{
...
...
source/client/src/tmq.c
浏览文件 @
c8fe5bc8
...
...
@@ -582,7 +582,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
int32_t
code
=
0
;
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
false
,
&
pQueryNode
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
false
,
&
pQueryNode
,
NULL
),
_return
);
// todo check for invalid sql statement and return with error code
...
...
source/libs/parser/inc/parInsertData.h
浏览文件 @
c8fe5bc8
...
...
@@ -131,7 +131,6 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
int32_t
schemaIdxCompar
(
const
void
*
lhs
,
const
void
*
rhs
);
int32_t
boundIdxCompar
(
const
void
*
lhs
,
const
void
*
rhs
);
void
setBoundColumnInfo
(
SParsedDataColInfo
*
pColList
,
SSchema
*
pSchema
,
col_id_t
numOfCols
);
void
destroyBoundColumnInfo
(
void
*
pBoundInfo
);
void
destroyBlockArrayList
(
SArray
*
pDataBlockList
);
void
destroyBlockHashmap
(
SHashObj
*
pDataBlockHash
);
int
initRowBuilder
(
SRowBuilder
*
pBuilder
,
int16_t
schemaVer
,
SParsedDataColInfo
*
pColInfo
);
...
...
@@ -139,5 +138,7 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t
int32_t
getDataBlockFromList
(
SHashObj
*
pHashList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
const
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
,
SVCreateTbReq
*
pCreateTbReq
);
int32_t
mergeTableDataBlocks
(
SHashObj
*
pHashObj
,
uint8_t
payloadType
,
SArray
**
pVgDataBlocks
);
int32_t
buildCreateTbMsg
(
STableDataBlocks
*
pBlocks
,
SVCreateTbReq
*
pCreateTbReq
);
int32_t
allocateMemForSize
(
STableDataBlocks
*
pDataBlock
,
int32_t
allSize
);
#endif // TDENGINE_DATABLOCKMGT_H
source/libs/parser/inc/parInt.h
浏览文件 @
c8fe5bc8
...
...
@@ -24,27 +24,11 @@ extern "C" {
#include "parToken.h"
#include "parUtil.h"
typedef
struct
SKvParam
{
SKVRowBuilder
*
builder
;
SSchema
*
schema
;
char
buf
[
TSDB_MAX_TAGS_LEN
];
}
SKvParam
;
#define CHECK_CODE(expr) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
return code; \
} \
} while (0)
int32_t
parseInsertSql
(
SParseContext
*
pContext
,
SQuery
**
pQuery
);
int32_t
parse
(
SParseContext
*
pParseCxt
,
SQuery
**
pQuery
);
int32_t
translate
(
SParseContext
*
pParseCxt
,
SQuery
*
pQuery
);
int32_t
extractResultSchema
(
const
SNode
*
pRoot
,
int32_t
*
numOfCols
,
SSchema
**
pSchema
);
int32_t
calculateConstant
(
SParseContext
*
pParseCxt
,
SQuery
*
pQuery
);
int32_t
createSName
(
SName
*
pName
,
SToken
*
pTableName
,
int32_t
acctId
,
const
char
*
dbName
,
SMsgBuf
*
pMsgBuf
);
int32_t
KvRowAppend
(
SMsgBuf
*
pMsgBuf
,
const
void
*
value
,
int32_t
len
,
void
*
param
);
#ifdef __cplusplus
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
c8fe5bc8
...
...
@@ -62,6 +62,29 @@ typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void *value, int32_t
static
uint8_t
TRUE_VALUE
=
(
uint8_t
)
TSDB_TRUE
;
static
uint8_t
FALSE_VALUE
=
(
uint8_t
)
TSDB_FALSE
;
typedef
struct
SKvParam
{
SKVRowBuilder
*
builder
;
SSchema
*
schema
;
char
buf
[
TSDB_MAX_TAGS_LEN
];
}
SKvParam
;
typedef
struct
SMemParam
{
SRowBuilder
*
rb
;
SSchema
*
schema
;
int32_t
toffset
;
col_id_t
colIdx
;
}
SMemParam
;
#define CHECK_CODE(expr) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
return code; \
} \
} while (0)
static
int32_t
skipInsertInto
(
SInsertParseContext
*
pCxt
)
{
SToken
sToken
;
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
...
...
@@ -156,7 +179,7 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD
}
int32_t
createSName
(
SName
*
pName
,
SToken
*
pTableName
,
int32_t
acctId
,
const
char
*
dbName
,
SMsgBuf
*
pMsgBuf
)
{
static
int32_t
createSName
(
SName
*
pName
,
SToken
*
pTableName
,
int32_t
acctId
,
const
char
*
dbName
,
SMsgBuf
*
pMsgBuf
)
{
const
char
*
msg1
=
"name too long"
;
const
char
*
msg2
=
"invalid database name"
;
const
char
*
msg3
=
"db is not specified"
;
...
...
@@ -294,7 +317,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
checkTimestamp
(
STableDataBlocks
*
pDataBlocks
,
const
char
*
start
)
{
int32_t
checkTimestamp
(
STableDataBlocks
*
pDataBlocks
,
const
char
*
start
)
{
// once the data block is disordered, we do NOT keep previous timestamp any more
if
(
!
pDataBlocks
->
ordered
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -600,13 +623,6 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return
TSDB_CODE_FAILED
;
}
typedef
struct
SMemParam
{
SRowBuilder
*
rb
;
SSchema
*
schema
;
int32_t
toffset
;
col_id_t
colIdx
;
}
SMemParam
;
static
FORCE_INLINE
int32_t
MemRowAppend
(
SMsgBuf
*
pMsgBuf
,
const
void
*
value
,
int32_t
len
,
void
*
param
)
{
SMemParam
*
pa
=
(
SMemParam
*
)
param
;
SRowBuilder
*
rb
=
pa
->
rb
;
...
...
@@ -713,7 +729,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return
TSDB_CODE_SUCCESS
;
}
int32_t
KvRowAppend
(
SMsgBuf
*
pMsgBuf
,
const
void
*
value
,
int32_t
len
,
void
*
param
)
{
static
int32_t
KvRowAppend
(
SMsgBuf
*
pMsgBuf
,
const
void
*
value
,
int32_t
len
,
void
*
param
)
{
SKvParam
*
pa
=
(
SKvParam
*
)
param
;
int8_t
type
=
pa
->
schema
->
type
;
...
...
@@ -979,7 +995,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
return
TSDB_CODE_SUCCESS
;
}
static
void
destroyCreateSubTbReq
(
SVCreateTbReq
*
pReq
)
{
void
destroyCreateSubTbReq
(
SVCreateTbReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
dbFName
);
taosMemoryFreeClear
(
pReq
->
name
);
taosMemoryFreeClear
(
pReq
->
ctbCfg
.
pTag
);
...
...
@@ -1111,7 +1127,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
(
*
pCxt
->
pStmtCb
->
setBindInfoFn
)(
pCxt
->
pTableMeta
,
tags
);
(
*
pCxt
->
pStmtCb
->
setBindInfoFn
)(
pCxt
->
p
StmtCb
->
pStmt
,
pCxt
->
p
TableMeta
,
tags
);
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
(
*
pCxt
->
pStmtCb
->
setExecInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
...
...
@@ -1149,8 +1165,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if
(
pContext
->
pStmtCb
&&
*
pQuery
)
{
(
*
pContext
->
pStmtCb
->
getExecInfoFn
)(
pContext
->
pStmtCb
->
pStmt
,
&
context
.
pVgroupsHashObj
,
&
context
.
pTableBlockHashObj
);
}
else
{
context
.
pVgroupsHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
false
)
,
context
.
pTableBlockHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
)
,
context
.
pVgroupsHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
false
)
;
context
.
pTableBlockHashObj
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
)
;
}
if
(
NULL
==
context
.
pVgroupsHashObj
||
NULL
==
context
.
pTableBlockHashObj
||
...
...
@@ -1184,7 +1200,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
}
int32_t
qCreateSName
(
SName
*
pName
,
char
*
pTableName
,
int32_t
acctId
,
char
*
dbName
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
int32_t
qCreateSName
(
SName
*
pName
,
c
onst
c
har
*
pTableName
,
int32_t
acctId
,
char
*
dbName
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
SMsgBuf
msg
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SToken
sToken
;
int32_t
code
=
0
;
...
...
@@ -1203,7 +1219,7 @@ int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbNam
NEXT_TOKEN
(
pTableName
,
sToken
);
if
(
S
Token
.
n
>
0
)
{
if
(
s
Token
.
n
>
0
)
{
return
buildInvalidOperationMsg
(
&
msg
,
"table name format is wrong"
);
}
...
...
@@ -1217,12 +1233,12 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
SInsertParseContext
insertCtx
=
{
.
pVgroupsHashObj
=
pVgHash
,
.
pTableBlockHashObj
=
pBlockHash
,
.
pOutput
=
pQuery
->
pRoot
.
pOutput
=
(
SVnodeModifOpStmt
*
)
pQuery
->
pRoot
,
};
// merge according to vgId
if
(
taosHashGetSize
(
insertCtx
.
pTableBlockHashObj
)
>
0
)
{
CHECK_CODE
_GOTO
(
mergeTableDataBlocks
(
insertCtx
.
pTableBlockHashObj
,
modifyNode
->
payloadType
,
&
insertCtx
.
pVgDataBlocks
),
_return
);
CHECK_CODE
(
mergeTableDataBlocks
(
insertCtx
.
pTableBlockHashObj
,
modifyNode
->
payloadType
,
&
insertCtx
.
pVgDataBlocks
)
);
}
CHECK_CODE
(
buildOutput
(
&
insertCtx
));
...
...
@@ -1230,4 +1246,171 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
SName
*
pName
,
TAOS_BIND_v2
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
){
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SParsedDataColInfo
*
tags
=
(
SParsedDataColInfo
*
)
boundTags
;
if
(
NULL
==
tags
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
}
SKVRowBuilder
tagBuilder
;
if
(
tdInitKVRowBuilder
(
&
tagBuilder
)
<
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SSchema
*
pSchema
=
getTableTagSchema
(
pDataBlock
->
pTableMeta
);
SKvParam
param
=
{.
builder
=
&
tagBuilder
};
for
(
int
c
=
0
;
c
<
tags
->
numOfBound
;
++
c
)
{
if
(
bind
[
c
].
is_null
&&
bind
[
c
].
is_null
[
0
])
{
KvRowAppend
(
&
pBuf
,
NULL
,
0
,
&
param
);
continue
;
}
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
c
]
-
1
];
// colId starts with 1
param
.
schema
=
pTagSchema
;
int32_t
colLen
=
pTagSchema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
colLen
=
bind
[
c
].
length
[
0
];
}
CHECK_CODE
(
KvRowAppend
(
&
pBuf
,
(
char
*
)
bind
[
c
].
buffer
,
colLen
,
&
param
));
}
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
tagBuilder
);
if
(
NULL
==
row
)
{
tdDestroyKVRowBuilder
(
&
tagBuilder
);
return
buildInvalidOperationMsg
(
&
pBuf
,
"tag value expected"
);
}
tdSortKVRowByColIdx
(
row
);
SVCreateTbReq
tbReq
=
{
0
};
CHECK_CODE
(
buildCreateTbReq
(
&
tbReq
,
pName
,
row
,
suid
));
CHECK_CODE
(
buildCreateTbMsg
(
pDataBlock
,
&
tbReq
));
destroyCreateSubTbReq
(
&
tbReq
);
tdDestroyKVRowBuilder
(
&
tagBuilder
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBindStmtColsValue
(
void
*
pBlock
,
TAOS_BIND_v2
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SSchema
*
pSchema
=
getTableColumnSchema
(
pDataBlock
->
pTableMeta
);
int32_t
extendedRowSize
=
getExtendedRowSize
(
pDataBlock
);
SParsedDataColInfo
*
spd
=
&
pDataBlock
->
boundColumnInfo
;
SRowBuilder
*
pBuilder
=
&
pDataBlock
->
rowBuilder
;
SMemParam
param
=
{.
rb
=
pBuilder
};
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
CHECK_CODE
(
allocateMemForSize
(
pDataBlock
,
extendedRowSize
*
bind
->
num
));
for
(
int32_t
r
=
0
;
r
<
bind
->
num
;
++
r
)
{
STSRow
*
row
=
(
STSRow
*
)(
pDataBlock
->
pData
+
pDataBlock
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
// 1. set the parsed value from sql string
for
(
int
c
=
0
;
c
<
spd
->
numOfBound
;
++
c
)
{
SSchema
*
pColSchema
=
&
pSchema
[
spd
->
boundColumns
[
c
]
-
1
];
param
.
schema
=
pColSchema
;
getSTSRowAppendInfo
(
pBuilder
->
rowType
,
spd
,
c
,
&
param
.
toffset
,
&
param
.
colIdx
);
if
(
bind
[
c
].
is_null
&&
bind
[
c
].
is_null
[
r
])
{
CHECK_CODE
(
MemRowAppend
(
&
pBuf
,
NULL
,
0
,
&
param
));
}
else
{
int32_t
colLen
=
pColSchema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
pColSchema
->
type
))
{
colLen
=
bind
[
c
].
length
[
r
];
}
CHECK_CODE
(
MemRowAppend
(
&
pBuf
,
(
char
*
)
bind
[
c
].
buffer
+
bind
[
c
].
buffer_length
*
r
,
colLen
,
&
param
));
}
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
pColSchema
->
colId
)
{
TSKEY
tsKey
=
TD_ROW_KEY
(
row
);
checkTimestamp
(
pDataBlock
,
(
const
char
*
)
&
tsKey
);
}
}
// set the null value for the columns that do not assign values
if
((
spd
->
numOfBound
<
spd
->
numOfCols
)
&&
TD_IS_TP_ROW
(
row
))
{
for
(
int32_t
i
=
0
;
i
<
spd
->
numOfCols
;
++
i
)
{
if
(
spd
->
cols
[
i
].
valStat
==
VAL_STAT_NONE
)
{
// the primary TS key is not VAL_STAT_NONE
tdAppendColValToTpRow
(
pBuilder
,
TD_VTYPE_NONE
,
getNullValue
(
pSchema
[
i
].
type
),
true
,
pSchema
[
i
].
type
,
i
,
spd
->
cols
[
i
].
toffset
);
}
}
}
pDataBlock
->
size
+=
extendedRowSize
;
}
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)(
pDataBlock
->
pData
);
if
(
TSDB_CODE_SUCCESS
!=
setBlockInfo
(
pBlocks
,
pDataBlock
,
bind
->
num
))
{
return
buildInvalidOperationMsg
(
&
pBuf
,
"too many rows in sql, total number of rows should be less than 32767"
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildBoundFields
(
SParsedDataColInfo
*
boundInfo
,
SSchema
*
pSchema
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
*
fields
=
taosMemoryCalloc
(
boundInfo
->
numOfBound
,
sizeof
(
TAOS_FIELD
));
if
(
NULL
==
*
fields
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
boundInfo
->
numOfBound
;
++
i
)
{
SSchema
*
pTagSchema
=
&
pSchema
[
boundInfo
->
boundColumns
[
i
]
-
1
];
strcpy
((
*
fields
)[
i
].
name
,
pTagSchema
->
name
);
(
*
fields
)[
i
].
type
=
pTagSchema
->
type
;
(
*
fields
)[
i
].
bytes
=
pTagSchema
->
bytes
;
}
*
fieldNum
=
boundInfo
->
numOfBound
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SParsedDataColInfo
*
tags
=
(
SParsedDataColInfo
*
)
boundTags
;
if
(
NULL
==
tags
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
}
SSchema
*
pSchema
=
getTableTagSchema
(
pDataBlock
->
pTableMeta
);
if
(
tags
->
numOfBound
<=
0
)
{
*
fieldNum
=
0
;
*
fields
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
CHECK_CODE
(
buildBoundFields
(
tags
,
pSchema
,
fieldNum
,
fields
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBuildStmtColFields
(
void
*
pBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SSchema
*
pSchema
=
getTableColumnSchema
(
pDataBlock
->
pTableMeta
);
if
(
pDataBlock
->
boundColumnInfo
.
numOfBound
<=
0
)
{
*
fieldNum
=
0
;
*
fields
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
CHECK_CODE
(
buildBoundFields
(
&
pDataBlock
->
boundColumnInfo
,
pSchema
,
fieldNum
,
fields
));
return
TSDB_CODE_SUCCESS
;
}
source/libs/parser/src/parInsertData.c
浏览文件 @
c8fe5bc8
...
...
@@ -156,7 +156,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildCreateTbMsg
(
STableDataBlocks
*
pBlocks
,
SVCreateTbReq
*
pCreateTbReq
)
{
int32_t
buildCreateTbMsg
(
STableDataBlocks
*
pBlocks
,
SVCreateTbReq
*
pCreateTbReq
)
{
int32_t
len
=
tSerializeSVCreateTbReq
(
NULL
,
pCreateTbReq
);
if
(
pBlocks
->
nAllocSize
-
pBlocks
->
size
<
len
)
{
pBlocks
->
nAllocSize
+=
len
+
pBlocks
->
rowSize
;
...
...
@@ -571,166 +571,75 @@ int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBindStmtTagsValue
(
STableDataBlocks
*
pDataBlock
,
void
*
boundTags
,
int64_t
suid
,
SName
*
pName
,
TAOS_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
){
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SParsedDataColInfo
*
tags
=
(
SParsedDataColInfo
*
)
boundTags
;
if
(
NULL
==
tags
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
}
SKVRowBuilder
tagBuilder
;
if
(
tdInitKVRowBuilder
(
&
tagBuilder
)
<
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SSchema
*
pSchema
=
getTableTagSchema
(
pDataBlock
->
pTableMeta
);
SKvParam
param
=
{.
builder
=
&
tagBuilder
};
for
(
int
c
=
0
;
c
<
tags
->
numOfBound
;
++
c
)
{
if
(
bind
[
c
].
is_null
&&
bind
[
c
].
is_null
[
0
])
{
KvRowAppend
(
&
pBuf
,
NULL
,
0
,
&
param
);
continue
;
}
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
c
]
-
1
];
// colId starts with 1
param
.
schema
=
pTagSchema
;
void
qResetStmtDataBlock
(
void
*
block
,
bool
freeData
)
{
STableDataBlocks
*
pBlock
=
(
STableDataBlocks
*
)
block
;
int32_t
colLen
=
pTagSchema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
colLen
=
bind
[
c
].
length
[
0
];
}
CHECK_CODE
(
KvRowAppend
(
&
pBuf
,
(
char
*
)
bind
[
c
].
buffer
,
colLen
,
&
param
));
}
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
tagBuilder
);
if
(
NULL
==
row
)
{
tdDestroyKVRowBuilder
(
&
tagBuilder
);
return
buildInvalidOperationMsg
(
&
pBuf
,
"tag value expected"
);
if
(
freeData
)
{
taosMemoryFree
(
pBlock
->
pData
);
}
tdSortKVRowByColIdx
(
row
);
SVCreateTbReq
tbReq
=
{
0
};
CHECK_CODE
(
buildCreateTbReq
(
&
tbReq
,
pName
,
row
,
suid
));
CHECK_CODE
(
buildCreateTbMsg
(
pDataBlock
,
&
tbReq
));
destroyCreateSubTbReq
(
&
tbReq
);
tdDestroyKVRowBuilder
(
&
tagBuilder
);
return
TSDB_CODE_SUCCESS
;
pBlock
->
pData
=
NULL
;
pBlock
->
ordered
=
true
;
pBlock
->
prevTS
=
INT64_MIN
;
pBlock
->
size
=
sizeof
(
SSubmitBlk
);
pBlock
->
tsSource
=
-
1
;
pBlock
->
numOfTables
=
1
;
pBlock
->
nAllocSize
=
TSDB_PAYLOAD_SIZE
;
pBlock
->
headerSize
=
pBlock
->
size
;
memset
(
&
pBlock
->
rowBuilder
,
0
,
sizeof
(
pBlock
->
rowBuilder
));
}
int32_t
qBindStmtColsValue
(
STableDataBlocks
*
pDataBlock
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
SSchema
*
pSchema
=
getTableColumnSchema
(
pDataBlock
->
pTableMeta
);
int32_t
extendedRowSize
=
getExtendedRowSize
(
pDataBlock
);
SParsedDataColInfo
*
spd
=
&
pDataBlock
->
boundColumnInfo
;
SRowBuilder
*
pBuilder
=
&
pDataBlock
->
rowBuilder
;
SMemParam
param
=
{.
rb
=
pBuilder
};
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
CHECK_CODE
(
allocateMemForSize
(
pDataBlock
,
extendedRowSize
*
bind
->
num
);
for
(
int32_t
r
=
0
;
r
<
bind
->
num
;
++
r
)
{
STSRow
*
row
=
(
STSRow
*
)(
pDataBlock
->
pData
+
pDataBlock
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
// 1. set the parsed value from sql string
for
(
int
c
=
0
;
c
<
spd
->
numOfBound
;
++
c
)
{
SSchema
*
pColSchema
=
&
pSchema
[
spd
->
boundColumns
[
c
]
-
1
];
param
.
schema
=
pColSchema
;
getSTSRowAppendInfo
(
pBuilder
->
rowType
,
spd
,
c
,
&
param
.
toffset
,
&
param
.
colIdx
);
if
(
bind
[
c
].
is_null
&&
bind
[
c
].
is_null
[
r
])
{
CHECK_CODE
(
MemRowAppend
(
&
pBuf
,
NULL
,
0
,
&
param
));
}
else
{
int32_t
colLen
=
pColSchema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
pColSchema
->
type
))
{
colLen
=
bind
[
c
].
length
[
r
];
}
CHECK_CODE
(
MemRowAppend
(
&
pBuf
,
(
char
*
)
bind
[
c
].
buffer
+
bind
[
c
].
buffer_length
*
r
,
colLen
,
&
param
));
}
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
pColSchema
->
colId
)
{
TSKEY
tsKey
=
TD_ROW_KEY
(
row
);
checkTimestamp
(
pDataBlock
,
(
const
char
*
)
&
tsKey
);
}
}
// set the null value for the columns that do not assign values
if
((
spd
->
numOfBound
<
spd
->
numOfCols
)
&&
TD_IS_TP_ROW
(
row
))
{
for
(
int32_t
i
=
0
;
i
<
spd
->
numOfCols
;
++
i
)
{
if
(
spd
->
cols
[
i
].
valStat
==
VAL_STAT_NONE
)
{
// the primary TS key is not VAL_STAT_NONE
tdAppendColValToTpRow
(
pBuilder
,
TD_VTYPE_NONE
,
getNullValue
(
pSchema
[
i
].
type
),
true
,
pSchema
[
i
].
type
,
i
,
spd
->
cols
[
i
].
toffset
);
}
}
}
pDataBlock
->
size
+=
extendedRowSize
;
int32_t
qCloneStmtDataBlock
(
void
**
pDst
,
void
*
pSrc
)
{
*
pDst
=
taosMemoryMalloc
(
sizeof
(
STableDataBlocks
));
if
(
NULL
==
*
pDst
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)(
pDataBlock
->
pData
);
if
(
TSDB_CODE_SUCCESS
!=
setBlockInfo
(
pBlocks
,
pDataBlock
,
bind
->
num
))
{
return
buildInvalidOperationMsg
(
&
pBuf
,
"too many rows in sql, total number of rows should be less than 32767"
);
}
memcpy
(
*
pDst
,
pSrc
,
sizeof
(
STableDataBlocks
)
);
((
STableDataBlocks
*
)(
*
pDst
))
->
cloned
=
true
;
qResetStmtDataBlock
(
*
pDst
,
false
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildBoundFields
(
SParsedDataColInfo
*
boundInfo
,
SSchema
*
pSchema
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
*
fields
=
taosMemoryCalloc
(
boundInfo
->
numOfBound
,
sizeof
(
TAOS_FIELD
));
if
(
NULL
==
*
fields
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
qRebuildStmtDataBlock
(
void
**
pDst
,
void
*
pSrc
)
{
int32_t
code
=
qCloneStmtDataBlock
(
pDst
,
pSrc
);
if
(
code
)
{
return
code
;
}
for
(
int32_t
i
=
0
;
i
<
boundInfo
->
numOfBound
;
++
i
)
{
SSchema
*
pTagSchema
=
&
pSchema
[
boundInfo
->
boundColumns
[
i
]
-
1
]
;
strcpy
((
*
fields
)[
i
].
name
,
pTagSchema
->
name
);
(
*
fields
)[
i
].
type
=
pTagSchema
->
type
;
(
*
fields
)[
i
].
bytes
=
pTagSchema
->
bytes
;
STableDataBlocks
*
pBlock
=
(
STableDataBlocks
*
)
*
pDst
;
pBlock
->
pData
=
taosMemoryMalloc
(
pBlock
->
nAllocSize
)
;
if
(
NULL
==
pBlock
->
pData
)
{
qFreeStmtDataBlock
(
pBlock
)
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
fieldNum
=
boundInfo
->
numOfBound
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBuildStmtTagFields
(
STableDataBlocks
*
pDataBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
SParsedDataColInfo
*
tags
=
(
SParsedDataColInfo
*
)
boundTags
;
if
(
NULL
==
tags
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
}
SSchema
*
pSchema
=
getTableTagSchema
(
pDataBlock
->
pTableMeta
);
if
(
tags
->
numOfBound
<=
0
)
{
*
fieldNum
=
0
;
*
fields
=
NULL
;
return
TSDB_CODE_SUCCESS
;
void
qFreeStmtDataBlock
(
void
*
pDataBlock
)
{
if
(
pDataBlock
==
NULL
)
{
return
;
}
CHECK_CODE
(
buildBoundFields
(
tags
,
pSchema
,
fieldNum
,
fields
));
return
TSDB_CODE_SUCCESS
;
taosMemoryFreeClear
(((
STableDataBlocks
*
)
pDataBlock
)
->
pData
);
taosMemoryFreeClear
(
pDataBlock
);
}
int32_t
qBuildStmtColFields
(
STableDataBlocks
*
pDataBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
SSchema
*
pSchema
=
getTableColumnSchema
(
pDataBlock
->
pTableMeta
);
if
(
pDataBlock
->
boundColumnInfo
.
numOfBound
<=
0
)
{
*
fieldNum
=
0
;
*
fields
=
NULL
;
return
TSDB_CODE_SUCCESS
;
void
qDestroyStmtDataBlock
(
void
*
pBlock
)
{
if
(
pBlock
==
NULL
)
{
return
;
}
CHECK_CODE
(
buildBoundFields
(
&
pDataBlock
->
boundColumnInfo
,
pSchema
,
fieldNum
,
fields
));
return
TSDB_CODE_SUCCESS
;
}
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
pDataBlock
->
cloned
=
false
;
destroyDataBlock
(
pDataBlock
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录