Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
6df8ed9f
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6df8ed9f
编写于
5月 07, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support bind multiple tables
上级
a080f2c9
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
448 addition
and
67 deletion
+448
-67
src/client/inc/tscSubquery.h
src/client/inc/tscSubquery.h
+2
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-1
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+5
-2
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+358
-16
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+52
-48
src/inc/taos.h
src/inc/taos.h
+2
-0
src/query/src/qTokenizer.c
src/query/src/qTokenizer.c
+22
-0
src/util/inc/tstoken.h
src/util/inc/tstoken.h
+3
-0
tests/examples/c/makefile
tests/examples/c/makefile
+2
-0
未找到文件。
src/client/inc/tscSubquery.h
浏览文件 @
6df8ed9f
...
...
@@ -48,6 +48,8 @@ void tscLockByThread(int64_t *lockedBy);
void
tscUnlockByThread
(
int64_t
*
lockedBy
);
int
tsInsertInitialCheck
(
SSqlObj
*
pSql
);
#ifdef __cplusplus
}
#endif
...
...
src/client/inc/tsclient.h
浏览文件 @
6df8ed9f
...
...
@@ -372,6 +372,7 @@ typedef struct SSqlObj {
tsem_t
rspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
bool
isBind
;
SSubqueryState
subState
;
struct
SSqlObj
**
pSubs
;
...
...
src/client/src/tscParseInsert.c
浏览文件 @
6df8ed9f
...
...
@@ -386,7 +386,7 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
* The server time/client time should not be mixed up in one sql string
* Do not employ sort operation is not involved if server time is used.
*/
static
int32_t
tsCheckTimestamp
(
STableDataBlocks
*
pDataBlocks
,
const
char
*
start
)
{
int32_t
tsCheckTimestamp
(
STableDataBlocks
*
pDataBlocks
,
const
char
*
start
)
{
// once the data block is disordered, we do NOT keep previous timestamp any more
if
(
!
pDataBlocks
->
ordered
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -411,6 +411,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
if
(
k
<=
pDataBlocks
->
prevTS
&&
(
pDataBlocks
->
tsSource
==
TSDB_USE_CLI_TS
))
{
pDataBlocks
->
ordered
=
false
;
tscWarn
(
"NOT ordered input timestamp"
);
}
pDataBlocks
->
prevTS
=
k
;
...
...
@@ -693,6 +694,8 @@ void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
pBlocks
->
numOfRows
=
i
+
1
;
dataBuf
->
size
=
sizeof
(
SSubmitBlk
)
+
dataBuf
->
rowSize
*
pBlocks
->
numOfRows
;
}
dataBuf
->
prevTS
=
INT64_MIN
;
}
static
int32_t
doParseInsertStatement
(
SSqlCmd
*
pCmd
,
char
**
str
,
STableDataBlocks
*
dataBuf
,
int32_t
*
totalNum
)
{
...
...
@@ -1262,7 +1265,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto
_clean
;
}
if
(
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
>
0
)
{
// merge according to vgId
if
(
(
pCmd
->
insertType
!=
TSDB_QUERY_TYPE_STMT_INSERT
)
&&
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
>
0
)
{
// merge according to vgId
if
((
code
=
tscMergeTableDataBlocks
(
pSql
,
true
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
...
...
src/client/src/tscPrepare.c
浏览文件 @
6df8ed9f
...
...
@@ -24,6 +24,7 @@
#include "tscSubquery.h"
int
tsParseInsertSql
(
SSqlObj
*
pSql
);
int32_t
tsCheckTimestamp
(
STableDataBlocks
*
pDataBlocks
,
const
char
*
start
);
////////////////////////////////////////////////////////////////////////////////
// functions for normal statement preparation
...
...
@@ -43,10 +44,21 @@ typedef struct SNormalStmt {
tVariant
*
params
;
}
SNormalStmt
;
typedef
struct
SMultiTbStmt
{
bool
nameSet
;
uint64_t
currentUid
;
uint32_t
tbNum
;
SStrToken
tbname
;
SHashObj
*
pTableHash
;
}
SMultiTbStmt
;
typedef
struct
STscStmt
{
bool
isInsert
;
bool
multiTbInsert
;
int64_t
prevTs
;
STscObj
*
taos
;
SSqlObj
*
pSql
;
SMultiTbStmt
mtb
;
SNormalStmt
normal
;
}
STscStmt
;
...
...
@@ -255,7 +267,7 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static
int
doBindParam
(
char
*
data
,
SParamInfo
*
param
,
TAOS_BIND
*
bind
)
{
static
int
doBindParam
(
STableDataBlocks
*
pBlock
,
char
*
data
,
SParamInfo
*
param
,
TAOS_BIND
*
bind
)
{
if
(
bind
->
is_null
!=
NULL
&&
*
(
bind
->
is_null
))
{
setNull
(
data
+
param
->
offset
,
param
->
type
,
param
->
bytes
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -690,12 +702,36 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
}
memcpy
(
data
+
param
->
offset
,
bind
->
buffer
,
size
);
if
(
param
->
offset
==
0
)
{
if
(
tsCheckTimestamp
(
pBlock
,
data
+
param
->
offset
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"invalid timestamp"
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int
insertStmtBindParam
(
STscStmt
*
stmt
,
TAOS_BIND
*
bind
)
{
SSqlCmd
*
pCmd
=
&
stmt
->
pSql
->
cmd
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STableDataBlocks
*
pBlock
=
NULL
;
if
(
pStmt
->
multiTbInsert
)
{
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
tscError
(
"Table block hash list is empty"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
pTableBlockHashList
,
(
const
char
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
));
if
(
t1
==
NULL
)
{
tscError
(
"no table data block in hash list, uid:%"
PRId64
,
pStmt
->
mtb
.
currentUid
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
pBlock
=
*
t1
;
}
else
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -703,16 +739,15 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
pCmd
->
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
}
STableDataBlocks
*
pBlock
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
pTableMeta
->
tableInfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pBlock
,
NULL
);
if
(
ret
!=
0
)
{
// todo handle error
return
ret
;
}
}
uint32_t
totalDataSize
=
sizeof
(
SSubmitBlk
)
+
pCmd
->
batchSize
*
pBlock
->
rowSize
;
uint32_t
totalDataSize
=
sizeof
(
SSubmitBlk
)
+
(
pCmd
->
batchSize
+
1
)
*
pBlock
->
rowSize
;
if
(
totalDataSize
>
pBlock
->
nAllocSize
)
{
const
double
factor
=
1
.
5
;
...
...
@@ -729,7 +764,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
for
(
uint32_t
j
=
0
;
j
<
pBlock
->
numOfParams
;
++
j
)
{
SParamInfo
*
param
=
&
pBlock
->
params
[
j
];
int
code
=
doBindParam
(
data
,
param
,
&
bind
[
param
->
idx
]);
int
code
=
doBindParam
(
pBlock
,
data
,
param
,
&
bind
[
param
->
idx
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"param %d: type mismatch or invalid"
,
param
->
idx
);
return
code
;
...
...
@@ -739,9 +774,98 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
return
TSDB_CODE_SUCCESS
;
}
static
int
insertStmtBindParamBatch
(
STscStmt
*
stmt
,
TAOS_BIND
*
bind
,
int32_t
num
)
{
SSqlCmd
*
pCmd
=
&
stmt
->
pSql
->
cmd
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STableDataBlocks
*
pBlock
=
NULL
;
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
tscError
(
"Table block hash list is empty"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
pTableBlockHashList
,
(
const
char
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
));
if
(
t1
==
NULL
)
{
tscError
(
"no table data block in hash list, uid:%"
PRId64
,
pStmt
->
mtb
.
currentUid
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
pBlock
=
*
t1
;
uint32_t
totalDataSize
=
sizeof
(
SSubmitBlk
)
+
(
pCmd
->
batchSize
+
num
)
*
pBlock
->
rowSize
;
if
(
totalDataSize
>
pBlock
->
nAllocSize
)
{
const
double
factor
=
1
.
5
;
void
*
tmp
=
realloc
(
pBlock
->
pData
,
(
uint32_t
)(
totalDataSize
*
factor
));
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pBlock
->
pData
=
(
char
*
)
tmp
;
pBlock
->
nAllocSize
=
(
uint32_t
)(
totalDataSize
*
factor
);
}
for
(
uint32_t
i
=
0
;
i
<
num
;
++
i
)
{
char
*
data
=
pBlock
->
pData
+
sizeof
(
SSubmitBlk
)
+
pBlock
->
rowSize
*
(
pCmd
->
batchSize
+
i
);
TAOS_BIND
*
tbind
=
bind
+
pBlock
->
numOfParams
*
i
;
for
(
uint32_t
j
=
0
;
j
<
pBlock
->
numOfParams
;
++
j
)
{
SParamInfo
*
param
=
&
pBlock
->
params
[
j
];
int
code
=
doBindParam
(
pBlock
,
data
,
param
,
&
tbind
[
param
->
idx
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"param %d: type mismatch or invalid"
,
param
->
idx
);
return
code
;
}
}
}
pCmd
->
batchSize
+=
num
-
1
;
return
TSDB_CODE_SUCCESS
;
}
static
int
insertStmtUpdateBatch
(
STscStmt
*
stmt
)
{
SSqlObj
*
pSql
=
stmt
->
pSql
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
STableDataBlocks
*
pBlock
=
NULL
;
assert
(
pCmd
->
numOfClause
==
1
);
if
(
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
pTableBlockHashList
,
(
const
char
*
)
&
stmt
->
mtb
.
currentUid
,
sizeof
(
stmt
->
mtb
.
currentUid
));
if
(
t1
==
NULL
)
{
tscError
(
"no table data block in hash list, uid:%"
PRId64
,
stmt
->
mtb
.
currentUid
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
pBlock
=
*
t1
;
STableMeta
*
pTableMeta
=
pBlock
->
pTableMeta
;
pBlock
->
size
=
sizeof
(
SSubmitBlk
)
+
pCmd
->
batchSize
*
pBlock
->
rowSize
;
SSubmitBlk
*
pBlk
=
(
SSubmitBlk
*
)
pBlock
->
pData
;
pBlk
->
numOfRows
=
pCmd
->
batchSize
;
pBlk
->
dataLen
=
0
;
pBlk
->
uid
=
pTableMeta
->
id
.
uid
;
pBlk
->
tid
=
pTableMeta
->
id
.
tid
;
return
TSDB_CODE_SUCCESS
;
}
static
int
insertStmtAddBatch
(
STscStmt
*
stmt
)
{
SSqlCmd
*
pCmd
=
&
stmt
->
pSql
->
cmd
;
++
pCmd
->
batchSize
;
if
(
stmt
->
multiTbInsert
)
{
return
insertStmtUpdateBatch
(
stmt
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -835,6 +959,80 @@ static int insertStmtExecute(STscStmt* stmt) {
return
pSql
->
res
.
code
;
}
static
void
insertBatchClean
(
STscStmt
*
pStmt
)
{
SSqlCmd
*
pCmd
=
&
pStmt
->
pSql
->
cmd
;
SSqlObj
*
pSql
=
pStmt
->
pSql
;
int32_t
size
=
taosHashGetSize
(
pCmd
->
pTableBlockHashList
);
// data block reset
pCmd
->
batchSize
=
0
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
if
(
pCmd
->
pTableNameList
&&
pCmd
->
pTableNameList
[
i
])
{
tfree
(
pCmd
->
pTableNameList
[
i
]);
}
}
tfree
(
pCmd
->
pTableNameList
);
STableDataBlocks
**
p
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
NULL
);
STableDataBlocks
*
pOneTableBlock
=
*
p
;
while
(
1
)
{
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
pOneTableBlock
->
size
=
sizeof
(
SSubmitBlk
);
pBlocks
->
numOfRows
=
0
;
p
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
p
);
if
(
p
==
NULL
)
{
break
;
}
pOneTableBlock
=
*
p
;
}
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
pCmd
->
numOfTables
=
0
;
tscFreeSqlResult
(
pSql
);
tscFreeSubobj
(
pSql
);
tfree
(
pSql
->
pSubs
);
pSql
->
subState
.
numOfSub
=
0
;
}
static
int
insertBatchStmtExecute
(
STscStmt
*
pStmt
)
{
int32_t
code
=
0
;
if
(
pStmt
->
mtb
.
nameSet
==
false
)
{
tscError
(
"no table name set"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
pStmt
->
pSql
->
retry
=
pStmt
->
pSql
->
maxRetry
+
1
;
//no retry
if
(
taosHashGetSize
(
pStmt
->
pSql
->
cmd
.
pTableBlockHashList
)
>
0
)
{
// merge according to vgId
if
((
code
=
tscMergeTableDataBlocks
(
pStmt
->
pSql
,
false
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
code
=
tscHandleMultivnodeInsert
(
pStmt
->
pSql
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// wait for the callback function to post the semaphore
tsem_wait
(
&
pStmt
->
pSql
->
rspSem
);
insertBatchClean
(
pStmt
);
return
pStmt
->
pSql
->
res
.
code
;
}
////////////////////////////////////////////////////////////////////////////////
// interface functions
...
...
@@ -866,6 +1064,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pSql
->
signature
=
pSql
;
pSql
->
pTscObj
=
pObj
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
pSql
->
isBind
=
true
;
pStmt
->
pSql
=
pSql
;
return
pStmt
;
...
...
@@ -917,6 +1116,32 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
registerSqlObj
(
pSql
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
((
ret
=
tsInsertInitialCheck
(
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
int32_t
index
=
0
;
SStrToken
sToken
=
tStrGetToken
(
pCmd
->
curSql
,
&
index
,
false
);
if
(
sToken
.
n
==
0
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
if
(
sToken
.
n
==
1
&&
sToken
.
type
==
TK_QUESTION
)
{
pStmt
->
multiTbInsert
=
true
;
pStmt
->
mtb
.
tbname
=
sToken
;
pStmt
->
mtb
.
nameSet
=
false
;
if
(
pStmt
->
mtb
.
pTableHash
==
NULL
)
{
pStmt
->
mtb
.
pTableHash
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
}
return
TSDB_CODE_SUCCESS
;
}
pStmt
->
multiTbInsert
=
false
;
memset
(
&
pStmt
->
mtb
,
0
,
sizeof
(
pStmt
->
mtb
));
int32_t
code
=
tsParseSql
(
pSql
,
true
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
// wait for the callback function to post the semaphore
...
...
@@ -931,9 +1156,100 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
return
normalStmtPrepare
(
pStmt
);
}
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
SSqlObj
*
pSql
=
pStmt
->
pSql
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
stmt
==
NULL
||
pStmt
->
pSql
==
NULL
||
pStmt
->
taos
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
TSDB_CODE_TSC_DISCONNECTED
;
}
if
(
name
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_APP_ERROR
;
tscError
(
"name is NULL"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
if
(
pStmt
->
multiTbInsert
==
false
||
!
tscIsInsertData
(
pSql
->
sqlstr
))
{
terrno
=
TSDB_CODE_TSC_APP_ERROR
;
tscError
(
"not multi table insert"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
uint64_t
*
uid
=
(
uint64_t
*
)
taosHashGet
(
pStmt
->
mtb
.
pTableHash
,
name
,
strlen
(
name
));
if
(
uid
!=
NULL
)
{
pStmt
->
mtb
.
currentUid
=
*
uid
;
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
pTableBlockHashList
,
(
const
char
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
));
if
(
t1
==
NULL
)
{
tscError
(
"no table data block in hash list, uid:%"
PRId64
,
pStmt
->
mtb
.
currentUid
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
SSubmitBlk
*
pBlk
=
(
SSubmitBlk
*
)
(
*
t1
)
->
pData
;
pCmd
->
batchSize
=
pBlk
->
numOfRows
;
tscDebug
(
"table:%s is already prepared, uid:%"
PRIu64
,
name
,
pStmt
->
mtb
.
currentUid
);
return
TSDB_CODE_SUCCESS
;
}
pStmt
->
mtb
.
tbname
=
tscReplaceStrToken
(
&
pSql
->
sqlstr
,
&
pStmt
->
mtb
.
tbname
,
name
);
pStmt
->
mtb
.
nameSet
=
true
;
tscDebug
(
"sqlstr set to %s"
,
pSql
->
sqlstr
);
pSql
->
cmd
.
parseFinished
=
0
;
pSql
->
cmd
.
numOfParams
=
0
;
pSql
->
cmd
.
batchSize
=
0
;
if
(
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
>
0
)
{
SHashObj
*
hashList
=
pCmd
->
pTableBlockHashList
;
pCmd
->
pTableBlockHashList
=
NULL
;
tscResetSqlCmd
(
pCmd
,
true
);
pCmd
->
pTableBlockHashList
=
hashList
;
}
int32_t
code
=
tsParseSql
(
pStmt
->
pSql
,
true
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
// wait for the callback function to post the semaphore
tsem_wait
(
&
pStmt
->
pSql
->
rspSem
);
code
=
pStmt
->
pSql
->
res
.
code
;
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
STableDataBlocks
*
pBlock
=
NULL
;
code
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
pTableMeta
->
tableInfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pBlock
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
SSubmitBlk
*
blk
=
(
SSubmitBlk
*
)
pBlock
->
pData
;
blk
->
numOfRows
=
0
;
pStmt
->
mtb
.
currentUid
=
pTableMeta
->
id
.
uid
;
pStmt
->
mtb
.
tbNum
++
;
taosHashPut
(
pStmt
->
mtb
.
pTableHash
,
name
,
strlen
(
name
),
(
char
*
)
&
pTableMeta
->
id
.
uid
,
sizeof
(
pTableMeta
->
id
.
uid
));
tscDebug
(
"table:%s is prepared, uid:%"
PRIu64
,
name
,
pStmt
->
mtb
.
currentUid
);
}
return
code
;
}
int
taos_stmt_close
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
!
pStmt
->
isInsert
)
{
taosHashCleanup
(
pStmt
->
mtb
.
pTableHash
);
SNormalStmt
*
normal
=
&
pStmt
->
normal
;
if
(
normal
->
params
!=
NULL
)
{
for
(
uint16_t
i
=
0
;
i
<
normal
->
numParams
;
i
++
)
{
...
...
@@ -953,12 +1269,34 @@ int taos_stmt_close(TAOS_STMT* stmt) {
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
pStmt
->
isInsert
)
{
if
(
pStmt
->
multiTbInsert
&&
pStmt
->
mtb
.
nameSet
==
false
)
{
tscError
(
"no table name set"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
return
insertStmtBindParam
(
pStmt
,
bind
);
}
else
{
return
normalStmtBindParam
(
pStmt
,
bind
);
}
}
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
,
int32_t
num
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
num
<=
0
||
bind
==
NULL
)
{
tscError
(
"invalid parameter"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
if
(
!
pStmt
->
isInsert
||
!
pStmt
->
multiTbInsert
||
!
pStmt
->
mtb
.
nameSet
)
{
tscError
(
"not or invalid batch insert"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
return
insertStmtBindParamBatch
(
pStmt
,
bind
,
num
);
}
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
pStmt
->
isInsert
)
{
...
...
@@ -979,7 +1317,11 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
int
ret
=
0
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
pStmt
->
isInsert
)
{
if
(
pStmt
->
multiTbInsert
)
{
ret
=
insertBatchStmtExecute
(
pStmt
);
}
else
{
ret
=
insertStmtExecute
(
pStmt
);
}
}
else
{
// normal stmt query
char
*
sql
=
normalStmtBuildSql
(
pStmt
);
if
(
sql
==
NULL
)
{
...
...
src/client/src/tscUtil.c
浏览文件 @
6df8ed9f
...
...
@@ -1255,6 +1255,8 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
STableDataBlocks
*
pOneTableBlock
=
*
p
;
while
(
pOneTableBlock
)
{
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
if
(
pBlocks
->
numOfRows
>
0
)
{
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t
expandSize
=
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
);
STableDataBlocks
*
dataBuf
=
NULL
;
...
...
@@ -1268,7 +1270,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
return
ret
;
}
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
int64_t
destSize
=
dataBuf
->
size
+
pOneTableBlock
->
size
+
pBlocks
->
numOfRows
*
expandSize
+
sizeof
(
STColumn
)
*
tscGetNumOfColumns
(
pOneTableBlock
->
pTableMeta
);
if
(
dataBuf
->
nAllocSize
<
destSize
)
{
...
...
@@ -1315,6 +1316,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
// the length does not include the SSubmitBlk structure
pBlocks
->
dataLen
=
htonl
(
finalLen
);
dataBuf
->
numOfTables
+=
1
;
}
else
{
tscWarn
(
"table %s data block is empty"
,
pOneTableBlock
->
tableName
.
tname
);
}
p
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
p
);
if
(
p
==
NULL
)
{
...
...
src/inc/taos.h
浏览文件 @
6df8ed9f
...
...
@@ -101,10 +101,12 @@ typedef struct TAOS_BIND {
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
);
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
);
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
,
int32_t
num
);
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
);
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
);
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
);
...
...
src/query/src/qTokenizer.c
浏览文件 @
6df8ed9f
...
...
@@ -560,6 +560,28 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) {
return
0
;
}
SStrToken
tscReplaceStrToken
(
char
**
str
,
SStrToken
*
token
,
const
char
*
new
)
{
char
*
src
=
*
str
;
int32_t
nsize
=
strlen
(
new
);
int32_t
size
=
strlen
(
*
str
)
-
token
->
n
+
nsize
+
1
;
int32_t
bsize
=
(
uint64_t
)
token
->
z
-
(
uint64_t
)
src
;
SStrToken
ntoken
;
*
str
=
calloc
(
1
,
size
);
strncpy
(
*
str
,
src
,
bsize
);
strcat
(
*
str
,
new
);
strcat
(
*
str
,
token
->
z
+
token
->
n
);
ntoken
.
n
=
nsize
;
ntoken
.
z
=
*
str
+
bsize
;
tfree
(
src
);
return
ntoken
;
}
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
)
{
SStrToken
t0
=
{
0
};
...
...
src/util/inc/tstoken.h
浏览文件 @
6df8ed9f
...
...
@@ -182,6 +182,9 @@ static FORCE_INLINE int32_t tGetNumericStringType(const SStrToken* pToken) {
void
taosCleanupKeywordsTable
();
SStrToken
tscReplaceStrToken
(
char
**
str
,
SStrToken
*
token
,
const
char
*
new
);
#ifdef __cplusplus
}
#endif
...
...
tests/examples/c/makefile
浏览文件 @
6df8ed9f
...
...
@@ -14,6 +14,7 @@ exe:
gcc
$(CFLAGS)
./asyncdemo.c
-o
$(ROOT)
asyncdemo
$(LFLAGS)
gcc
$(CFLAGS)
./demo.c
-o
$(ROOT)
demo
$(LFLAGS)
gcc
$(CFLAGS)
./prepare.c
-o
$(ROOT)
prepare
$(LFLAGS)
gcc
$(CFLAGS)
./batchprepare.c
-o
$(ROOT)
batchprepare
$(LFLAGS)
gcc
$(CFLAGS)
./stream.c
-o
$(ROOT)
stream
$(LFLAGS)
gcc
$(CFLAGS)
./subscribe.c
-o
$(ROOT)
subscribe
$(LFLAGS)
gcc
$(CFLAGS)
./apitest.c
-o
$(ROOT)
apitest
$(LFLAGS)
...
...
@@ -22,6 +23,7 @@ clean:
rm
$(ROOT)
asyncdemo
rm
$(ROOT)
demo
rm
$(ROOT)
prepare
rm
$(ROOT)
batchprepare
rm
$(ROOT)
stream
rm
$(ROOT)
subscribe
rm
$(ROOT)
apitest
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录