Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b9374718
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b9374718
编写于
10月 11, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
10月 11, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17290 from taosdata/fix/3.0_bugfix_wxy
feat: support batch loading of csv files
上级
de67ce68
0563eb24
变更
14
展开全部
隐藏空白更改
内联
并排
Showing
14 changed file
with
695 addition
and
321 deletion
+695
-321
include/common/tglobal.h
include/common/tglobal.h
+1
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+4
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+0
-1
include/libs/parser/parser.h
include/libs/parser/parser.h
+11
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+17
-8
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+86
-65
source/client/src/clientMain.c
source/client/src/clientMain.c
+93
-75
source/client/src/clientSml.c
source/client/src/clientSml.c
+129
-116
source/common/src/tglobal.c
source/common/src/tglobal.c
+26
-17
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+228
-0
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+80
-28
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+10
-0
source/libs/parser/test/mockCatalog.cpp
source/libs/parser/test/mockCatalog.cpp
+10
-10
source/libs/parser/test/parTestUtil.cpp
source/libs/parser/test/parTestUtil.cpp
+0
-1
未找到文件。
include/common/tglobal.h
浏览文件 @
b9374718
...
@@ -103,6 +103,7 @@ extern bool tsKeepColumnName;
...
@@ -103,6 +103,7 @@ extern bool tsKeepColumnName;
// client
// client
extern
int32_t
tsMinSlidingTime
;
extern
int32_t
tsMinSlidingTime
;
extern
int32_t
tsMinIntervalTime
;
extern
int32_t
tsMinIntervalTime
;
extern
int32_t
tsMaxMemUsedByInsert
;
// build info
// build info
extern
char
version
[];
extern
char
version
[];
...
...
include/libs/catalog/catalog.h
浏览文件 @
b9374718
...
@@ -303,6 +303,10 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
...
@@ -303,6 +303,10 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
int32_t
catalogClearCache
(
void
);
int32_t
catalogClearCache
(
void
);
SMetaData
*
catalogCloneMetaData
(
SMetaData
*
pData
);
void
catalogFreeMetaData
(
SMetaData
*
pData
);
/**
/**
* Destroy catalog and relase all resources
* Destroy catalog and relase all resources
*/
*/
...
...
include/libs/nodes/querynodes.h
浏览文件 @
b9374718
...
@@ -385,7 +385,6 @@ typedef struct SCmdMsgInfo {
...
@@ -385,7 +385,6 @@ typedef struct SCmdMsgInfo {
SEpSet
epSet
;
SEpSet
epSet
;
void
*
pMsg
;
void
*
pMsg
;
int32_t
msgLen
;
int32_t
msgLen
;
void
*
pExtension
;
// todo remove it soon
}
SCmdMsgInfo
;
}
SCmdMsgInfo
;
typedef
enum
EQueryExecMode
{
typedef
enum
EQueryExecMode
{
...
...
include/libs/parser/parser.h
浏览文件 @
b9374718
...
@@ -33,6 +33,13 @@ typedef struct SStmtCallback {
...
@@ -33,6 +33,13 @@ typedef struct SStmtCallback {
int32_t
(
*
getExecInfoFn
)(
TAOS_STMT
*
,
SHashObj
**
,
SHashObj
**
);
int32_t
(
*
getExecInfoFn
)(
TAOS_STMT
*
,
SHashObj
**
,
SHashObj
**
);
}
SStmtCallback
;
}
SStmtCallback
;
typedef
struct
SParseCsvCxt
{
TdFilePtr
fp
;
// last parsed file
int32_t
tableNo
;
// last parsed table
SName
tableName
;
// last parsed table
const
char
*
pLastSqlPos
;
// the location of the last parsed sql
}
SParseCsvCxt
;
typedef
struct
SParseContext
{
typedef
struct
SParseContext
{
uint64_t
requestId
;
uint64_t
requestId
;
int64_t
requestRid
;
int64_t
requestRid
;
...
@@ -57,6 +64,8 @@ typedef struct SParseContext {
...
@@ -57,6 +64,8 @@ typedef struct SParseContext {
SArray
*
pTableMetaPos
;
// sql table pos => catalog data pos
SArray
*
pTableMetaPos
;
// sql table pos => catalog data pos
SArray
*
pTableVgroupPos
;
// sql table pos => catalog data pos
SArray
*
pTableVgroupPos
;
// sql table pos => catalog data pos
int64_t
allocatorId
;
int64_t
allocatorId
;
bool
needMultiParse
;
SParseCsvCxt
csvCxt
;
}
SParseContext
;
}
SParseContext
;
int32_t
qParseSql
(
SParseContext
*
pCxt
,
SQuery
**
pQuery
);
int32_t
qParseSql
(
SParseContext
*
pCxt
,
SQuery
**
pQuery
);
...
@@ -67,6 +76,8 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
...
@@ -67,6 +76,8 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
int32_t
qAnalyseSqlSemantic
(
SParseContext
*
pCxt
,
const
struct
SCatalogReq
*
pCatalogReq
,
int32_t
qAnalyseSqlSemantic
(
SParseContext
*
pCxt
,
const
struct
SCatalogReq
*
pCatalogReq
,
const
struct
SMetaData
*
pMetaData
,
SQuery
*
pQuery
);
const
struct
SMetaData
*
pMetaData
,
SQuery
*
pQuery
);
void
qDestroyParseContext
(
SParseContext
*
pCxt
);
void
qDestroyQuery
(
SQuery
*
pQueryNode
);
void
qDestroyQuery
(
SQuery
*
pQueryNode
);
int32_t
qExtractResultSchema
(
const
SNode
*
pRoot
,
int32_t
*
numOfCols
,
SSchema
**
pSchema
);
int32_t
qExtractResultSchema
(
const
SNode
*
pRoot
,
int32_t
*
numOfCols
,
SSchema
**
pSchema
);
...
...
source/client/inc/clientInt.h
浏览文件 @
b9374718
...
@@ -380,16 +380,25 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
...
@@ -380,16 +380,25 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
// --- mq
// --- mq
void
hbMgrInitMqHbRspHandle
();
void
hbMgrInitMqHbRspHandle
();
typedef
struct
SSqlCallbackWrapper
{
SParseContext
*
pParseCtx
;
SCatalogReq
*
pCatalogReq
;
SMetaData
*
pResultMeta
;
SRequestObj
*
pRequest
;
}
SSqlCallbackWrapper
;
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
bool
keepQuery
,
void
**
res
);
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
bool
keepQuery
,
void
**
res
);
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
);
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
);
void
launchAsyncQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SMetaData
*
pResultMeta
);
void
launchAsyncQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SMetaData
*
pResultMeta
,
SSqlCallbackWrapper
*
pWrapper
);
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
);
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
);
int32_t
updateQnodeList
(
SAppInstInfo
*
pInfo
,
SArray
*
pNodeList
);
int32_t
updateQnodeList
(
SAppInstInfo
*
pInfo
,
SArray
*
pNodeList
);
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
forceUpdateMeta
);
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
forceUpdateMeta
);
int32_t
removeMeta
(
STscObj
*
pTscObj
,
SArray
*
tbList
);
int32_t
removeMeta
(
STscObj
*
pTscObj
,
SArray
*
tbList
);
int32_t
handleAlterTbExecRes
(
void
*
res
,
struct
SCatalog
*
pCatalog
);
int32_t
handleAlterTbExecRes
(
void
*
res
,
struct
SCatalog
*
pCatalog
);
int32_t
handleCreateTbExecRes
(
void
*
res
,
SCatalog
*
pCatalog
);
int32_t
handleCreateTbExecRes
(
void
*
res
,
SCatalog
*
pCatalog
);
bool
qnodeRequired
(
SRequestObj
*
pRequest
);
bool
qnodeRequired
(
SRequestObj
*
pRequest
);
int32_t
continueInsertFromCsv
(
SSqlCallbackWrapper
*
pWrapper
,
SRequestObj
*
pRequest
);
void
destorySqlCallbackWrapper
(
SSqlCallbackWrapper
*
pWrapper
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/client/src/clientImpl.c
浏览文件 @
b9374718
...
@@ -868,10 +868,11 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
...
@@ -868,10 +868,11 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return
code
;
return
code
;
}
}
//todo refacto the error code mgmt
//
todo refacto the error code mgmt
void
schedulerExecCb
(
SExecResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
void
schedulerExecCb
(
SExecResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
SSqlCallbackWrapper
*
pWrapper
=
param
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
pRequest
->
code
=
code
;
pRequest
->
code
=
code
;
if
(
pResult
)
{
if
(
pResult
)
{
...
@@ -882,7 +883,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
...
@@ -882,7 +883,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
int32_t
type
=
pRequest
->
type
;
int32_t
type
=
pRequest
->
type
;
if
(
TDMT_VND_SUBMIT
==
type
||
TDMT_VND_DELETE
==
type
||
TDMT_VND_CREATE_TABLE
==
type
)
{
if
(
TDMT_VND_SUBMIT
==
type
||
TDMT_VND_DELETE
==
type
||
TDMT_VND_CREATE_TABLE
==
type
)
{
if
(
pResult
)
{
if
(
pResult
)
{
pRequest
->
body
.
resInfo
.
numOfRows
=
pResult
->
numOfRows
;
pRequest
->
body
.
resInfo
.
numOfRows
+
=
pResult
->
numOfRows
;
// record the insert rows
// record the insert rows
if
(
TDMT_VND_SUBMIT
==
type
)
{
if
(
TDMT_VND_SUBMIT
==
type
)
{
...
@@ -899,12 +900,13 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
...
@@ -899,12 +900,13 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
pRequest
->
requestId
);
pRequest
->
requestId
);
if
(
code
!=
TSDB_CODE_SUCCESS
&&
NEED_CLIENT_HANDLE_ERROR
(
code
)
&&
pRequest
->
sqlstr
!=
NULL
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
&&
NEED_CLIENT_HANDLE_ERROR
(
code
)
&&
pRequest
->
sqlstr
!=
NULL
)
{
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%s, tryCount:%d, reqId:0x%"
PRIx64
,
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%s, tryCount:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
retry
,
pRequest
->
requestId
);
tstrerror
(
code
),
pRequest
->
retry
,
pRequest
->
requestId
);
pRequest
->
prevCode
=
code
;
pRequest
->
prevCode
=
code
;
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
qDestroyQuery
(
pRequest
->
pQuery
);
qDestroyQuery
(
pRequest
->
pQuery
);
pRequest
->
pQuery
=
NULL
;
pRequest
->
pQuery
=
NULL
;
destorySqlCallbackWrapper
(
pWrapper
);
doAsyncQuery
(
pRequest
,
true
);
doAsyncQuery
(
pRequest
,
true
);
return
;
return
;
}
}
...
@@ -920,6 +922,15 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
...
@@ -920,6 +922,15 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
pRequest
->
code
=
code1
;
pRequest
->
code
=
code1
;
}
}
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
&&
NULL
!=
pWrapper
->
pParseCtx
&&
pWrapper
->
pParseCtx
->
needMultiParse
)
{
code
=
continueInsertFromCsv
(
pWrapper
,
pRequest
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
return
;
}
}
destorySqlCallbackWrapper
(
pWrapper
);
// return to client
// return to client
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
}
...
@@ -1020,76 +1031,86 @@ SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool vali
...
@@ -1020,76 +1031,86 @@ SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool vali
return
launchQueryImpl
(
pRequest
,
pQuery
,
false
,
NULL
);
return
launchQueryImpl
(
pRequest
,
pQuery
,
false
,
NULL
);
}
}
void
launchAsyncQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SMetaData
*
pResultMeta
)
{
static
int32_t
asyncExecSchQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SMetaData
*
pResultMeta
,
SSqlCallbackWrapper
*
pWrapper
)
{
pRequest
->
type
=
pQuery
->
msgType
;
SArray
*
pMnodeList
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeLoad
));
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pUser
=
pRequest
->
pTscObj
->
user
,
.
sysInfo
=
pRequest
->
pTscObj
->
sysInfo
,
.
allocatorId
=
pRequest
->
allocatorRefId
};
SQueryPlan
*
pDag
=
NULL
;
int32_t
code
=
qCreateQueryPlan
(
&
cxt
,
&
pDag
,
pMnodeList
);
if
(
code
)
{
tscError
(
"0x%"
PRIx64
" failed to create query plan, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
}
else
{
pRequest
->
body
.
subplanNum
=
pDag
->
numOfSubplans
;
}
pRequest
->
metric
.
planEnd
=
taosGetTimestampUs
();
if
(
TSDB_CODE_SUCCESS
==
code
&&
!
pRequest
->
validateOnly
)
{
SArray
*
pNodeList
=
NULL
;
buildAsyncExecNodeList
(
pRequest
,
&
pNodeList
,
pMnodeList
,
pResultMeta
);
SRequestConnInfo
conn
=
{.
pTrans
=
getAppInfo
(
pRequest
)
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
};
SSchedulerReq
req
=
{
.
syncReq
=
false
,
.
localReq
=
(
tsQueryPolicy
==
QUERY_POLICY_CLIENT
),
.
pConn
=
&
conn
,
.
pNodeList
=
pNodeList
,
.
pDag
=
pDag
,
.
allocatorRefId
=
pRequest
->
allocatorRefId
,
.
sql
=
pRequest
->
sqlstr
,
.
startTs
=
pRequest
->
metric
.
start
,
.
execFp
=
schedulerExecCb
,
.
cbParam
=
pWrapper
,
.
chkKillFp
=
chkRequestKilled
,
.
chkKillParam
=
(
void
*
)
pRequest
->
self
,
.
pExecRes
=
NULL
,
};
code
=
schedulerExecJob
(
&
req
,
&
pRequest
->
body
.
queryJob
);
taosArrayDestroy
(
pNodeList
);
}
else
{
tscDebug
(
"0x%"
PRIx64
" plan not executed, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
destorySqlCallbackWrapper
(
pWrapper
);
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
// todo not to be released here
taosArrayDestroy
(
pMnodeList
);
return
code
;
}
void
launchAsyncQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SMetaData
*
pResultMeta
,
SSqlCallbackWrapper
*
pWrapper
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
pRequest
->
body
.
execMode
=
pQuery
->
execMode
;
pRequest
->
body
.
execMode
=
pQuery
->
execMode
;
if
(
QUERY_EXEC_MODE_SCHEDULE
!=
pRequest
->
body
.
execMode
)
{
destorySqlCallbackWrapper
(
pWrapper
);
}
switch
(
pQuery
->
execMode
)
{
switch
(
pQuery
->
execMode
)
{
case
QUERY_EXEC_MODE_LOCAL
:
case
QUERY_EXEC_MODE_LOCAL
:
asyncExecLocalCmd
(
pRequest
,
pQuery
);
asyncExecLocalCmd
(
pRequest
,
pQuery
);
return
;
break
;
case
QUERY_EXEC_MODE_RPC
:
case
QUERY_EXEC_MODE_RPC
:
code
=
asyncExecDdlQuery
(
pRequest
,
pQuery
);
code
=
asyncExecDdlQuery
(
pRequest
,
pQuery
);
break
;
break
;
case
QUERY_EXEC_MODE_SCHEDULE
:
{
case
QUERY_EXEC_MODE_SCHEDULE
:
{
SArray
*
pMnodeList
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeLoad
));
code
=
asyncExecSchQuery
(
pRequest
,
pQuery
,
pResultMeta
,
pWrapper
);
pRequest
->
type
=
pQuery
->
msgType
;
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pUser
=
pRequest
->
pTscObj
->
user
,
.
sysInfo
=
pRequest
->
pTscObj
->
sysInfo
,
.
allocatorId
=
pRequest
->
allocatorRefId
};
SAppInstInfo
*
pAppInfo
=
getAppInfo
(
pRequest
);
SQueryPlan
*
pDag
=
NULL
;
code
=
qCreateQueryPlan
(
&
cxt
,
&
pDag
,
pMnodeList
);
if
(
code
)
{
tscError
(
"0x%"
PRIx64
" failed to create query plan, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
}
else
{
pRequest
->
body
.
subplanNum
=
pDag
->
numOfSubplans
;
}
pRequest
->
metric
.
planEnd
=
taosGetTimestampUs
();
if
(
TSDB_CODE_SUCCESS
==
code
&&
!
pRequest
->
validateOnly
)
{
SArray
*
pNodeList
=
NULL
;
buildAsyncExecNodeList
(
pRequest
,
&
pNodeList
,
pMnodeList
,
pResultMeta
);
SRequestConnInfo
conn
=
{
.
pTrans
=
pAppInfo
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
};
SSchedulerReq
req
=
{
.
syncReq
=
false
,
.
localReq
=
(
tsQueryPolicy
==
QUERY_POLICY_CLIENT
),
.
pConn
=
&
conn
,
.
pNodeList
=
pNodeList
,
.
pDag
=
pDag
,
.
allocatorRefId
=
pRequest
->
allocatorRefId
,
.
sql
=
pRequest
->
sqlstr
,
.
startTs
=
pRequest
->
metric
.
start
,
.
execFp
=
schedulerExecCb
,
.
cbParam
=
pRequest
,
.
chkKillFp
=
chkRequestKilled
,
.
chkKillParam
=
(
void
*
)
pRequest
->
self
,
.
pExecRes
=
NULL
,
};
code
=
schedulerExecJob
(
&
req
,
&
pRequest
->
body
.
queryJob
);
taosArrayDestroy
(
pNodeList
);
}
else
{
tscDebug
(
"0x%"
PRIx64
" plan not executed, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
// todo not to be released here
taosArrayDestroy
(
pMnodeList
);
break
;
break
;
}
}
case
QUERY_EXEC_MODE_EMPTY_RESULT
:
case
QUERY_EXEC_MODE_EMPTY_RESULT
:
...
...
source/client/src/clientMain.c
浏览文件 @
b9374718
...
@@ -667,43 +667,47 @@ const char *taos_get_server_info(TAOS *taos) {
...
@@ -667,43 +667,47 @@ const char *taos_get_server_info(TAOS *taos) {
return
pTscObj
->
sDetailVer
;
return
pTscObj
->
sDetailVer
;
}
}
typedef
struct
SqlParseWrapper
{
SParseContext
*
pCtx
;
SCatalogReq
catalogReq
;
SRequestObj
*
pRequest
;
}
SqlParseWrapper
;
static
void
destoryTablesReq
(
void
*
p
)
{
static
void
destoryTablesReq
(
void
*
p
)
{
STablesReq
*
pRes
=
(
STablesReq
*
)
p
;
STablesReq
*
pRes
=
(
STablesReq
*
)
p
;
taosArrayDestroy
(
pRes
->
pTables
);
taosArrayDestroy
(
pRes
->
pTables
);
}
}
static
void
destorySqlParseWrapper
(
SqlParseWrapper
*
pWrapper
)
{
static
void
destoryCatalogReq
(
SCatalogReq
*
pCatalogReq
)
{
taosArrayDestroy
(
pWrapper
->
catalogReq
.
pDbVgroup
);
if
(
NULL
==
pCatalogReq
)
{
taosArrayDestroy
(
pWrapper
->
catalogReq
.
pDbCfg
);
return
;
taosArrayDestroy
(
pWrapper
->
catalogReq
.
pDbInfo
);
}
taosArrayDestroyEx
(
pWrapper
->
catalogReq
.
pTableMeta
,
destoryTablesReq
);
taosArrayDestroy
(
pCatalogReq
->
pDbVgroup
);
taosArrayDestroyEx
(
pWrapper
->
catalogReq
.
pTableHash
,
destoryTablesReq
);
taosArrayDestroy
(
pCatalogReq
->
pDbCfg
);
taosArrayDestroy
(
pWrapper
->
catalogReq
.
pUdf
);
taosArrayDestroy
(
pCatalogReq
->
pDbInfo
);
taosArrayDestroy
(
pWrapper
->
catalogReq
.
pIndex
);
taosArrayDestroyEx
(
pCatalogReq
->
pTableMeta
,
destoryTablesReq
);
taosArrayDestroy
(
pWrapper
->
catalogReq
.
pUser
);
taosArrayDestroyEx
(
pCatalogReq
->
pTableHash
,
destoryTablesReq
);
taosArrayDestroy
(
pWrapper
->
catalogReq
.
pTableIndex
);
taosArrayDestroy
(
pCatalogReq
->
pUdf
);
taosArrayDestroy
(
pWrapper
->
pCtx
->
pTableMetaPos
);
taosArrayDestroy
(
pCatalogReq
->
pIndex
);
taosArrayDestroy
(
pWrapper
->
pCtx
->
pTableVgroupPos
);
taosArrayDestroy
(
pCatalogReq
->
pUser
);
taosMemoryFree
(
pWrapper
->
pCtx
);
taosArrayDestroy
(
pCatalogReq
->
pTableIndex
);
taosMemoryFree
(
pCatalogReq
);
}
void
destorySqlCallbackWrapper
(
SSqlCallbackWrapper
*
pWrapper
)
{
if
(
NULL
==
pWrapper
)
{
return
;
}
destoryCatalogReq
(
pWrapper
->
pCatalogReq
);
qDestroyParseContext
(
pWrapper
->
pParseCtx
);
catalogFreeMetaData
(
pWrapper
->
pResultMeta
);
taosMemoryFree
(
pWrapper
);
taosMemoryFree
(
pWrapper
);
}
}
void
retrieveMetaCallback
(
SMetaData
*
pResultMeta
,
void
*
param
,
int32_t
code
)
{
void
retrieveMetaCallback
(
SMetaData
*
pResultMeta
,
void
*
param
,
int32_t
code
)
{
S
qlParseWrapper
*
pWrapper
=
(
SqlParse
Wrapper
*
)
param
;
S
SqlCallbackWrapper
*
pWrapper
=
(
SSqlCallback
Wrapper
*
)
param
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
SQuery
*
pQuery
=
pRequest
->
pQuery
;
SQuery
*
pQuery
=
pRequest
->
pQuery
;
pRequest
->
metric
.
ctgEnd
=
taosGetTimestampUs
();
pRequest
->
metric
.
ctgEnd
=
taosGetTimestampUs
();
qDebug
(
"0x%"
PRIx64
" start to semantic analysis, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
requestId
);
qDebug
(
"0x%"
PRIx64
" start to semantic analysis, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
requestId
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
code
=
qAnalyseSqlSemantic
(
pWrapper
->
p
Ctx
,
&
pWrapper
->
c
atalogReq
,
pResultMeta
,
pQuery
);
code
=
qAnalyseSqlSemantic
(
pWrapper
->
p
ParseCtx
,
pWrapper
->
pC
atalogReq
,
pResultMeta
,
pQuery
);
pRequest
->
stableQuery
=
pQuery
->
stableQuery
;
pRequest
->
stableQuery
=
pQuery
->
stableQuery
;
if
(
pQuery
->
pRoot
)
{
if
(
pQuery
->
pRoot
)
{
pRequest
->
stmtType
=
pQuery
->
pRoot
->
type
;
pRequest
->
stmtType
=
pQuery
->
pRoot
->
type
;
...
@@ -712,6 +716,13 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
...
@@ -712,6 +716,13 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest
->
metric
.
semanticEnd
=
taosGetTimestampUs
();
pRequest
->
metric
.
semanticEnd
=
taosGetTimestampUs
();
if
(
code
==
TSDB_CODE_SUCCESS
&&
pWrapper
->
pParseCtx
->
needMultiParse
)
{
pWrapper
->
pResultMeta
=
catalogCloneMetaData
(
pResultMeta
);
if
(
NULL
==
pWrapper
->
pResultMeta
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
pQuery
->
haveResultSet
)
{
if
(
pQuery
->
haveResultSet
)
{
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
pQuery
->
pResSchema
,
pQuery
->
numOfResCols
);
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
pQuery
->
pResSchema
,
pQuery
->
numOfResCols
);
...
@@ -722,15 +733,13 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
...
@@ -722,15 +733,13 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
TSWAP
(
pRequest
->
tableList
,
(
pQuery
)
->
pTableList
);
TSWAP
(
pRequest
->
tableList
,
(
pQuery
)
->
pTableList
);
TSWAP
(
pRequest
->
targetTableList
,
(
pQuery
)
->
pTargetTableList
);
TSWAP
(
pRequest
->
targetTableList
,
(
pQuery
)
->
pTargetTableList
);
destorySqlParseWrapper
(
pWrapper
);
double
el
=
(
pRequest
->
metric
.
semanticEnd
-
pRequest
->
metric
.
ctgEnd
)
/
1000
.
0
;
double
el
=
(
pRequest
->
metric
.
semanticEnd
-
pRequest
->
metric
.
ctgEnd
)
/
1000
.
0
;
tscDebug
(
"0x%"
PRIx64
" analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%"
PRIx64
,
tscDebug
(
"0x%"
PRIx64
" analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
el
,
pRequest
->
requestId
);
pRequest
->
self
,
el
,
pRequest
->
requestId
);
launchAsyncQuery
(
pRequest
,
pQuery
,
pResultMeta
);
launchAsyncQuery
(
pRequest
,
pQuery
,
pResultMeta
,
pWrapper
);
}
else
{
}
else
{
destorySql
Parse
Wrapper
(
pWrapper
);
destorySql
Callback
Wrapper
(
pWrapper
);
qDestroyQuery
(
pRequest
->
pQuery
);
qDestroyQuery
(
pRequest
->
pQuery
);
pRequest
->
pQuery
=
NULL
;
pRequest
->
pQuery
=
NULL
;
...
@@ -750,6 +759,16 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
...
@@ -750,6 +759,16 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
}
}
}
int32_t
continueInsertFromCsv
(
SSqlCallbackWrapper
*
pWrapper
,
SRequestObj
*
pRequest
)
{
qDestroyQuery
(
pRequest
->
pQuery
);
pRequest
->
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
pRequest
->
pQuery
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
retrieveMetaCallback
(
pWrapper
->
pResultMeta
,
pWrapper
,
TSDB_CODE_SUCCESS
);
return
TSDB_CODE_SUCCESS
;
}
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
)
{
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
)
{
int64_t
connId
=
*
(
int64_t
*
)
taos
;
int64_t
connId
=
*
(
int64_t
*
)
taos
;
taosAsyncQueryImpl
(
connId
,
sql
,
fp
,
param
,
false
);
taosAsyncQueryImpl
(
connId
,
sql
,
fp
,
param
,
false
);
...
@@ -786,37 +805,48 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
...
@@ -786,37 +805,48 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
}
}
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
updateMetaForce
)
{
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
updateMetaForce
)
{
S
ParseContext
*
pCxt
=
NULL
;
S
TscObj
*
pTscObj
=
pRequest
->
pTscObj
;
S
TscObj
*
pTscObj
=
pRequest
->
pTscObj
;
S
SqlCallbackWrapper
*
pWrapper
=
NULL
;
int32_t
code
=
0
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pRequest
->
retry
++
>
REQUEST_TOTAL_EXEC_TIMES
)
{
if
(
pRequest
->
retry
++
>
REQUEST_TOTAL_EXEC_TIMES
)
{
code
=
pRequest
->
prevCode
;
code
=
pRequest
->
prevCode
;
goto
_error
;
}
}
code
=
createParseContext
(
pRequest
,
&
pCxt
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SSqlCallbackWrapper
));
goto
_error
;
if
(
pWrapper
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
pWrapper
->
pRequest
=
pRequest
;
}
}
}
pCxt
->
mgmtEpSet
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCxt
->
pCatalog
);
code
=
createParseContext
(
pRequest
,
&
pWrapper
->
pParseCtx
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
}
pRequest
->
metric
.
syntaxStart
=
taosGetTimestampUs
();
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pWrapper
->
pParseCtx
->
mgmtEpSet
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
SCatalogReq
catalogReq
=
{.
forceUpdate
=
updateMetaForce
,
.
qNodeRequired
=
qnodeRequired
(
pRequest
)};
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pWrapper
->
pParseCtx
->
pCatalog
);
code
=
qParseSqlSyntax
(
pCxt
,
&
pRequest
->
pQuery
,
&
catalogReq
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
}
pRequest
->
metric
.
syntaxEnd
=
taosGetTimestampUs
();
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pRequest
->
metric
.
syntaxStart
=
taosGetTimestampUs
();
pWrapper
->
pCatalogReq
=
taosMemoryCalloc
(
1
,
sizeof
(
SCatalogReq
));
if
(
pWrapper
->
pCatalogReq
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
pWrapper
->
pCatalogReq
->
forceUpdate
=
updateMetaForce
;
pWrapper
->
pCatalogReq
->
qNodeRequired
=
qnodeRequired
(
pRequest
);
code
=
qParseSqlSyntax
(
pWrapper
->
pParseCtx
,
&
pRequest
->
pQuery
,
pWrapper
->
pCatalogReq
);
}
pRequest
->
metric
.
syntaxEnd
=
taosGetTimestampUs
();
}
if
(
!
updateMetaForce
)
{
if
(
TSDB_CODE_SUCCESS
==
code
&&
!
updateMetaForce
)
{
SAppClusterSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
SAppClusterSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
if
(
NULL
==
pRequest
->
pQuery
->
pRoot
)
{
if
(
NULL
==
pRequest
->
pQuery
->
pRoot
)
{
atomic_add_fetch_64
((
int64_t
*
)
&
pActivity
->
numOfInsertsReq
,
1
);
atomic_add_fetch_64
((
int64_t
*
)
&
pActivity
->
numOfInsertsReq
,
1
);
...
@@ -825,38 +855,26 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
...
@@ -825,38 +855,26 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
}
}
}
}
SqlParseWrapper
*
pWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SqlParseWrapper
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
pWrapper
==
NULL
)
{
SRequestConnInfo
conn
=
{.
pTrans
=
pWrapper
->
pParseCtx
->
pTransporter
,
code
=
TSDB_CODE_OUT_OF_MEMORY
;
.
requestId
=
pWrapper
->
pParseCtx
->
requestId
,
goto
_error
;
.
requestObjRefId
=
pWrapper
->
pParseCtx
->
requestRid
,
}
.
mgmtEps
=
pWrapper
->
pParseCtx
->
mgmtEpSet
};
pWrapper
->
pCtx
=
pCxt
;
pWrapper
->
pRequest
=
pRequest
;
pWrapper
->
catalogReq
=
catalogReq
;
SRequestConnInfo
conn
=
{.
pTrans
=
pCxt
->
pTransporter
,
pRequest
->
metric
.
ctgStart
=
taosGetTimestampUs
();
.
requestId
=
pCxt
->
requestId
,
.
requestObjRefId
=
pCxt
->
requestRid
,
.
mgmtEps
=
pCxt
->
mgmtEpSet
};
pRequest
->
metric
.
ctgStart
=
taosGetTimestampUs
();
code
=
catalogAsyncGetAllMeta
(
pWrapper
->
pParseCtx
->
pCatalog
,
&
conn
,
pWrapper
->
pCatalogReq
,
retrieveMetaCallback
,
pWrapper
,
&
pRequest
->
body
.
queryJob
);
code
=
catalogAsyncGetAllMeta
(
pCxt
->
pCatalog
,
&
conn
,
&
catalogReq
,
retrieveMetaCallback
,
pWrapper
,
&
pRequest
->
body
.
queryJob
);
pCxt
=
NULL
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
return
;
}
}
_error:
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
tscError
(
"0x%"
PRIx64
" error happens, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
tscError
(
"0x%"
PRIx64
" error happens, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
requestId
);
pRequest
->
requestId
);
taosMemoryFree
(
pCxt
);
destorySqlCallbackWrapper
(
pWrapper
);
terrno
=
code
;
terrno
=
code
;
pRequest
->
code
=
code
;
pRequest
->
code
=
code
;
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
)
;
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
}
}
static
void
fetchCallback
(
void
*
pResult
,
void
*
param
,
int32_t
code
)
{
static
void
fetchCallback
(
void
*
pResult
,
void
*
param
,
int32_t
code
)
{
...
...
source/client/src/clientSml.c
浏览文件 @
b9374718
此差异已折叠。
点击以展开。
source/common/src/tglobal.c
浏览文件 @
b9374718
...
@@ -85,7 +85,8 @@ uint16_t tsTelemPort = 80;
...
@@ -85,7 +85,8 @@ uint16_t tsTelemPort = 80;
char
tsSmlTagName
[
TSDB_COL_NAME_LEN
]
=
"_tag_null"
;
char
tsSmlTagName
[
TSDB_COL_NAME_LEN
]
=
"_tag_null"
;
char
tsSmlChildTableName
[
TSDB_TABLE_NAME_LEN
]
=
""
;
// user defined child table name can be specified in tag value.
char
tsSmlChildTableName
[
TSDB_TABLE_NAME_LEN
]
=
""
;
// user defined child table name can be specified in tag value.
// If set to empty system will generate table name using MD5 hash.
// If set to empty system will generate table name using MD5 hash.
bool
tsSmlDataFormat
=
false
;
// true means that the name and order of cols in each line are the same(only for influx protocol)
// true means that the name and order of cols in each line are the same(only for influx protocol)
bool
tsSmlDataFormat
=
false
;
// query
// query
int32_t
tsQueryPolicy
=
1
;
int32_t
tsQueryPolicy
=
1
;
...
@@ -125,6 +126,9 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
...
@@ -125,6 +126,9 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 database precision unit for interval time range, changed accordingly
// 1 database precision unit for interval time range, changed accordingly
int32_t
tsMinIntervalTime
=
1
;
int32_t
tsMinIntervalTime
=
1
;
// maximum memory allowed to be allocated for a single csv load (in MB)
int32_t
tsMaxMemUsedByInsert
=
1024
;
// the maximum allowed query buffer size during query processing for each data node.
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// -1 no limit (default)
// 0 no query allowed, queries are disabled
// 0 no query allowed, queries are disabled
...
@@ -296,6 +300,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
...
@@ -296,6 +300,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddString
(
pCfg
,
"smlChildTableName"
,
""
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlChildTableName"
,
""
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlTagName"
,
tsSmlTagName
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlTagName"
,
tsSmlTagName
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"smlDataFormat"
,
tsSmlDataFormat
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"smlDataFormat"
,
tsSmlDataFormat
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxMemUsedByInsert"
,
tsMaxMemUsedByInsert
,
1
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
tsNumOfTaskQueueThreads
=
tsNumOfCores
/
2
;
tsNumOfTaskQueueThreads
=
tsNumOfCores
/
2
;
tsNumOfTaskQueueThreads
=
TMAX
(
tsNumOfTaskQueueThreads
,
4
);
tsNumOfTaskQueueThreads
=
TMAX
(
tsNumOfTaskQueueThreads
,
4
);
...
@@ -374,8 +379,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -374,8 +379,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeStreamThreads"
,
tsNumOfVnodeStreamThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeStreamThreads"
,
tsNumOfVnodeStreamThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
// tsNumOfVnodeFetchThreads = 1;
// tsNumOfVnodeFetchThreads = 1;
// if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1, 0) != 0) return -1;
// if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1, 0) != 0) return -1;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
tsNumOfVnodeWriteThreads
=
TMAX
(
tsNumOfVnodeWriteThreads
,
1
);
tsNumOfVnodeWriteThreads
=
TMAX
(
tsNumOfVnodeWriteThreads
,
1
);
...
@@ -497,15 +502,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
...
@@ -497,15 +502,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem
->
stype
=
stype
;
pItem
->
stype
=
stype
;
}
}
/*
/*
pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads");
pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeFetchThreads = numOfCores / 4;
tsNumOfVnodeFetchThreads = numOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
pItem->i32 = tsNumOfVnodeFetchThreads;
pItem->i32 = tsNumOfVnodeFetchThreads;
pItem->stype = stype;
pItem->stype = stype;
}
}
*/
*/
pItem
=
cfgGetItem
(
tsCfg
,
"numOfVnodeWriteThreads"
);
pItem
=
cfgGetItem
(
tsCfg
,
"numOfVnodeWriteThreads"
);
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
...
@@ -648,6 +653,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
...
@@ -648,6 +653,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy
(
tsSmlTagName
,
cfgGetItem
(
pCfg
,
"smlTagName"
)
->
str
,
TSDB_COL_NAME_LEN
);
tstrncpy
(
tsSmlTagName
,
cfgGetItem
(
pCfg
,
"smlTagName"
)
->
str
,
TSDB_COL_NAME_LEN
);
tsSmlDataFormat
=
cfgGetItem
(
pCfg
,
"smlDataFormat"
)
->
bval
;
tsSmlDataFormat
=
cfgGetItem
(
pCfg
,
"smlDataFormat"
)
->
bval
;
tsMaxMemUsedByInsert
=
cfgGetItem
(
pCfg
,
"maxMemUsedByInsert"
)
->
i32
;
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
tsCompressMsgSize
=
cfgGetItem
(
pCfg
,
"compressMsgSize"
)
->
i32
;
tsCompressMsgSize
=
cfgGetItem
(
pCfg
,
"compressMsgSize"
)
->
i32
;
tsCompressColData
=
cfgGetItem
(
pCfg
,
"compressColData"
)
->
i32
;
tsCompressColData
=
cfgGetItem
(
pCfg
,
"compressColData"
)
->
i32
;
...
@@ -705,7 +712,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
...
@@ -705,7 +712,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
tsNumOfVnodeStreamThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeStreamThreads"
)
->
i32
;
tsNumOfVnodeStreamThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeStreamThreads"
)
->
i32
;
// tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
// tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeWriteThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeWriteThreads"
)
->
i32
;
tsNumOfVnodeWriteThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeWriteThreads"
)
->
i32
;
tsNumOfVnodeSyncThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeSyncThreads"
)
->
i32
;
tsNumOfVnodeSyncThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeSyncThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
...
@@ -877,6 +884,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
...
@@ -877,6 +884,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxShellConns
=
cfgGetItem
(
pCfg
,
"maxShellConns"
)
->
i32
;
tsMaxShellConns
=
cfgGetItem
(
pCfg
,
"maxShellConns"
)
->
i32
;
}
else
if
(
strcasecmp
(
"maxNumOfDistinctRes"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"maxNumOfDistinctRes"
,
name
)
==
0
)
{
tsMaxNumOfDistinctResults
=
cfgGetItem
(
pCfg
,
"maxNumOfDistinctRes"
)
->
i32
;
tsMaxNumOfDistinctResults
=
cfgGetItem
(
pCfg
,
"maxNumOfDistinctRes"
)
->
i32
;
}
else
if
(
strcasecmp
(
"maxMemUsedByInsert"
,
name
)
==
0
)
{
tsMaxMemUsedByInsert
=
cfgGetItem
(
pCfg
,
"maxMemUsedByInsert"
)
->
i32
;
}
}
break
;
break
;
}
}
...
@@ -955,10 +964,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
...
@@ -955,10 +964,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
}
else
if
(
strcasecmp
(
"numOfVnodeQueryThreads"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"numOfVnodeQueryThreads"
,
name
)
==
0
)
{
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
/*
/*
} else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) {
} else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) {
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
*/
*/
}
else
if
(
strcasecmp
(
"numOfVnodeWriteThreads"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"numOfVnodeWriteThreads"
,
name
)
==
0
)
{
tsNumOfVnodeWriteThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeWriteThreads"
)
->
i32
;
tsNumOfVnodeWriteThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeWriteThreads"
)
->
i32
;
}
else
if
(
strcasecmp
(
"numOfVnodeSyncThreads"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"numOfVnodeSyncThreads"
,
name
)
==
0
)
{
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
b9374718
...
@@ -1193,4 +1193,232 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
...
@@ -1193,4 +1193,232 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
return
(
SName
*
)
taosArrayGet
(
pReq
->
pTables
,
pFetch
->
tbIdx
);
return
(
SName
*
)
taosArrayGet
(
pReq
->
pTables
,
pFetch
->
tbIdx
);
}
}
static
void
*
ctgCloneDbVgroup
(
void
*
pSrc
)
{
return
taosArrayDup
((
const
SArray
*
)
pSrc
);
}
static
void
ctgFreeDbVgroup
(
void
*
p
)
{
taosArrayDestroy
((
SArray
*
)((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneDbCfgInfo
(
void
*
pSrc
)
{
SDbCfgInfo
*
pDst
=
taosMemoryMalloc
(
sizeof
(
SDbCfgInfo
));
if
(
NULL
==
pDst
)
{
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
sizeof
(
SDbCfgInfo
));
return
pDst
;
}
static
void
ctgFreeDbCfgInfo
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneDbInfo
(
void
*
pSrc
)
{
SDbInfo
*
pDst
=
taosMemoryMalloc
(
sizeof
(
SDbInfo
));
if
(
NULL
==
pDst
)
{
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
sizeof
(
SDbInfo
));
return
pDst
;
}
static
void
ctgFreeDbInfo
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneTableMeta
(
void
*
pSrc
)
{
STableMeta
*
pMeta
=
pSrc
;
int32_t
size
=
sizeof
(
STableMeta
)
+
(
pMeta
->
tableInfo
.
numOfColumns
+
pMeta
->
tableInfo
.
numOfTags
)
*
sizeof
(
SSchema
);
STableMeta
*
pDst
=
taosMemoryMalloc
(
size
);
if
(
NULL
==
pDst
)
{
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
size
);
return
pDst
;
}
static
void
ctgFreeTableMeta
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneVgroupInfo
(
void
*
pSrc
)
{
SVgroupInfo
*
pDst
=
taosMemoryMalloc
(
sizeof
(
SVgroupInfo
));
if
(
NULL
==
pDst
)
{
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
sizeof
(
SVgroupInfo
));
return
pDst
;
}
static
void
ctgFreeVgroupInfo
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneTableIndices
(
void
*
pSrc
)
{
return
taosArrayDup
((
const
SArray
*
)
pSrc
);
}
static
void
ctgFreeTableIndices
(
void
*
p
)
{
taosArrayDestroy
((
SArray
*
)((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneFuncInfo
(
void
*
pSrc
)
{
SFuncInfo
*
pDst
=
taosMemoryMalloc
(
sizeof
(
SFuncInfo
));
if
(
NULL
==
pDst
)
{
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
sizeof
(
SFuncInfo
));
return
pDst
;
}
static
void
ctgFreeFuncInfo
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneIndexInfo
(
void
*
pSrc
)
{
SIndexInfo
*
pDst
=
taosMemoryMalloc
(
sizeof
(
SIndexInfo
));
if
(
NULL
==
pDst
)
{
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
sizeof
(
SIndexInfo
));
return
pDst
;
}
static
void
ctgFreeIndexInfo
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneUserAuth
(
void
*
pSrc
)
{
bool
*
pDst
=
taosMemoryMalloc
(
sizeof
(
bool
));
if
(
NULL
==
pDst
)
{
return
NULL
;
}
*
pDst
=
*
(
bool
*
)
pSrc
;
return
pDst
;
}
static
void
ctgFreeUserAuth
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneQnodeList
(
void
*
pSrc
)
{
return
taosArrayDup
((
const
SArray
*
)
pSrc
);
}
static
void
ctgFreeQnodeList
(
void
*
p
)
{
taosArrayDestroy
((
SArray
*
)((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneTableCfg
(
void
*
pSrc
)
{
STableCfg
*
pDst
=
taosMemoryMalloc
(
sizeof
(
STableCfg
));
if
(
NULL
==
pDst
)
{
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
sizeof
(
STableCfg
));
return
pDst
;
}
static
void
ctgFreeTableCfg
(
void
*
p
)
{
taosMemoryFree
(((
SMetaRes
*
)
p
)
->
pRes
);
}
static
void
*
ctgCloneDnodeList
(
void
*
pSrc
)
{
return
taosArrayDup
((
const
SArray
*
)
pSrc
);
}
static
void
ctgFreeDnodeList
(
void
*
p
)
{
taosArrayDestroy
((
SArray
*
)((
SMetaRes
*
)
p
)
->
pRes
);
}
static
int32_t
ctgCloneMetaDataArray
(
SArray
*
pSrc
,
FCopy
copyFunc
,
SArray
**
pDst
)
{
if
(
NULL
==
pSrc
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
size
=
taosArrayGetSize
(
pSrc
);
*
pDst
=
taosArrayInit
(
size
,
sizeof
(
SMetaRes
));
if
(
NULL
==
*
pDst
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SMetaRes
*
pRes
=
taosArrayGet
(
pSrc
,
i
);
SMetaRes
res
=
{.
code
=
pRes
->
code
,
.
pRes
=
copyFunc
(
pRes
->
pRes
)};
if
(
NULL
==
res
.
pRes
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosArrayPush
(
*
pDst
,
&
res
);
}
return
TSDB_CODE_SUCCESS
;
}
SMetaData
*
catalogCloneMetaData
(
SMetaData
*
pData
)
{
SMetaData
*
pRes
=
taosMemoryCalloc
(
1
,
sizeof
(
SMetaData
));
if
(
NULL
==
pRes
)
{
return
NULL
;
}
int32_t
code
=
ctgCloneMetaDataArray
(
pData
->
pDbVgroup
,
ctgCloneDbVgroup
,
&
pRes
->
pDbVgroup
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pDbCfg
,
ctgCloneDbCfgInfo
,
&
pRes
->
pDbCfg
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pDbInfo
,
ctgCloneDbInfo
,
&
pRes
->
pDbInfo
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pTableMeta
,
ctgCloneTableMeta
,
&
pRes
->
pTableMeta
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pTableHash
,
ctgCloneVgroupInfo
,
&
pRes
->
pTableHash
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pTableIndex
,
ctgCloneTableIndices
,
&
pRes
->
pTableIndex
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pUdfList
,
ctgCloneFuncInfo
,
&
pRes
->
pUdfList
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pIndex
,
ctgCloneIndexInfo
,
&
pRes
->
pIndex
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pUser
,
ctgCloneUserAuth
,
&
pRes
->
pUser
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pQnodeList
,
ctgCloneQnodeList
,
&
pRes
->
pQnodeList
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pTableCfg
,
ctgCloneTableCfg
,
&
pRes
->
pTableCfg
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
ctgCloneMetaDataArray
(
pData
->
pDnodeList
,
ctgCloneDnodeList
,
&
pRes
->
pDnodeList
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
catalogFreeMetaData
(
pRes
);
return
NULL
;
}
return
pRes
;
}
void
catalogFreeMetaData
(
SMetaData
*
pData
)
{
if
(
NULL
==
pData
)
{
return
;
}
taosArrayDestroyEx
(
pData
->
pDbVgroup
,
ctgFreeDbVgroup
);
taosArrayDestroyEx
(
pData
->
pDbCfg
,
ctgFreeDbCfgInfo
);
taosArrayDestroyEx
(
pData
->
pDbInfo
,
ctgFreeDbInfo
);
taosArrayDestroyEx
(
pData
->
pTableMeta
,
ctgFreeTableMeta
);
taosArrayDestroyEx
(
pData
->
pTableHash
,
ctgFreeVgroupInfo
);
taosArrayDestroyEx
(
pData
->
pTableIndex
,
ctgFreeTableIndices
);
taosArrayDestroyEx
(
pData
->
pUdfList
,
ctgFreeFuncInfo
);
taosArrayDestroyEx
(
pData
->
pIndex
,
ctgFreeIndexInfo
);
taosArrayDestroyEx
(
pData
->
pUser
,
ctgFreeUserAuth
);
taosArrayDestroyEx
(
pData
->
pQnodeList
,
ctgFreeQnodeList
);
taosArrayDestroyEx
(
pData
->
pTableCfg
,
ctgFreeTableCfg
);
taosArrayDestroyEx
(
pData
->
pDnodeList
,
ctgFreeDnodeList
);
taosMemoryFreeClear
(
pData
->
pSvrVer
);
taosMemoryFree
(
pData
);
}
source/libs/parser/src/parInsert.c
浏览文件 @
b9374718
...
@@ -266,7 +266,7 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pT
...
@@ -266,7 +266,7 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pT
return
catalogGetTableHashVgroup
(
pBasicCtx
->
pCatalog
,
&
conn
,
pTbName
,
pVg
);
return
catalogGetTableHashVgroup
(
pBasicCtx
->
pCatalog
,
&
conn
,
pTbName
,
pVg
);
}
}
static
int32_t
getTableMetaImpl
(
SInsertParseContext
*
pCxt
,
int32_t
tbNo
,
SName
*
name
,
char
*
dbFname
,
bool
isStb
)
{
static
int32_t
getTableMetaImpl
(
SInsertParseContext
*
pCxt
,
int32_t
tbNo
,
SName
*
name
,
bool
isStb
)
{
CHECK_CODE
(
getTableSchema
(
pCxt
,
tbNo
,
name
,
isStb
,
&
pCxt
->
pTableMeta
));
CHECK_CODE
(
getTableSchema
(
pCxt
,
tbNo
,
name
,
isStb
,
&
pCxt
->
pTableMeta
));
if
(
!
isStb
)
{
if
(
!
isStb
)
{
SVgroupInfo
vg
;
SVgroupInfo
vg
;
...
@@ -276,12 +276,12 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName*
...
@@ -276,12 +276,12 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
getTableMeta
(
SInsertParseContext
*
pCxt
,
int32_t
tbNo
,
SName
*
name
,
char
*
dbFname
)
{
static
int32_t
getTableMeta
(
SInsertParseContext
*
pCxt
,
int32_t
tbNo
,
SName
*
name
)
{
return
getTableMetaImpl
(
pCxt
,
tbNo
,
name
,
dbFname
,
false
);
return
getTableMetaImpl
(
pCxt
,
tbNo
,
name
,
false
);
}
}
static
int32_t
getSTableMeta
(
SInsertParseContext
*
pCxt
,
int32_t
tbNo
,
SName
*
name
,
char
*
dbFname
)
{
static
int32_t
getSTableMeta
(
SInsertParseContext
*
pCxt
,
int32_t
tbNo
,
SName
*
name
)
{
return
getTableMetaImpl
(
pCxt
,
tbNo
,
name
,
dbFname
,
true
);
return
getTableMetaImpl
(
pCxt
,
tbNo
,
name
,
true
);
}
}
static
int32_t
getDBCfg
(
SInsertParseContext
*
pCxt
,
const
char
*
pDbFName
,
SDbCfgInfo
*
pInfo
)
{
static
int32_t
getDBCfg
(
SInsertParseContext
*
pCxt
,
const
char
*
pDbFName
,
SDbCfgInfo
*
pInfo
)
{
...
@@ -1178,7 +1178,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName*
...
@@ -1178,7 +1178,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName*
tNameGetFullDbName
(
&
sname
,
dbFName
);
tNameGetFullDbName
(
&
sname
,
dbFName
);
strcpy
(
pCxt
->
sTableName
,
sname
.
tname
);
strcpy
(
pCxt
->
sTableName
,
sname
.
tname
);
CHECK_CODE
(
getSTableMeta
(
pCxt
,
tbNo
,
&
sname
,
dbFName
));
CHECK_CODE
(
getSTableMeta
(
pCxt
,
tbNo
,
&
sname
));
if
(
TSDB_SUPER_TABLE
!=
pCxt
->
pTableMeta
->
tableType
)
{
if
(
TSDB_SUPER_TABLE
!=
pCxt
->
pTableMeta
->
tableType
)
{
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"create table only from super table is allowed"
);
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"create table only from super table is allowed"
);
}
}
...
@@ -1385,6 +1385,10 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB
...
@@ -1385,6 +1385,10 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB
(
*
numOfRows
)
++
;
(
*
numOfRows
)
++
;
}
}
pCxt
->
pSql
=
pRawSql
;
pCxt
->
pSql
=
pRawSql
;
if
(
pDataBlock
->
nAllocSize
>
tsMaxMemUsedByInsert
*
1024
*
1024
)
{
break
;
}
}
}
if
(
0
==
(
*
numOfRows
)
&&
(
!
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
)))
{
if
(
0
==
(
*
numOfRows
)
&&
(
!
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
)))
{
...
@@ -1393,23 +1397,13 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB
...
@@ -1393,23 +1397,13 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
parseDataFromFile
(
SInsertParseContext
*
pCxt
,
SToken
filePath
,
STableDataBlocks
*
dataBuf
)
{
static
int32_t
parseDataFromFileAgain
(
SInsertParseContext
*
pCxt
,
int16_t
tableNo
,
const
SName
*
pTableName
,
char
filePathStr
[
TSDB_FILENAME_LEN
]
=
{
0
};
STableDataBlocks
*
dataBuf
)
{
if
(
TK_NK_STRING
==
filePath
.
type
)
{
trimString
(
filePath
.
z
,
filePath
.
n
,
filePathStr
,
sizeof
(
filePathStr
));
}
else
{
strncpy
(
filePathStr
,
filePath
.
z
,
filePath
.
n
);
}
TdFilePtr
fp
=
taosOpenFile
(
filePathStr
,
TD_FILE_READ
|
TD_FILE_STREAM
);
if
(
NULL
==
fp
)
{
return
TAOS_SYSTEM_ERROR
(
errno
);
}
int32_t
maxNumOfRows
;
int32_t
maxNumOfRows
;
CHECK_CODE
(
allocateMemIfNeed
(
dataBuf
,
getExtendedRowSize
(
dataBuf
),
&
maxNumOfRows
));
CHECK_CODE
(
allocateMemIfNeed
(
dataBuf
,
getExtendedRowSize
(
dataBuf
),
&
maxNumOfRows
));
int32_t
numOfRows
=
0
;
int32_t
numOfRows
=
0
;
CHECK_CODE
(
parseCsvFile
(
pCxt
,
fp
,
dataBuf
,
maxNumOfRows
,
&
numOfRows
));
CHECK_CODE
(
parseCsvFile
(
pCxt
,
pCxt
->
pComCxt
->
csvCxt
.
fp
,
dataBuf
,
maxNumOfRows
,
&
numOfRows
));
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)(
dataBuf
->
pData
);
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)(
dataBuf
->
pData
);
if
(
TSDB_CODE_SUCCESS
!=
setBlockInfo
(
pBlocks
,
dataBuf
,
numOfRows
))
{
if
(
TSDB_CODE_SUCCESS
!=
setBlockInfo
(
pBlocks
,
dataBuf
,
numOfRows
))
{
...
@@ -1417,12 +1411,38 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
...
@@ -1417,12 +1411,38 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
"too many rows in sql, total number of rows should be less than INT32_MAX"
);
"too many rows in sql, total number of rows should be less than INT32_MAX"
);
}
}
if
(
!
taosEOFFile
(
pCxt
->
pComCxt
->
csvCxt
.
fp
))
{
pCxt
->
pComCxt
->
needMultiParse
=
true
;
pCxt
->
pComCxt
->
csvCxt
.
tableNo
=
tableNo
;
memcpy
(
&
pCxt
->
pComCxt
->
csvCxt
.
tableName
,
pTableName
,
sizeof
(
SName
));
pCxt
->
pComCxt
->
csvCxt
.
pLastSqlPos
=
pCxt
->
pSql
;
}
dataBuf
->
numOfTables
=
1
;
dataBuf
->
numOfTables
=
1
;
pCxt
->
totalNum
+=
numOfRows
;
pCxt
->
totalNum
+=
numOfRows
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
parseDataFromFile
(
SInsertParseContext
*
pCxt
,
int16_t
tableNo
,
const
SName
*
pTableName
,
SToken
filePath
,
STableDataBlocks
*
dataBuf
)
{
char
filePathStr
[
TSDB_FILENAME_LEN
]
=
{
0
};
if
(
TK_NK_STRING
==
filePath
.
type
)
{
trimString
(
filePath
.
z
,
filePath
.
n
,
filePathStr
,
sizeof
(
filePathStr
));
}
else
{
strncpy
(
filePathStr
,
filePath
.
z
,
filePath
.
n
);
}
pCxt
->
pComCxt
->
csvCxt
.
fp
=
taosOpenFile
(
filePathStr
,
TD_FILE_READ
|
TD_FILE_STREAM
);
if
(
NULL
==
pCxt
->
pComCxt
->
csvCxt
.
fp
)
{
return
TAOS_SYSTEM_ERROR
(
errno
);
}
return
parseDataFromFileAgain
(
pCxt
,
tableNo
,
pTableName
,
dataBuf
);
}
static
void
destroyInsertParseContextForTable
(
SInsertParseContext
*
pCxt
)
{
static
void
destroyInsertParseContextForTable
(
SInsertParseContext
*
pCxt
)
{
if
(
!
pCxt
->
pComCxt
->
needMultiParse
)
{
taosCloseFile
(
&
pCxt
->
pComCxt
->
csvCxt
.
fp
);
}
taosMemoryFreeClear
(
pCxt
->
pTableMeta
);
taosMemoryFreeClear
(
pCxt
->
pTableMeta
);
destroyBoundColumnInfo
(
&
pCxt
->
tags
);
destroyBoundColumnInfo
(
&
pCxt
->
tags
);
tdDestroySVCreateTbReq
(
&
pCxt
->
createTblReq
);
tdDestroySVCreateTbReq
(
&
pCxt
->
createTblReq
);
...
@@ -1481,7 +1501,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
...
@@ -1481,7 +1501,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"invalid charactor in SQL"
,
sToken
.
z
);
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"invalid charactor in SQL"
,
sToken
.
z
);
}
}
if
(
0
==
pCxt
->
totalNum
&&
(
!
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
)))
{
if
(
0
==
pCxt
->
totalNum
&&
(
!
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
))
&&
!
pCxt
->
pComCxt
->
needMultiParse
)
{
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"no data in sql"
);
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"no data in sql"
);
}
}
break
;
break
;
...
@@ -1536,7 +1557,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
...
@@ -1536,7 +1557,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
autoCreateTbl
=
true
;
autoCreateTbl
=
true
;
}
else
if
(
!
existedUsing
)
{
}
else
if
(
!
existedUsing
)
{
CHECK_CODE
(
getTableMeta
(
pCxt
,
tbNum
,
&
name
,
dbFName
));
CHECK_CODE
(
getTableMeta
(
pCxt
,
tbNum
,
&
name
));
if
(
TSDB_SUPER_TABLE
==
pCxt
->
pTableMeta
->
tableType
)
{
if
(
TSDB_SUPER_TABLE
==
pCxt
->
pTableMeta
->
tableType
)
{
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"insert data into super table is not supported"
);
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"insert data into super table is not supported"
);
}
}
...
@@ -1577,17 +1598,22 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
...
@@ -1577,17 +1598,22 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
if
(
0
==
sToken
.
n
||
(
TK_NK_STRING
!=
sToken
.
type
&&
TK_NK_ID
!=
sToken
.
type
))
{
if
(
0
==
sToken
.
n
||
(
TK_NK_STRING
!=
sToken
.
type
&&
TK_NK_ID
!=
sToken
.
type
))
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"file path is required following keyword FILE"
,
sToken
.
z
);
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"file path is required following keyword FILE"
,
sToken
.
z
);
}
}
CHECK_CODE
(
parseDataFromFile
(
pCxt
,
sToken
,
dataBuf
));
CHECK_CODE
(
parseDataFromFile
(
pCxt
,
tbNum
,
&
name
,
sToken
,
dataBuf
));
pCxt
->
pOutput
->
insertType
=
TSDB_QUERY_TYPE_FILE_INSERT
;
pCxt
->
pOutput
->
insertType
=
TSDB_QUERY_TYPE_FILE_INSERT
;
tbNum
++
;
tbNum
++
;
continue
;
if
(
!
pCxt
->
pComCxt
->
needMultiParse
)
{
continue
;
}
else
{
parserInfo
(
"0x%"
PRIx64
" insert from csv. File is too large, do it in batches."
,
pCxt
->
pComCxt
->
requestId
);
break
;
}
}
}
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"keyword VALUES or FILE is expected"
,
sToken
.
z
);
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"keyword VALUES or FILE is expected"
,
sToken
.
z
);
}
}
qDebug
(
"0x%"
PRIx64
" insert input rows: %d"
,
pCxt
->
pComCxt
->
requestId
,
pCxt
->
totalNum
);
parserInfo
(
"0x%"
PRIx64
" insert input rows: %d"
,
pCxt
->
pComCxt
->
requestId
,
pCxt
->
totalNum
);
if
(
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
))
{
if
(
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
))
{
SParsedDataColInfo
*
tags
=
taosMemoryMalloc
(
sizeof
(
pCxt
->
tags
));
SParsedDataColInfo
*
tags
=
taosMemoryMalloc
(
sizeof
(
pCxt
->
tags
));
...
@@ -1612,6 +1638,26 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
...
@@ -1612,6 +1638,26 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return
buildOutput
(
pCxt
);
return
buildOutput
(
pCxt
);
}
}
static
int32_t
parseInsertBodyAgain
(
SInsertParseContext
*
pCxt
)
{
STableDataBlocks
*
dataBuf
=
NULL
;
CHECK_CODE
(
getTableMeta
(
pCxt
,
pCxt
->
pComCxt
->
csvCxt
.
tableNo
,
&
pCxt
->
pComCxt
->
csvCxt
.
tableName
));
CHECK_CODE
(
getDataBlockFromList
(
pCxt
->
pTableBlockHashObj
,
&
pCxt
->
pTableMeta
->
uid
,
sizeof
(
pCxt
->
pTableMeta
->
uid
),
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
getTableInfo
(
pCxt
->
pTableMeta
).
rowSize
,
pCxt
->
pTableMeta
,
&
dataBuf
,
NULL
,
&
pCxt
->
createTblReq
));
CHECK_CODE
(
parseDataFromFileAgain
(
pCxt
,
pCxt
->
pComCxt
->
csvCxt
.
tableNo
,
&
pCxt
->
pComCxt
->
csvCxt
.
tableName
,
dataBuf
));
if
(
taosEOFFile
(
pCxt
->
pComCxt
->
csvCxt
.
fp
))
{
CHECK_CODE
(
parseInsertBody
(
pCxt
));
pCxt
->
pComCxt
->
needMultiParse
=
false
;
return
TSDB_CODE_SUCCESS
;
}
parserInfo
(
"0x%"
PRIx64
" insert again input rows: %d"
,
pCxt
->
pComCxt
->
requestId
,
pCxt
->
totalNum
);
// merge according to vgId
if
(
taosHashGetSize
(
pCxt
->
pTableBlockHashObj
)
>
0
)
{
CHECK_CODE
(
mergeTableDataBlocks
(
pCxt
->
pTableBlockHashObj
,
pCxt
->
pOutput
->
payloadType
,
&
pCxt
->
pVgDataBlocks
));
}
return
buildOutput
(
pCxt
);
}
// INSERT INTO
// INSERT INTO
// tb_name
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
...
@@ -1621,7 +1667,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
...
@@ -1621,7 +1667,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t
parseInsertSql
(
SParseContext
*
pContext
,
SQuery
**
pQuery
,
SParseMetaCache
*
pMetaCache
)
{
int32_t
parseInsertSql
(
SParseContext
*
pContext
,
SQuery
**
pQuery
,
SParseMetaCache
*
pMetaCache
)
{
SInsertParseContext
context
=
{
SInsertParseContext
context
=
{
.
pComCxt
=
pContext
,
.
pComCxt
=
pContext
,
.
pSql
=
(
char
*
)
pContext
->
pSql
,
.
pSql
=
pContext
->
needMultiParse
?
(
char
*
)
pContext
->
csvCxt
.
pLastSqlPos
:
(
char
*
)
pContext
->
pSql
,
.
msg
=
{.
buf
=
pContext
->
pMsg
,
.
len
=
pContext
->
msgLen
},
.
msg
=
{.
buf
=
pContext
->
pMsg
,
.
len
=
pContext
->
msgLen
},
.
pTableMeta
=
NULL
,
.
pTableMeta
=
NULL
,
.
createTblReq
=
{
0
},
.
createTblReq
=
{
0
},
...
@@ -1691,10 +1737,16 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
...
@@ -1691,10 +1737,16 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
context
.
pOutput
->
payloadType
=
PAYLOAD_TYPE_KV
;
context
.
pOutput
->
payloadType
=
PAYLOAD_TYPE_KV
;
int32_t
code
=
skipInsertInto
(
&
context
.
pSql
,
&
context
.
msg
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
!
context
.
pComCxt
->
needMultiParse
)
{
code
=
parseInsertBody
(
&
context
);
code
=
skipInsertInto
(
&
context
.
pSql
,
&
context
.
msg
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
parseInsertBody
(
&
context
);
}
}
else
{
code
=
parseInsertBodyAgain
(
&
context
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
||
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
if
(
TSDB_CODE_SUCCESS
==
code
||
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
SName
*
pTable
=
taosHashIterate
(
context
.
pTableNameHashObj
,
NULL
);
SName
*
pTable
=
taosHashIterate
(
context
.
pTableNameHashObj
,
NULL
);
while
(
NULL
!=
pTable
)
{
while
(
NULL
!=
pTable
)
{
...
...
source/libs/parser/src/parser.c
浏览文件 @
b9374718
...
@@ -214,6 +214,16 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
...
@@ -214,6 +214,16 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
return
code
;
return
code
;
}
}
void
qDestroyParseContext
(
SParseContext
*
pCxt
)
{
if
(
NULL
==
pCxt
)
{
return
;
}
taosArrayDestroy
(
pCxt
->
pTableMetaPos
);
taosArrayDestroy
(
pCxt
->
pTableVgroupPos
);
taosMemoryFree
(
pCxt
);
}
void
qDestroyQuery
(
SQuery
*
pQueryNode
)
{
nodesDestroyNode
((
SNode
*
)
pQueryNode
);
}
void
qDestroyQuery
(
SQuery
*
pQueryNode
)
{
nodesDestroyNode
((
SNode
*
)
pQueryNode
);
}
int32_t
qExtractResultSchema
(
const
SNode
*
pRoot
,
int32_t
*
numOfCols
,
SSchema
**
pSchema
)
{
int32_t
qExtractResultSchema
(
const
SNode
*
pRoot
,
int32_t
*
numOfCols
,
SSchema
**
pSchema
)
{
...
...
source/libs/parser/test/mockCatalog.cpp
浏览文件 @
b9374718
...
@@ -130,16 +130,16 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
...
@@ -130,16 +130,16 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
* c5 | column | DOUBLE | 8 |
* c5 | column | DOUBLE | 8 |
*/
*/
void
generateTestTables
(
MockCatalogService
*
mcs
,
const
std
::
string
&
db
)
{
void
generateTestTables
(
MockCatalogService
*
mcs
,
const
std
::
string
&
db
)
{
ITableBuilder
&
builder
=
mcs
->
createTableBuilder
(
db
,
"t1"
,
TSDB_NORMAL_TABLE
,
6
)
mcs
->
createTableBuilder
(
db
,
"t1"
,
TSDB_NORMAL_TABLE
,
6
)
.
setPrecision
(
TSDB_TIME_PRECISION_MILLI
)
.
setPrecision
(
TSDB_TIME_PRECISION_MILLI
)
.
setVgid
(
1
)
.
setVgid
(
1
)
.
addColumn
(
"ts"
,
TSDB_DATA_TYPE_TIMESTAMP
)
.
addColumn
(
"ts"
,
TSDB_DATA_TYPE_TIMESTAMP
)
.
addColumn
(
"c1"
,
TSDB_DATA_TYPE_INT
)
.
addColumn
(
"c1"
,
TSDB_DATA_TYPE_INT
)
.
addColumn
(
"c2"
,
TSDB_DATA_TYPE_BINARY
,
20
)
.
addColumn
(
"c2"
,
TSDB_DATA_TYPE_BINARY
,
20
)
.
addColumn
(
"c3"
,
TSDB_DATA_TYPE_BIGINT
)
.
addColumn
(
"c3"
,
TSDB_DATA_TYPE_BIGINT
)
.
addColumn
(
"c4"
,
TSDB_DATA_TYPE_DOUBLE
)
.
addColumn
(
"c4"
,
TSDB_DATA_TYPE_DOUBLE
)
.
addColumn
(
"c5"
,
TSDB_DATA_TYPE_DOUBLE
);
.
addColumn
(
"c5"
,
TSDB_DATA_TYPE_DOUBLE
)
builder
.
done
();
.
done
();
}
}
/*
/*
...
...
source/libs/parser/test/parTestUtil.cpp
浏览文件 @
b9374718
...
@@ -343,7 +343,6 @@ class ParserTestBaseImpl {
...
@@ -343,7 +343,6 @@ class ParserTestBaseImpl {
unique_ptr
<
SQuery
*
,
void
(
*
)(
SQuery
**
)
>
query
((
SQuery
**
)
taosMemoryCalloc
(
1
,
sizeof
(
SQuery
*
)),
destroyQuery
);
unique_ptr
<
SQuery
*
,
void
(
*
)(
SQuery
**
)
>
query
((
SQuery
**
)
taosMemoryCalloc
(
1
,
sizeof
(
SQuery
*
)),
destroyQuery
);
doParseSql
(
&
cxt
,
query
.
get
());
doParseSql
(
&
cxt
,
query
.
get
());
SQuery
*
pQuery
=
*
(
query
.
get
());
if
(
g_dump
)
{
if
(
g_dump
)
{
dump
();
dump
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录