Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
3dc445f9
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3dc445f9
编写于
5月 21, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-225]
上级
4b672b45
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
141 addition
and
197 deletion
+141
-197
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+1
-2
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+22
-11
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+1
-1
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+17
-16
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+36
-36
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-5
src/client/src/tscServer.c
src/client/src/tscServer.c
+23
-23
src/client/src/tscSql.c
src/client/src/tscSql.c
+3
-3
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+4
-4
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+33
-96
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
3dc445f9
...
...
@@ -237,7 +237,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void
tscClearSubqueryInfo
(
SSqlCmd
*
pCmd
);
int32_t
tscAddQueryInfo
(
SSqlCmd
*
pCmd
);
SQueryInfo
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
);
SQueryInfo
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SQueryInfo
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
);
void
tscClearTableMetaInfo
(
STableMetaInfo
*
pTableMetaInfo
);
...
...
@@ -256,7 +256,6 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int
tscGetTableMetaEx
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
,
bool
createIfNotExists
);
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
);
void
tscDoQuery
(
SSqlObj
*
pSql
);
void
executeQuery
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
void
doExecuteQuery
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
...
...
src/client/inc/tsclient.h
浏览文件 @
3dc445f9
...
...
@@ -251,19 +251,32 @@ typedef struct {
SVgroupsInfo
*
pVgroupInfo
;
}
STableMetaVgroupInfo
;
typedef
struct
SInsertStatementParam
{
SName
**
pTableNameList
;
// all involved tableMeta list of current insert sql statement.
int32_t
numOfTables
;
SHashObj
*
pTableBlockHashList
;
// data block for each table
SArray
*
pDataBlocks
;
// SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t
schemaAttached
;
// denote if submit block is built with table schema or not
STagData
tagData
;
// NOTE: pTagData->data is used as a variant length array
int32_t
dataSourceType
;
// from file or from sql statement
char
msg
[
512
];
// error message
char
*
sql
;
// current sql statement position
uint32_t
insertType
;
// TODO remove it
}
SInsertStatementParam
;
// TODO extract sql parser supporter
typedef
struct
{
int
command
;
uint8_t
msgType
;
SInsertStatementParam
insertParam
;
char
reserve1
[
3
];
// fix bus error on arm32
bool
autoCreated
;
// create table if it is not existed during retrieve table meta in mnode
union
{
int32_t
count
;
int32_t
numOfTablesInSubmit
;
};
uint32_t
insertType
;
// TODO remove it
char
*
curSql
;
// current sql, resume position of sql after parsing paused
int8_t
parseFinished
;
char
reserve2
[
3
];
// fix bus error on arm32
...
...
@@ -276,24 +289,22 @@ typedef struct {
SHashObj
*
pTableMetaMap
;
// local buffer to keep the queried table meta, before validating the AST
SQueryInfo
*
pQueryInfo
;
int32_t
clauseIndex
;
// index of multiple subclause query
SQueryInfo
*
active
;
// current active query info
int32_t
batchSize
;
// for parameter ('?') binding and batch processing
int32_t
numOfParams
;
int8_t
dataSourceType
;
// load data from file or not
char
reserve4
[
3
];
// fix bus error on arm32
int8_t
submitSchema
;
// submit block is built with table schema
char
reserve5
[
3
];
// fix bus error on arm32
char
reserve4
[
3
];
// fix bus error on arm32
//
int8_t submitSchema; // submit block is built with table schema
char
reserve5
[
3
];
// fix bus error on arm32
STagData
tagData
;
// NOTE: pTagData->data is used as a variant length array
SName
**
pTableNameList
;
// all involved tableMeta list of current insert sql statement.
int32_t
numOfTables
;
//
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
//
int32_t numOfTables;
SHashObj
*
pTableBlockHashList
;
// data block for each table
SArray
*
pDataBlocks
;
// SArray<STableDataBlocks*>. Merged submit block for each vgroup
//
SHashObj *pTableBlockHashList; // data block for each table
//
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int32_t
resColumnId
;
}
SSqlCmd
;
...
...
src/client/src/tscAsync.c
浏览文件 @
3dc445f9
...
...
@@ -460,7 +460,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto
_error
;
}
if
(
pCmd
->
insertType
==
TSDB_QUERY_TYPE_STMT_INSERT
)
{
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_STMT_INSERT
))
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
);
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
...
...
src/client/src/tscParseInsert.c
浏览文件 @
3dc445f9
...
...
@@ -441,7 +441,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, SSqlCmd *pCmd, int1
*
str
+=
index
;
if
(
sToken
.
type
==
TK_QUESTION
)
{
if
(
pCmd
->
insertType
!=
TSDB_QUERY_TYPE_STMT_INSERT
)
{
if
(
pCmd
->
insert
Param
.
insert
Type
!=
TSDB_QUERY_TYPE_STMT_INSERT
)
{
return
tscSQLSyntaxErrMsg
(
pCmd
->
payload
,
"? only allowed in binding insertion"
,
*
str
);
}
...
...
@@ -1120,9 +1120,9 @@ int tsParseInsertSql(SSqlObj *pSql) {
return
code
;
}
if
(
NULL
==
pCmd
->
pTableBlockHashList
)
{
pCmd
->
pTableBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
NULL
==
pCmd
->
pTableBlockHashList
)
{
if
(
NULL
==
pCmd
->
insertParam
.
pTableBlockHashList
)
{
pCmd
->
insertParam
.
pTableBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
NULL
==
pCmd
->
insertParam
.
pTableBlockHashList
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_clean
;
}
...
...
@@ -1130,7 +1130,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
str
=
pCmd
->
curSql
;
}
tscDebug
(
"0x%"
PRIx64
" create data block list hashList:%p"
,
pSql
->
self
,
pCmd
->
pTableBlockHashList
);
tscDebug
(
"0x%"
PRIx64
" create data block list hashList:%p"
,
pSql
->
self
,
pCmd
->
insertParam
.
pTableBlockHashList
);
while
(
1
)
{
int32_t
index
=
0
;
...
...
@@ -1241,7 +1241,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tinfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
dataBuf
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1261,7 +1261,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tinfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
dataBuf
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1297,7 +1297,8 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto
_clean
;
}
if
((
pCmd
->
insertType
!=
TSDB_QUERY_TYPE_STMT_INSERT
)
&&
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
>
0
)
{
// merge according to vgId
// merge according to vgId
if
(
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_STMT_INSERT
)
&&
taosHashGetSize
(
pCmd
->
insertParam
.
pTableBlockHashList
)
>
0
)
{
if
((
code
=
tscMergeTableDataBlocks
(
pSql
,
true
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean
;
}
...
...
@@ -1326,9 +1327,9 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd
->
count
=
0
;
pCmd
->
command
=
TSDB_SQL_INSERT
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
);
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
|
pCmd
->
insertType
);
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
|
TSDB_QUERY_TYPE_STMT_INSERT
);
sToken
=
tStrGetToken
(
pSql
->
sqlstr
,
&
index
,
false
);
if
(
sToken
.
type
!=
TK_INTO
)
{
...
...
@@ -1410,7 +1411,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return
code
;
}
STableDataBlocks
*
pDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
0
);
STableDataBlocks
*
pDataBlock
=
taosArrayGetP
(
pCmd
->
insertParam
.
pDataBlocks
,
0
);
if
((
code
=
tscCopyDataBlockToPayload
(
pSql
,
pDataBlock
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1466,11 +1467,11 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
destroyTableNameList
(
pCmd
);
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
pCmd
->
insertParam
.
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
insertParam
.
pDataBlocks
);
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
pCmd
->
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
pCmd
->
insertParam
.
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
...
...
@@ -1478,7 +1479,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
STableDataBlocks
*
pTableDataBlock
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tinfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pTableDataBlock
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pParentSql
->
res
.
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
src/client/src/tscPrepare.c
浏览文件 @
3dc445f9
...
...
@@ -308,7 +308,7 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
int32_t
fillTablesColumnsNull
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
STableDataBlocks
**
p
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
NULL
);
STableDataBlocks
**
p
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
NULL
);
STableDataBlocks
*
pOneTableBlock
=
*
p
;
while
(
pOneTableBlock
)
{
...
...
@@ -317,7 +317,7 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
fillColumnsNull
(
pOneTableBlock
,
pBlocks
->
numOfRows
);
}
p
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
p
);
p
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
p
);
if
(
p
==
NULL
)
{
break
;
}
...
...
@@ -840,12 +840,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
STableDataBlocks
*
pBlock
=
NULL
;
if
(
pStmt
->
multiTbInsert
)
{
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" Table block hash list is empty"
,
pStmt
->
pSql
->
self
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
pTableBlockHashList
,
(
const
char
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
));
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
insertParam
.
pTableBlockHashList
,
(
const
char
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
));
if
(
t1
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" no table data block in hash list, uid:%"
PRId64
,
pStmt
->
pSql
->
self
,
pStmt
->
mtb
.
currentUid
);
return
TSDB_CODE_TSC_APP_ERROR
;
...
...
@@ -856,12 +856,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
pCmd
->
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
pCmd
->
insertParam
.
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
}
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
pTableMeta
->
tableInfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pBlock
,
NULL
);
if
(
ret
!=
0
)
{
return
ret
;
...
...
@@ -904,12 +904,12 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c
STableDataBlocks
*
pBlock
=
NULL
;
if
(
pStmt
->
multiTbInsert
)
{
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" Table block hash list is empty"
,
pStmt
->
pSql
->
self
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
pTableBlockHashList
,
(
const
char
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
));
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
insertParam
.
pTableBlockHashList
,
(
const
char
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
));
if
(
t1
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" no table data block in hash list, uid:%"
PRId64
,
pStmt
->
pSql
->
self
,
pStmt
->
mtb
.
currentUid
);
return
TSDB_CODE_TSC_APP_ERROR
;
...
...
@@ -920,12 +920,12 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
pCmd
->
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
pCmd
->
insertParam
.
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
}
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
pTableMeta
->
tableInfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pBlock
,
NULL
);
if
(
ret
!=
0
)
{
return
ret
;
...
...
@@ -991,11 +991,11 @@ static int insertStmtUpdateBatch(STscStmt* stmt) {
return
TSDB_CODE_TSC_APP_ERROR
;
}
if
(
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
==
0
)
{
if
(
taosHashGetSize
(
pCmd
->
insertParam
.
pTableBlockHashList
)
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
pTableBlockHashList
,
(
const
char
*
)
&
stmt
->
mtb
.
currentUid
,
sizeof
(
stmt
->
mtb
.
currentUid
));
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pCmd
->
insertParam
.
pTableBlockHashList
,
(
const
char
*
)
&
stmt
->
mtb
.
currentUid
,
sizeof
(
stmt
->
mtb
.
currentUid
));
if
(
t1
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" no table data block in hash list, uid:%"
PRId64
,
pSql
->
self
,
stmt
->
mtb
.
currentUid
);
return
TSDB_CODE_TSC_APP_ERROR
;
...
...
@@ -1031,9 +1031,9 @@ static int insertStmtReset(STscStmt* pStmt) {
if
(
pCmd
->
batchSize
>
2
)
{
int32_t
alloced
=
(
pCmd
->
batchSize
+
1
)
/
2
;
size_t
size
=
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
size_t
size
=
taosArrayGetSize
(
pCmd
->
insertParam
.
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableDataBlocks
*
pBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
i
);
STableDataBlocks
*
pBlock
=
taosArrayGetP
(
pCmd
->
insertParam
.
pDataBlocks
,
i
);
uint32_t
totalDataSize
=
pBlock
->
size
-
sizeof
(
SSubmitBlk
);
pBlock
->
size
=
sizeof
(
SSubmitBlk
)
+
totalDataSize
/
alloced
;
...
...
@@ -1055,21 +1055,21 @@ static int insertStmtExecute(STscStmt* stmt) {
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
if
(
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
==
0
)
{
if
(
taosHashGetSize
(
pCmd
->
insertParam
.
pTableBlockHashList
)
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
pCmd
->
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
pCmd
->
insertParam
.
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
}
STableDataBlocks
*
pBlock
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
pTableMeta
->
tableInfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pBlock
,
NULL
);
assert
(
ret
==
0
);
pBlock
->
size
=
sizeof
(
SSubmitBlk
)
+
pCmd
->
batchSize
*
pBlock
->
rowSize
;
...
...
@@ -1086,7 +1086,7 @@ static int insertStmtExecute(STscStmt* stmt) {
return
code
;
}
STableDataBlocks
*
pDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
0
);
STableDataBlocks
*
pDataBlock
=
taosArrayGetP
(
pCmd
->
insertParam
.
pDataBlocks
,
0
);
code
=
tscCopyDataBlockToPayload
(
stmt
->
pSql
,
pDataBlock
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -1112,7 +1112,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pCmd
->
numOfTables
=
0
;
tfree
(
pCmd
->
pTableNameList
);
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
pCmd
->
insertParam
.
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
insertParam
.
pDataBlocks
);
return
pSql
->
res
.
code
;
}
...
...
@@ -1120,7 +1120,7 @@ static int insertStmtExecute(STscStmt* stmt) {
static
void
insertBatchClean
(
STscStmt
*
pStmt
)
{
SSqlCmd
*
pCmd
=
&
pStmt
->
pSql
->
cmd
;
SSqlObj
*
pSql
=
pStmt
->
pSql
;
int32_t
size
=
taosHashGetSize
(
pCmd
->
pTableBlockHashList
);
int32_t
size
=
taosHashGetSize
(
pCmd
->
insertParam
.
pTableBlockHashList
);
// data block reset
pCmd
->
batchSize
=
0
;
...
...
@@ -1134,7 +1134,7 @@ static void insertBatchClean(STscStmt* pStmt) {
tfree
(
pCmd
->
pTableNameList
);
/*
STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL);
STableDataBlocks** p = taosHashIterate(pCmd->
insertParam.
pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
...
...
@@ -1145,7 +1145,7 @@ static void insertBatchClean(STscStmt* pStmt) {
pBlocks->numOfRows = 0;
p = taosHashIterate(pCmd->pTableBlockHashList, p);
p = taosHashIterate(pCmd->
insertParam.
pTableBlockHashList, p);
if (p == NULL) {
break;
}
...
...
@@ -1154,10 +1154,10 @@ static void insertBatchClean(STscStmt* pStmt) {
}
*/
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
pCmd
->
insertParam
.
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
insertParam
.
pDataBlocks
);
pCmd
->
numOfTables
=
0
;
taosHashEmpty
(
pCmd
->
pTableBlockHashList
);
taosHashEmpty
(
pCmd
->
insertParam
.
pTableBlockHashList
);
tscFreeSqlResult
(
pSql
);
tscFreeSubobj
(
pSql
);
tfree
(
pSql
->
pSubs
);
...
...
@@ -1376,7 +1376,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
SSubmitBlk
*
pBlk
=
(
SSubmitBlk
*
)
(
*
t1
)
->
pData
;
pCmd
->
batchSize
=
pBlk
->
numOfRows
;
taosHashPut
(
pCmd
->
pTableBlockHashList
,
(
void
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
),
(
void
*
)
t1
,
POINTER_BYTES
);
taosHashPut
(
pCmd
->
insertParam
.
pTableBlockHashList
,
(
void
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
),
(
void
*
)
t1
,
POINTER_BYTES
);
tscDebug
(
"0x%"
PRIx64
" table:%s is already prepared, uid:%"
PRIu64
,
pSql
->
self
,
name
,
pStmt
->
mtb
.
currentUid
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1391,11 +1391,11 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
pSql
->
cmd
.
numOfParams
=
0
;
pSql
->
cmd
.
batchSize
=
0
;
if
(
taosHashGetSize
(
pCmd
->
pTableBlockHashList
)
>
0
)
{
SHashObj
*
hashList
=
pCmd
->
pTableBlockHashList
;
pCmd
->
pTableBlockHashList
=
NULL
;
if
(
taosHashGetSize
(
pCmd
->
insertParam
.
pTableBlockHashList
)
>
0
)
{
SHashObj
*
hashList
=
pCmd
->
insertParam
.
pTableBlockHashList
;
pCmd
->
insertParam
.
pTableBlockHashList
=
NULL
;
tscResetSqlCmd
(
pCmd
,
true
);
pCmd
->
pTableBlockHashList
=
hashList
;
pCmd
->
insertParam
.
pTableBlockHashList
=
hashList
;
}
int32_t
code
=
tsParseSql
(
pStmt
->
pSql
,
true
);
...
...
@@ -1411,7 +1411,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
STableDataBlocks
*
pBlock
=
NULL
;
code
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
code
=
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
pTableMeta
->
tableInfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pBlock
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -1688,14 +1688,14 @@ int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
SSqlCmd
*
pCmd
=
&
pStmt
->
pSql
->
cmd
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
pCmd
->
pTableBlockHashList
==
NULL
)
{
pCmd
->
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
pCmd
->
insertParam
.
pTableBlockHashList
==
NULL
)
{
pCmd
->
insertParam
.
pTableBlockHashList
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
}
STableDataBlocks
*
pBlock
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pCmd
->
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
tscGetDataBlockFromList
(
pCmd
->
insertParam
.
pTableBlockHashList
,
pTableMeta
->
id
.
uid
,
TSDB_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
pTableMeta
->
tableInfo
.
rowSize
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
&
pBlock
,
NULL
);
if
(
ret
!=
0
)
{
// todo handle error
...
...
src/client/src/tscSQLParser.c
浏览文件 @
3dc445f9
...
...
@@ -293,7 +293,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return
tscSQLSyntaxErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
NULL
,
pInfo
->
msg
);
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
);
if
(
pQueryInfo
==
NULL
)
{
pRes
->
code
=
terrno
;
return
pRes
->
code
;
...
...
@@ -656,7 +656,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
tscPrintSelNodeList
(
pSql
,
i
);
pCmd
->
clauseIndex
+=
1
;
if
((
i
+
1
)
<
size
&&
pQueryInfo
->
sibling
==
NULL
)
{
if
((
code
=
tscAddQueryInfo
(
pCmd
))
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -671,9 +670,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return
code
;
}
// restore the clause index
pCmd
->
clauseIndex
=
0
;
// set the command/global limit parameters from the first subclause to the sqlcmd object
pCmd
->
active
=
pCmd
->
pQueryInfo
;
pCmd
->
command
=
pCmd
->
pQueryInfo
->
command
;
...
...
src/client/src/tscServer.c
浏览文件 @
3dc445f9
...
...
@@ -584,22 +584,22 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
);
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
;
char
*
pMsg
=
pSql
->
cmd
.
payload
;
// NOTE: shell message size should not include SMsgDesc
int32_t
size
=
pSql
->
cmd
.
payloadLen
-
sizeof
(
SMsgDesc
);
SMsgDesc
*
pMsgDesc
=
(
SMsgDesc
*
)
pMsg
;
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
// always one vnode
pMsg
+=
sizeof
(
SMsgDesc
);
SSubmitMsg
*
pShellMsg
=
(
SSubmitMsg
*
)
pMsg
;
pShellMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pShellMsg
->
header
.
contLen
=
htonl
(
size
);
// the length not includes the size of SMsgDesc
pShellMsg
->
length
=
pShellMsg
->
header
.
contLen
;
pShellMsg
->
numOfBlocks
=
htonl
(
pSql
->
cmd
.
numOfTablesInSubmit
);
//
number of tables to be inserted
//
char* pMsg = pSql->cmd.payload;
//
//
// NOTE: shell message size should not include SMsgDesc
//
int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
//
//
SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
// pMsgDesc->numOfVnodes = htonl(1);
// always one vnode
//
//
pMsg += sizeof(SMsgDesc);
//
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
//
// pShellMsg->header.vgId = htonl(pTableMeta->vgId); // data in current block all routes to the same vgroup
// pShellMsg->header.contLen = htonl(size);
// the length not includes the size of SMsgDesc
// pShellMsg->length
= pShellMsg->header.contLen;
//
// pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // the
number of tables to be inserted
// pSql->cmd.payloadLen is set during copying data into payload
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_SUBMIT
;
...
...
@@ -608,15 +608,15 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
taosHashGetClone
(
tscVgroupMap
,
&
pTableMeta
->
vgId
,
sizeof
(
pTableMeta
->
vgId
),
NULL
,
&
vgroupInfo
,
sizeof
(
SNewVgroupInfo
));
tscDumpEpSetFromVgroupInfo
(
&
pSql
->
epSet
,
&
vgroupInfo
);
tscDebug
(
"0x%"
PRIx64
"
build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
->
self
,
pTableMeta
->
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
pSql
->
epSet
.
numOfEps
);
tscDebug
(
"0x%"
PRIx64
"
submit msg built, numberOfEP:%d"
,
pSql
->
self
,
pSql
->
epSet
.
numOfEps
);
return
TSDB_CODE_SUCCESS
;
}
/*
* for table query, simply return the size <= 1k
*/
static
int32_t
tscEstimateQueryMsgSize
(
SSqlObj
*
pSql
,
int32_t
clauseIndex
)
{
static
int32_t
tscEstimateQueryMsgSize
(
SSqlObj
*
pSql
)
{
const
static
int32_t
MIN_QUERY_MSG_PKT_SIZE
=
TSDB_MAX_BYTES_PER_ROW
*
5
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -815,7 +815,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
size
=
tscEstimateQueryMsgSize
(
pSql
,
pCmd
->
clauseIndex
);
int32_t
size
=
tscEstimateQueryMsgSize
(
pSql
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
tscError
(
"%p failed to malloc for query msg"
,
pSql
);
...
...
@@ -2155,7 +2155,7 @@ static void createHbObj(STscObj* pObj) {
pSql
->
fp
=
tscProcessHeartBeatRsp
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
);
if
(
pQueryInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tfree
(
pSql
);
...
...
@@ -2369,7 +2369,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
tscAddQueryInfo
(
&
pNew
->
cmd
);
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
);
pNew
->
cmd
.
autoCreated
=
pSql
->
cmd
.
autoCreated
;
// create table if not exists
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
+
pSql
->
cmd
.
payloadLen
))
{
...
...
@@ -2592,7 +2592,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
pNew
->
cmd
.
command
=
TSDB_SQL_STABLEVGROUP
;
// TODO TEST IT
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
);
if
(
pNewQueryInfo
==
NULL
)
{
tscFreeSqlObj
(
pNew
);
return
code
;
...
...
src/client/src/tscSql.c
浏览文件 @
3dc445f9
...
...
@@ -901,9 +901,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower
(
pSql
->
sqlstr
,
sql
);
pCmd
->
curSql
=
NULL
;
if
(
NULL
!=
pCmd
->
pTableBlockHashList
)
{
taosHashCleanup
(
pCmd
->
pTableBlockHashList
);
pCmd
->
pTableBlockHashList
=
NULL
;
if
(
NULL
!=
pCmd
->
insertParam
.
pTableBlockHashList
)
{
taosHashCleanup
(
pCmd
->
insertParam
.
pTableBlockHashList
);
pCmd
->
insertParam
.
pTableBlockHashList
=
NULL
;
}
pSql
->
fp
=
asyncCallback
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
3dc445f9
...
...
@@ -3056,7 +3056,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// clean up tableMeta in cache
tscFreeQueryInfo
(
&
pSql
->
cmd
,
false
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
);
STableMetaInfo
*
pMasterTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pParentObj
->
cmd
,
0
);
tscAddTableMetaInfo
(
pQueryInfo
,
&
pMasterTableMetaInfo
->
name
,
NULL
,
NULL
,
NULL
,
NULL
);
...
...
@@ -3145,7 +3145,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return
TSDB_CODE_SUCCESS
;
}
pSql
->
subState
.
numOfSub
=
(
uint16_t
)
taosArrayGetSize
(
pCmd
->
pDataBlocks
);
pSql
->
subState
.
numOfSub
=
(
uint16_t
)
taosArrayGetSize
(
pCmd
->
insertParam
.
pDataBlocks
);
assert
(
pSql
->
subState
.
numOfSub
>
0
);
pRes
->
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -3195,7 +3195,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pNew
->
fetchFp
=
pNew
->
fp
;
pSql
->
pSubs
[
numOfSub
]
=
pNew
;
STableDataBlocks
*
pTableDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
numOfSub
);
STableDataBlocks
*
pTableDataBlock
=
taosArrayGetP
(
pCmd
->
insertParam
.
pDataBlocks
,
numOfSub
);
pRes
->
code
=
tscCopyDataBlockToPayload
(
pNew
,
pTableDataBlock
);
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"0x%"
PRIx64
" sub:%p create subObj success. orderOfSub:%d"
,
pSql
->
self
,
pNew
,
numOfSub
);
...
...
@@ -3213,7 +3213,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
goto
_error
;
}
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
pCmd
->
insertParam
.
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
insertParam
.
pDataBlocks
);
// use the local variable
for
(
int32_t
j
=
0
;
j
<
numOfSub
;
++
j
)
{
...
...
src/client/src/tscUtil.c
浏览文件 @
3dc445f9
...
...
@@ -1130,8 +1130,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
destroyTableNameList
(
pCmd
);
pCmd
->
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pCmd
->
pTableBlockHashList
,
removeMeta
);
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
pCmd
->
insertParam
.
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pCmd
->
insertParam
.
pTableBlockHashList
,
removeMeta
);
pCmd
->
insertParam
.
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
insertParam
.
pDataBlocks
);
tscFreeQueryInfo
(
pCmd
,
removeMeta
);
if
(
pCmd
->
pTableMetaMap
!=
NULL
)
{
...
...
@@ -1343,12 +1343,9 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
assert
(
pDataBlock
->
pTableMeta
!=
NULL
);
pCmd
->
numOfTablesInSubmit
=
pDataBlock
->
numOfTables
;
// assert(pCmd->numOfClause == 1);
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
);
// todo re
facto
r
// todo re
move it late
r
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if
(
pTableMetaInfo
->
pTableMeta
!=
pDataBlock
->
pTableMeta
)
{
tNameAssign
(
&
pTableMetaInfo
->
name
,
&
pDataBlock
->
tableName
);
...
...
@@ -1358,13 +1355,13 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
}
pTableMetaInfo
->
pTableMeta
=
tscTableMetaDup
(
pDataBlock
->
pTableMeta
);
pTableMetaInfo
->
tableMetaSize
=
tscGetTableMetaSize
(
pDataBlock
->
pTableMeta
);
pTableMetaInfo
->
tableMetaSize
=
tscGetTableMetaSize
(
pDataBlock
->
pTableMeta
);
}
/*
* the
submit message consists of :
[RPC header|message body|digest]
* the dataBlock only includes the RPC Header buffer and actual submit message body,
space for digest needs
* additional space.
* the
format of submit message is as follows
[RPC header|message body|digest]
* the dataBlock only includes the RPC Header buffer and actual submit message body,
*
space for digest needs
additional space.
*/
int
ret
=
tscAllocPayload
(
pCmd
,
pDataBlock
->
size
+
100
);
if
(
TSDB_CODE_SUCCESS
!=
ret
)
{
...
...
@@ -1374,13 +1371,24 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
assert
(
pDataBlock
->
size
<=
pDataBlock
->
nAllocSize
);
memcpy
(
pCmd
->
payload
,
pDataBlock
->
pData
,
pDataBlock
->
size
);
/*
* the payloadLen should be actual message body size
* the old value of payloadLen is the allocated payload size
*/
//the payloadLen should be actual message body size, the old value of payloadLen is the allocated payload size
pCmd
->
payloadLen
=
pDataBlock
->
size
;
// NOTE: shell message size should not include SMsgDesc
int32_t
size
=
pCmd
->
payloadLen
-
sizeof
(
SMsgDesc
);
SMsgDesc
*
pMsgDesc
=
(
SMsgDesc
*
)
pCmd
->
payload
;
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
// always for one vnode
SSubmitMsg
*
pShellMsg
=
(
SSubmitMsg
*
)(
pCmd
->
payload
+
sizeof
(
SMsgDesc
));
pShellMsg
->
header
.
vgId
=
htonl
(
pDataBlock
->
pTableMeta
->
vgId
);
// data in current block all routes to the same vgroup
pShellMsg
->
header
.
contLen
=
htonl
(
size
);
// the length not includes the size of SMsgDesc
pShellMsg
->
length
=
pShellMsg
->
header
.
contLen
;
pShellMsg
->
numOfBlocks
=
htonl
(
pDataBlock
->
numOfTables
);
// the number of tables to be inserted
assert
(
pCmd
->
allocSize
>=
(
uint32_t
)(
pCmd
->
payloadLen
+
100
)
&&
pCmd
->
payloadLen
>
0
);
tscDebug
(
"0x%"
PRIx64
" submit msg built, vgId:%d numOfTables:%d"
,
pSql
->
self
,
pDataBlock
->
pTableMeta
->
vgId
,
pDataBlock
->
numOfTables
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1542,25 +1550,25 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
}
static
void
extractTableNameList
(
SSqlCmd
*
pCmd
,
bool
freeBlockMap
)
{
pCmd
->
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pCmd
->
pTableBlockHashList
);
pCmd
->
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pCmd
->
insertParam
.
pTableBlockHashList
);
if
(
pCmd
->
pTableNameList
==
NULL
)
{
pCmd
->
pTableNameList
=
calloc
(
pCmd
->
numOfTables
,
POINTER_BYTES
);
}
else
{
memset
(
pCmd
->
pTableNameList
,
0
,
pCmd
->
numOfTables
*
POINTER_BYTES
);
}
STableDataBlocks
**
p1
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
NULL
);
STableDataBlocks
**
p1
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
NULL
);
int32_t
i
=
0
;
while
(
p1
)
{
STableDataBlocks
*
pBlocks
=
*
p1
;
tfree
(
pCmd
->
pTableNameList
[
i
]);
pCmd
->
pTableNameList
[
i
++
]
=
tNameDup
(
&
pBlocks
->
tableName
);
p1
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
p1
);
p1
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
p1
);
}
if
(
freeBlockMap
)
{
pCmd
->
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pCmd
->
pTableBlockHashList
,
false
);
pCmd
->
insertParam
.
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pCmd
->
insertParam
.
pTableBlockHashList
,
false
);
}
}
...
...
@@ -1571,7 +1579,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
void
*
pVnodeDataBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
SArray
*
pVnodeDataBlockList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
STableDataBlocks
**
p
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
NULL
);
STableDataBlocks
**
p
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
NULL
);
STableDataBlocks
*
pOneTableBlock
=
*
p
;
while
(
pOneTableBlock
)
{
...
...
@@ -1642,7 +1650,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
tscDebug
(
"0x%"
PRIx64
" table %s data block is empty"
,
pSql
->
self
,
pOneTableBlock
->
tableName
.
tname
);
}
p
=
taosHashIterate
(
pCmd
->
pTableBlockHashList
,
p
);
p
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
p
);
if
(
p
==
NULL
)
{
break
;
}
...
...
@@ -1653,7 +1661,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
extractTableNameList
(
pCmd
,
freeBlockMap
);
// free the table data blocks;
pCmd
->
pDataBlocks
=
pVnodeDataBlockList
;
pCmd
->
insertParam
.
pDataBlocks
=
pVnodeDataBlockList
;
taosHashCleanup
(
pVnodeDataBlockHashList
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1848,7 +1856,6 @@ void* sqlExprDestroy(SExprInfo* pExpr) {
tExprTreeDestroy
(
pExpr
->
pExpr
,
NULL
);
}
printf
(
"free---------------%p
\n
"
,
pExpr
);
tfree
(
pExpr
);
return
NULL
;
}
...
...
@@ -1918,7 +1925,6 @@ SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde
return
NULL
;
}
printf
(
"malloc======================%p
\n
"
,
pExpr
);
SSqlExpr
*
p
=
&
pExpr
->
base
;
p
->
functionId
=
functionId
;
...
...
@@ -2625,7 +2631,7 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return
pQueryInfo
->
pTableMetaInfo
[
tableIndex
];
}
SQueryInfo
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
)
{
SQueryInfo
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
...
...
@@ -3025,9 +3031,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
pNew
->
sqlstr
=
NULL
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
0
);
assert
(
pSql
->
cmd
.
clauseIndex
==
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
);
STableMetaInfo
*
pMasterTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
);
tscAddTableMetaInfo
(
pQueryInfo
,
&
pMasterTableMetaInfo
->
name
,
NULL
,
NULL
,
NULL
,
NULL
);
...
...
@@ -3102,7 +3106,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pnCmd
->
pTableMetaMap
=
NULL
;
pnCmd
->
pQueryInfo
=
NULL
;
pnCmd
->
clauseIndex
=
0
;
pnCmd
->
pDataBlocks
=
NULL
;
pnCmd
->
numOfTables
=
0
;
...
...
@@ -3401,71 +3404,6 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
doExecuteQuery
(
pSql
,
pQueryInfo
);
}
/**
* todo remove it
* To decide if current is a two-stage super table query, join query, or insert. And invoke different
* procedure accordingly
* @param pSql
*/
void
tscDoQuery
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
code
=
TSDB_CODE_SUCCESS
;
if
(
pCmd
->
command
>
TSDB_SQL_LOCAL
)
{
tscProcessLocalCmd
(
pSql
);
return
;
}
if
(
pCmd
->
dataSourceType
==
DATA_FROM_DATA_FILE
)
{
tscImportDataFromFile
(
pSql
);
}
else
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
uint16_t
type
=
pQueryInfo
->
type
;
if
((
pCmd
->
command
==
TSDB_SQL_SELECT
)
&&
(
!
TSDB_QUERY_HAS_TYPE
(
type
,
TSDB_QUERY_TYPE_SUBQUERY
))
&&
(
!
TSDB_QUERY_HAS_TYPE
(
type
,
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)))
{
tscAddIntoSqlList
(
pSql
);
}
if
(
TSDB_QUERY_HAS_TYPE
(
type
,
TSDB_QUERY_TYPE_INSERT
))
{
// multi-vnodes insertion
tscHandleMultivnodeInsert
(
pSql
);
return
;
}
if
(
QUERY_IS_JOIN_QUERY
(
type
))
{
if
(
!
TSDB_QUERY_HAS_TYPE
(
type
,
TSDB_QUERY_TYPE_SUBQUERY
))
{
tscHandleMasterJoinQuery
(
pSql
);
}
else
{
// for first stage sub query, iterate all vnodes to get all timestamp
if
(
!
TSDB_QUERY_HAS_TYPE
(
type
,
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
))
{
tscBuildAndSendRequest
(
pSql
,
NULL
);
}
else
{
// secondary stage join query.
if
(
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
// super table query
tscLockByThread
(
&
pSql
->
squeryLock
);
tscHandleMasterSTableQuery
(
pSql
);
tscUnlockByThread
(
&
pSql
->
squeryLock
);
}
else
{
tscBuildAndSendRequest
(
pSql
,
NULL
);
}
}
}
return
;
}
else
if
(
tscMultiRoundQuery
(
pQueryInfo
,
0
)
&&
pQueryInfo
->
round
==
0
)
{
tscHandleFirstRoundStableQuery
(
pSql
);
// todo lock?
return
;
}
else
if
(
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
// super table query
tscLockByThread
(
&
pSql
->
squeryLock
);
tscHandleMasterSTableQuery
(
pSql
);
tscUnlockByThread
(
&
pSql
->
squeryLock
);
return
;
}
pCmd
->
active
=
pQueryInfo
;
tscBuildAndSendRequest
(
pSql
,
NULL
);
}
}
int16_t
tscGetJoinTagColIdByUid
(
STagCond
*
pTagCond
,
uint64_t
uid
)
{
int32_t
i
=
0
;
while
(
i
<
TSDB_MAX_JOIN_TABLE_NUM
)
{
...
...
@@ -3687,7 +3625,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
pCmd
->
clauseIndex
++
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
pSql
->
cmd
.
command
=
pQueryInfo
->
command
;
...
...
@@ -3708,7 +3645,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pSql
->
subState
.
numOfSub
=
0
;
pSql
->
fp
=
fp
;
tscDebug
(
"0x%"
PRIx64
" try data in the next subclause
:%d"
,
pSql
->
self
,
pCmd
->
clauseIndex
);
tscDebug
(
"0x%"
PRIx64
" try data in the next subclause
"
,
pSql
->
self
);
if
(
pCmd
->
command
>
TSDB_SQL_LOCAL
)
{
tscProcessLocalCmd
(
pSql
);
}
else
{
...
...
@@ -4283,7 +4220,7 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt
int
code
=
TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH
;
char
*
str
=
(
char
*
)
pNameList
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
);
if
(
pQueryInfo
==
NULL
)
{
pSql
->
res
.
code
=
terrno
;
return
terrno
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录