Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cc36796f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
cc36796f
编写于
11月 29, 2022
作者:
D
dapan1121
提交者:
GitHub
11月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18524 from taosdata/fix/TD-18639
enh: support multiple query groups in exchange operator
上级
d266d7a2
2bfcabda
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
66 addition
and
27 deletion
+66
-27
include/libs/parser/parser.h
include/libs/parser/parser.h
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+14
-2
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+29
-11
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+1
-1
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+13
-6
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+1
-1
source/libs/parser/src/parInsertSql.c
source/libs/parser/src/parInsertSql.c
+1
-3
source/util/src/tarray.c
source/util/src/tarray.c
+3
-0
source/util/src/tlog.c
source/util/src/tlog.c
+3
-2
未找到文件。
include/libs/parser/parser.h
浏览文件 @
cc36796f
...
...
@@ -29,7 +29,7 @@ struct SMetaData;
typedef
struct
SStmtCallback
{
TAOS_STMT
*
pStmt
;
int32_t
(
*
getTbNameFn
)(
TAOS_STMT
*
,
char
**
);
int32_t
(
*
setInfoFn
)(
TAOS_STMT
*
,
STableMeta
*
,
void
*
,
char
*
,
bool
,
SHashObj
*
,
SHashObj
*
,
const
char
*
);
int32_t
(
*
setInfoFn
)(
TAOS_STMT
*
,
STableMeta
*
,
void
*
,
SName
*
,
bool
,
SHashObj
*
,
SHashObj
*
,
const
char
*
);
int32_t
(
*
getExecInfoFn
)(
TAOS_STMT
*
,
SHashObj
**
,
SHashObj
**
);
}
SStmtCallback
;
...
...
source/client/src/clientImpl.c
浏览文件 @
cc36796f
...
...
@@ -2293,10 +2293,16 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
taosAsyncQueryImpl
(
*
(
int64_t
*
)
taos
,
sql
,
syncQueryFn
,
param
,
validateOnly
);
tsem_wait
(
&
param
->
sem
);
SRequestObj
*
pRequest
=
NULL
;
if
(
param
->
pRequest
!=
NULL
)
{
param
->
pRequest
->
syncQuery
=
true
;
pRequest
=
param
->
pRequest
;
}
else
{
taosMemoryFree
(
param
);
}
return
param
->
pRequest
;
return
pRequest
;
}
TAOS_RES
*
taosQueryImplWithReqid
(
TAOS
*
taos
,
const
char
*
sql
,
bool
validateOnly
,
int64_t
reqid
)
{
...
...
@@ -2310,8 +2316,14 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
taosAsyncQueryImplWithReqid
(
*
(
int64_t
*
)
taos
,
sql
,
syncQueryFn
,
param
,
validateOnly
,
reqid
);
tsem_wait
(
&
param
->
sem
);
SRequestObj
*
pRequest
=
NULL
;
if
(
param
->
pRequest
!=
NULL
)
{
param
->
pRequest
->
syncQuery
=
true
;
pRequest
=
param
->
pRequest
;
}
else
{
taosMemoryFree
(
param
);
}
return
param
->
pRequest
;
return
pRequest
;
}
source/client/src/clientStmt.c
浏览文件 @
cc36796f
...
...
@@ -152,9 +152,12 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtUpdateBindInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbF
Name
,
const
char
*
sTableName
,
bool
autoCreateTbl
)
{
int32_t
stmtUpdateBindInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
SName
*
tb
Name
,
const
char
*
sTableName
,
bool
autoCreateTbl
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
tbName
,
tbFName
);
memcpy
(
&
pStmt
->
bInfo
.
sname
,
tbName
,
sizeof
(
*
tbName
));
strncpy
(
pStmt
->
bInfo
.
tbFName
,
tbFName
,
sizeof
(
pStmt
->
bInfo
.
tbFName
)
-
1
);
pStmt
->
bInfo
.
tbFName
[
sizeof
(
pStmt
->
bInfo
.
tbFName
)
-
1
]
=
0
;
...
...
@@ -178,11 +181,11 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtUpdateInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
char
*
tbF
Name
,
bool
autoCreateTbl
,
int32_t
stmtUpdateInfo
(
TAOS_STMT
*
stmt
,
STableMeta
*
pTableMeta
,
void
*
tags
,
SName
*
tb
Name
,
bool
autoCreateTbl
,
SHashObj
*
pVgHash
,
SHashObj
*
pBlockHash
,
const
char
*
sTableName
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_ERR_RET
(
stmtUpdateBindInfo
(
stmt
,
pTableMeta
,
tags
,
tb
F
Name
,
sTableName
,
autoCreateTbl
));
STMT_ERR_RET
(
stmtUpdateBindInfo
(
stmt
,
pTableMeta
,
tags
,
tbName
,
sTableName
,
autoCreateTbl
));
STMT_ERR_RET
(
stmtUpdateExecInfo
(
stmt
,
pVgHash
,
pBlockHash
,
autoCreateTbl
));
pStmt
->
sql
.
autoCreateTbl
=
autoCreateTbl
;
...
...
@@ -773,7 +776,9 @@ int stmtAddBatch(TAOS_STMT* stmt) {
int
stmtUpdateTableUid
(
STscStmt
*
pStmt
,
SSubmitRsp
*
pRsp
)
{
tscDebug
(
"stmt start to update tbUid, blockNum: %d"
,
pRsp
->
nBlocks
);
size_t
keyLen
=
0
;
int32_t
code
=
0
;
int32_t
finalCode
=
0
;
size_t
keyLen
=
0
;
STableDataBlocks
**
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
NULL
);
while
(
pIter
)
{
STableDataBlocks
*
pBlock
=
*
pIter
;
...
...
@@ -809,10 +814,20 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
}
else
{
tscDebug
(
"table %s not found in submit rsp, will update from catalog"
,
pStmt
->
bInfo
.
tbFName
);
if
(
NULL
==
pStmt
->
pCatalog
)
{
STMT_ERR_RET
(
catalogGetHandle
(
pStmt
->
taos
->
pAppInfo
->
clusterId
,
&
pStmt
->
pCatalog
));
code
=
catalogGetHandle
(
pStmt
->
taos
->
pAppInfo
->
clusterId
,
&
pStmt
->
pCatalog
);
if
(
code
)
{
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
pIter
);
finalCode
=
code
;
continue
;
}
}
STMT_ERR_RET
(
stmtCreateRequest
(
pStmt
));
code
=
stmtCreateRequest
(
pStmt
);
if
(
code
)
{
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
pIter
);
finalCode
=
code
;
continue
;
}
STableMeta
*
pTableMeta
=
NULL
;
SRequestConnInfo
conn
=
{.
pTrans
=
pStmt
->
taos
->
pAppInfo
->
pTransporter
,
...
...
@@ -823,20 +838,23 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
taos_free_result
(
pStmt
->
exec
.
pRequest
);
pStmt
->
exec
.
pRequest
=
NULL
;
if
(
TSDB_CODE_PAR_TABLE_NOT_EXIST
==
code
)
{
tscDebug
(
"tb %s not exist"
,
pStmt
->
bInfo
.
tbFName
);
return
TSDB_CODE_SUCCESS
;
if
(
code
||
NULL
==
pTableMeta
)
{
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
pIter
);
finalCode
=
code
;
taosMemoryFree
(
pTableMeta
);
continue
;
}
pMeta
->
uid
=
pTableMeta
->
uid
;
pStmt
->
bInfo
.
tbUid
=
pTableMeta
->
uid
;
taosMemoryFree
(
pTableMeta
);
}
pIter
=
taosHashIterate
(
pStmt
->
exec
.
pBlockHash
,
pIter
);
}
return
TSDB_CODE_SUCCESS
;
return
finalCode
;
}
int
stmtExec
(
TAOS_STMT
*
stmt
)
{
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
cc36796f
...
...
@@ -1205,7 +1205,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
stbCtx
.
pName
=
&
stbName
;
STableMeta
*
stbMeta
=
NULL
;
ctgReadTbMetaFromCache
(
pCtg
,
&
stbCtx
,
&
stbMeta
);
(
void
)
ctgReadTbMetaFromCache
(
pCtg
,
&
stbCtx
,
&
stbMeta
);
if
(
stbMeta
&&
stbMeta
->
sversion
>=
pOut
->
tbMeta
->
sversion
)
{
ctgDebug
(
"use cached stb meta, tbName:%s"
,
tNameGetTableName
(
pName
));
exist
=
1
;
...
...
source/libs/command/src/explain.c
浏览文件 @
cc36796f
...
...
@@ -782,13 +782,18 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
{
SExchangePhysiNode
*
pExchNode
=
(
SExchangePhysiNode
*
)
pNode
;
SExplainGroup
*
group
=
taosHashGet
(
ctx
->
groupHash
,
&
pExchNode
->
srcStartGroupId
,
sizeof
(
pExchNode
->
srcStartGroupId
));
if
(
NULL
==
group
)
{
qError
(
"exchange src group %d not in groupHash"
,
pExchNode
->
srcStartGroupId
);
QRY_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
int32_t
nodeNum
=
0
;
for
(
int32_t
i
=
pExchNode
->
srcStartGroupId
;
i
<=
pExchNode
->
srcEndGroupId
;
++
i
)
{
SExplainGroup
*
group
=
taosHashGet
(
ctx
->
groupHash
,
&
pExchNode
->
srcStartGroupId
,
sizeof
(
pExchNode
->
srcStartGroupId
));
if
(
NULL
==
group
)
{
qError
(
"exchange src group %d not in groupHash"
,
pExchNode
->
srcStartGroupId
);
QRY_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
nodeNum
+=
group
->
nodeNum
;
}
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_EXCHANGE_FORMAT
,
pExchNode
->
singleChannel
?
1
:
group
->
nodeNum
);
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_EXCHANGE_FORMAT
,
pExchNode
->
singleChannel
?
1
:
nodeNum
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
...
...
@@ -819,7 +824,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}
QRY_ERR_RET
(
qExplainAppendGroupResRows
(
ctx
,
pExchNode
->
srcStartGroupId
,
level
+
1
,
pExchNode
->
singleChannel
));
for
(
int32_t
i
=
pExchNode
->
srcStartGroupId
;
i
<=
pExchNode
->
srcEndGroupId
;
++
i
)
{
QRY_ERR_RET
(
qExplainAppendGroupResRows
(
ctx
,
i
,
level
+
1
,
pExchNode
->
singleChannel
));
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
{
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
cc36796f
...
...
@@ -1407,7 +1407,7 @@ SNode* createShowTableTagsStmt(SAstCreateContext* pCxt, SNode* pTbName, SNode* p
SNode
*
createCreateUserStmt
(
SAstCreateContext
*
pCxt
,
SToken
*
pUserName
,
const
SToken
*
pPassword
,
int8_t
sysinfo
)
{
CHECK_PARSER_STATUS
(
pCxt
);
char
password
[
TSDB_USET_PASSWORD_LEN
]
=
{
0
};
char
password
[
TSDB_USET_PASSWORD_LEN
+
3
]
=
{
0
};
if
(
!
checkUserName
(
pCxt
,
pUserName
)
||
!
checkPassword
(
pCxt
,
pPassword
,
password
))
{
return
NULL
;
}
...
...
source/libs/parser/src/parInsertSql.c
浏览文件 @
cc36796f
...
...
@@ -1529,9 +1529,7 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
SStmtCallback
*
pStmtCb
=
pCxt
->
pComCxt
->
pStmtCb
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
&
pStmt
->
targetTableName
,
tbFName
);
int32_t
code
=
(
*
pStmtCb
->
setInfoFn
)(
pStmtCb
->
pStmt
,
pStmt
->
pTableMeta
,
tags
,
tbFName
,
pStmt
->
usingTableProcessing
,
int32_t
code
=
(
*
pStmtCb
->
setInfoFn
)(
pStmtCb
->
pStmt
,
pStmt
->
pTableMeta
,
tags
,
&
pStmt
->
targetTableName
,
pStmt
->
usingTableProcessing
,
pStmt
->
pVgroupsHashObj
,
pStmt
->
pTableBlockHashObj
,
pStmt
->
usingTableName
.
tname
);
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
...
...
source/util/src/tarray.c
浏览文件 @
cc36796f
...
...
@@ -205,6 +205,9 @@ void* taosArrayPop(SArray* pArray) {
}
void
*
taosArrayGet
(
const
SArray
*
pArray
,
size_t
index
)
{
if
(
NULL
==
pArray
)
{
return
NULL
;
}
assert
(
index
<
pArray
->
size
);
return
TARRAY_GET_ELEM
(
pArray
,
index
);
}
...
...
source/util/src/tlog.c
浏览文件 @
cc36796f
...
...
@@ -21,7 +21,7 @@
#define LOG_MAX_LINE_SIZE (1024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
#define LOG_MAX_LINE_DUMP_SIZE (
65
* 1024)
#define LOG_MAX_LINE_DUMP_SIZE (
1024
* 1024)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
#define LOG_FILE_NAME_LEN 300
...
...
@@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
if
(
!
osLogSpaceAvailable
())
return
;
if
(
!
(
dflag
&
DEBUG_FILE
)
&&
!
(
dflag
&
DEBUG_SCREEN
))
return
;
char
buffer
[
LOG_MAX_LINE_DUMP_BUFFER_SIZE
]
;
char
*
buffer
=
taosMemoryMalloc
(
LOG_MAX_LINE_DUMP_BUFFER_SIZE
)
;
int32_t
len
=
taosBuildLogHead
(
buffer
,
flags
);
va_list
argpointer
;
...
...
@@ -509,6 +509,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
buffer
[
len
]
=
0
;
taosPrintLogImp
(
level
,
dflag
,
buffer
,
len
);
taosMemoryFree
(
buffer
);
}
void
taosDumpData
(
unsigned
char
*
msg
,
int32_t
len
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录