Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
069d715f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
069d715f
编写于
1月 21, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-11818]update log, fix bug in select * from super_table.
上级
4a60cd96
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
93 addition
and
77 deletion
+93
-77
include/common/tmsg.h
include/common/tmsg.h
+2
-2
include/libs/planner/planner.h
include/libs/planner/planner.h
+1
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+31
-35
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+1
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+19
-7
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+18
-2
source/libs/planner/test/phyPlanTests.cpp
source/libs/planner/test/phyPlanTests.cpp
+3
-3
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+9
-9
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+5
-5
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+3
-12
未找到文件。
include/common/tmsg.h
浏览文件 @
069d715f
...
...
@@ -696,13 +696,13 @@ typedef struct SVgroupInfo {
uint32_t
hashEnd
;
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupInfo
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupMsg
;
typedef
struct
{
...
...
include/libs/planner/planner.h
浏览文件 @
069d715f
...
...
@@ -178,7 +178,7 @@ struct SQueryNode;
* @param requestId
* @return
*/
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pQueryInfo
,
struct
SQueryDag
**
pDag
,
uint64_t
requestId
);
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
,
SArray
*
pNodeList
,
uint64_t
requestId
);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
...
...
source/client/inc/clientInt.h
浏览文件 @
069d715f
...
...
@@ -136,6 +136,7 @@ typedef struct SReqResultInfo {
TAOS_ROW
row
;
char
**
pCol
;
uint32_t
numOfRows
;
uint64_t
totalRows
;
uint32_t
current
;
bool
completed
;
}
SReqResultInfo
;
...
...
source/client/src/clientImpl.c
浏览文件 @
069d715f
...
...
@@ -25,9 +25,9 @@
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
static
void
setQueryResult
By
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
);
static
void
setQueryResult
From
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
);
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
if
(
str
==
NULL
)
{
return
false
;
}
...
...
@@ -59,7 +59,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SDataBlockSchema
*
pDataBlockSchema
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
if
(
taos_init
()
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -202,43 +202,38 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQueryNode
,
SQueryDag
**
pDag
)
{
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQueryNode
,
SQueryDag
**
pDag
,
SArray
*
pNodeList
)
{
pRequest
->
type
=
pQueryNode
->
type
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
qCreateQueryDag
(
pQueryNode
,
pDag
,
pRequest
->
requestId
);
SSchema
*
pSchema
=
NULL
;
int32_t
numOfCols
=
0
;
int32_t
code
=
qCreateQueryDag
(
pQueryNode
,
pDag
,
&
pSchema
,
&
numOfCols
,
pNodeList
,
pRequest
->
requestId
);
if
(
code
!=
0
)
{
return
code
;
}
if
(
pQueryNode
->
type
==
TSDB_SQL_SELECT
)
{
SArray
*
pa
=
taosArrayGetP
((
*
pDag
)
->
pSubplans
,
0
);
SSubplan
*
pPlan
=
taosArrayGetP
(
pa
,
0
);
SDataBlockSchema
*
pDataBlockSchema
=
&
(
pPlan
->
pDataSink
->
schema
);
setResSchemaInfo
(
pResInfo
,
pDataBlockSchema
);
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
pSchema
,
numOfCols
);
pRequest
->
type
=
TDMT_VND_QUERY
;
}
return
code
;
}
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
S
DataBlockSchema
*
pDataBlockSchema
)
{
assert
(
p
DataBlockSchema
!=
NULL
&&
pDataBlockSchema
->
numOfCols
>
0
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
S
Schema
*
pSchema
,
int32_t
numOfCols
)
{
assert
(
p
Schema
!=
NULL
&&
numOfCols
>
0
);
pResInfo
->
numOfCols
=
pDataBlockSchema
->
numOfCols
;
pResInfo
->
fields
=
calloc
(
pDataBlockSchema
->
numOfCols
,
sizeof
(
pDataBlockSchema
->
pSchema
[
0
]));
pResInfo
->
numOfCols
=
numOfCols
;
pResInfo
->
fields
=
calloc
(
numOfCols
,
sizeof
(
pSchema
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pResInfo
->
numOfCols
;
++
i
)
{
SSchema
*
pSchema
=
&
pDataBlockSchema
->
pSchema
[
i
];
pResInfo
->
fields
[
i
].
bytes
=
pSchema
->
bytes
;
pResInfo
->
fields
[
i
].
type
=
pSchema
->
type
;
tstrncpy
(
pResInfo
->
fields
[
i
].
name
,
pSchema
->
name
,
tListLen
(
pResInfo
->
fields
[
i
].
name
));
pResInfo
->
fields
[
i
].
bytes
=
pSchema
[
i
].
bytes
;
pResInfo
->
fields
[
i
].
type
=
pSchema
[
i
].
type
;
tstrncpy
(
pResInfo
->
fields
[
i
].
name
,
pSchema
[
i
].
name
,
tListLen
(
pResInfo
->
fields
[
i
].
name
));
}
}
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
)
{
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
SArray
*
pNodeList
)
{
if
(
TSDB_SQL_INSERT
==
pRequest
->
type
||
TSDB_SQL_CREATE_TABLE
==
pRequest
->
type
)
{
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
...
...
@@ -256,14 +251,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return
pRequest
->
code
;
}
SArray
*
execNode
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeAddr
));
SQueryNodeAddr
addr
=
{.
numOfEps
=
1
,
.
inUse
=
0
,
.
nodeId
=
2
};
addr
.
epAddr
[
0
].
port
=
7100
;
strcpy
(
addr
.
epAddr
[
0
].
fqdn
,
"localhost"
);
taosArrayPush
(
execNode
,
&
addr
);
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
execNode
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
}
typedef
struct
tmq_t
tmq_t
;
...
...
@@ -399,7 +387,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQueryNode
,
&
pRequest
->
body
.
pDag
,
pRequest
->
requestId
),
_return
);
SSchema
*
schema
=
NULL
;
int32_t
numOfCols
=
0
;
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQueryNode
,
&
pRequest
->
body
.
pDag
,
&
schema
,
&
numOfCols
,
NULL
,
pRequest
->
requestId
),
_return
);
pStr
=
qDagToString
(
pRequest
->
body
.
pDag
);
if
(
pStr
==
NULL
)
{
...
...
@@ -492,6 +482,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQueryNode
=
NULL
;
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
terrno
=
TSDB_CODE_SUCCESS
;
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
...
...
@@ -500,8 +491,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if
(
qIsDdlQuery
(
pQueryNode
))
{
CHECK_CODE_GOTO
(
execDdlQuery
(
pRequest
,
pQueryNode
),
_return
);
}
else
{
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQueryNode
,
&
pRequest
->
body
.
pDag
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
),
_return
);
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQueryNode
,
&
pRequest
->
body
.
pDag
,
pNodeList
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
),
_return
);
pRequest
->
code
=
terrno
;
}
...
...
@@ -719,13 +710,17 @@ void* doFetchRow(SRequestObj* pRequest) {
return
NULL
;
}
int32_t
code
=
scheduleFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pRequest
->
body
.
resInfo
.
pData
);
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
scheduleFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pResInfo
->
pData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
return
NULL
;
}
setQueryResultByRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pRequest
->
body
.
resInfo
.
pData
);
setQueryResultFromRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pResInfo
->
pData
);
tscDebug
(
"0x%"
PRIx64
" fetch results, numOfRows:%d total Rows:%"
PRId64
", complete:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pResInfo
->
numOfRows
,
pResInfo
->
totalRows
,
pResInfo
->
completed
,
pRequest
->
requestId
);
if
(
pResultInfo
->
numOfRows
==
0
)
{
return
NULL
;
}
...
...
@@ -855,7 +850,7 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
pthread_mutex_unlock
(
&
pTscObj
->
mutex
);
}
void
setQueryResult
By
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
)
{
void
setQueryResult
From
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
)
{
assert
(
pResultInfo
!=
NULL
&&
pRsp
!=
NULL
);
pResultInfo
->
pRspMsg
=
(
const
char
*
)
pRsp
;
...
...
@@ -864,5 +859,6 @@ void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* p
pResultInfo
->
current
=
0
;
pResultInfo
->
completed
=
(
pRsp
->
completed
==
1
);
pResultInfo
->
totalRows
+=
pResultInfo
->
numOfRows
;
setResultDataPtr
(
pResultInfo
,
pResultInfo
->
fields
,
pResultInfo
->
numOfCols
,
pResultInfo
->
numOfRows
);
}
source/libs/planner/inc/plannerInt.h
浏览文件 @
069d715f
...
...
@@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
*/
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
uint64_t
requestId
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
SArray
*
pNodeList
,
uint64_t
requestId
);
void
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SDownstreamSource
*
pSource
);
int32_t
subPlanToString
(
const
SSubplan
*
pPhyNode
,
char
**
str
,
int32_t
*
len
);
int32_t
stringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
069d715f
...
...
@@ -261,14 +261,14 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
return
;
}
static
uint64_t
splitSubplanByTable
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
SVgroupsInfo
*
pVgroupList
=
pTable
->
pMeta
->
vgroupList
;
static
uint64_t
splitSubplanByTable
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
Info
)
{
SVgroupsInfo
*
pVgroupList
=
pTable
Info
->
pMeta
->
vgroupList
;
for
(
int32_t
i
=
0
;
i
<
pVgroupList
->
numOfVgroups
;
++
i
)
{
STORE_CURRENT_SUBPLAN
(
pCxt
);
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_SCAN
);
subplan
->
msgType
=
TDMT_VND_QUERY
;
vgroupMsgToEpSet
(
&
(
pTable
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execNode
);
subplan
->
pNode
=
createMultiTableScanNode
(
pPlanNode
,
pTable
);
vgroupMsgToEpSet
(
&
(
pTable
Info
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execNode
);
subplan
->
pNode
=
createMultiTableScanNode
(
pPlanNode
,
pTable
Info
);
subplan
->
pDataSink
=
createDataDispatcher
(
pCxt
,
pPlanNode
,
subplan
->
pNode
);
RECOVERY_CURRENT_SUBPLAN
(
pCxt
);
}
...
...
@@ -384,18 +384,19 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
subplan
->
msgType
=
TDMT_VND_QUERY
;
subplan
->
pNode
=
createPhyNode
(
pCxt
,
pRoot
);
subplan
->
pDataSink
=
createDataDispatcher
(
pCxt
,
pRoot
,
subplan
->
pNode
);
subplan
->
pDataSink
=
createDataDispatcher
(
pCxt
,
pRoot
,
subplan
->
pNode
);
}
// todo deal subquery
}
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
uint64_t
requestId
)
{
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
SArray
*
pNodeList
,
uint64_t
requestId
)
{
TRY
(
TSDB_MAX_TAG_CONDITIONS
)
{
SPlanContext
context
=
{
.
pCatalog
=
pCatalog
,
.
pDag
=
validPointer
(
calloc
(
1
,
sizeof
(
SQueryDag
))),
.
pCurrentSubplan
=
NULL
,
.
nextId
=
{.
queryId
=
requestId
},
//The unsigned Id starting from 1 would be better
.
nextId
=
{.
queryId
=
requestId
,
.
subplanId
=
1
,
.
templateId
=
1
},
};
*
pDag
=
context
.
pDag
;
...
...
@@ -408,6 +409,17 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
terrno
=
code
;
return
TSDB_CODE_FAILED
;
}
END_TRY
// traverse the dag again to acquire the execution node.
if
(
pNodeList
!=
NULL
)
{
SArray
**
pSubLevel
=
taosArrayGetLast
((
*
pDag
)
->
pSubplans
);
size_t
num
=
taosArrayGetSize
(
*
pSubLevel
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SSubplan
*
pPlan
=
taosArrayGetP
(
*
pSubLevel
,
j
);
taosArrayPush
(
pNodeList
,
&
pPlan
->
execNode
);
}
}
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planner.c
浏览文件 @
069d715f
...
...
@@ -16,6 +16,8 @@
#include "parser.h"
#include "plannerInt.h"
static
void
extractResSchema
(
struct
SQueryDag
*
const
*
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
);
static
void
destroyDataSinkNode
(
SDataSink
*
pSinkNode
)
{
if
(
pSinkNode
==
NULL
)
{
return
;
...
...
@@ -56,7 +58,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
tfree
(
pDag
);
}
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
uint64_t
requestId
)
{
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
,
SArray
*
pNodeList
,
uint64_t
requestId
)
{
SQueryPlanNode
*
pLogicPlan
;
int32_t
code
=
createQueryPlan
(
pNode
,
&
pLogicPlan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -76,17 +78,31 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return
code
;
}
code
=
createDag
(
pLogicPlan
,
NULL
,
pDag
,
requestId
);
code
=
createDag
(
pLogicPlan
,
NULL
,
pDag
,
pNodeList
,
requestId
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
destroyQueryPlan
(
pLogicPlan
);
qDestroyQueryDag
(
*
pDag
);
return
code
;
}
extractResSchema
(
pDag
,
pResSchema
,
numOfCols
);
destroyQueryPlan
(
pLogicPlan
);
return
TSDB_CODE_SUCCESS
;
}
void
extractResSchema
(
struct
SQueryDag
*
const
*
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
)
{
// extract the final result schema
SArray
*
pTopSubplan
=
taosArrayGetP
((
*
pDag
)
->
pSubplans
,
0
);
SSubplan
*
pPlan
=
taosArrayGetP
(
pTopSubplan
,
0
);
SDataBlockSchema
*
pDataBlockSchema
=
&
(
pPlan
->
pDataSink
->
schema
);
*
numOfCols
=
pDataBlockSchema
->
numOfCols
;
*
pResSchema
=
calloc
(
pDataBlockSchema
->
numOfCols
,
sizeof
(
SSchema
));
memcpy
((
*
pResSchema
),
pDataBlockSchema
->
pSchema
,
pDataBlockSchema
->
numOfCols
*
sizeof
(
SSchema
));
}
void
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SDownstreamSource
*
pSource
)
{
setSubplanExecutionNode
(
subplan
,
templateId
,
pSource
);
}
...
...
source/libs/planner/test/phyPlanTests.cpp
浏览文件 @
069d715f
...
...
@@ -61,7 +61,7 @@ protected:
int32_t
run
()
{
SQueryDag
*
dag
=
nullptr
;
uint64_t
requestId
=
20
;
int32_t
code
=
createDag
(
logicPlan_
.
get
(),
nullptr
,
&
dag
,
requestId
);
int32_t
code
=
createDag
(
logicPlan_
.
get
(),
nullptr
,
&
dag
,
NULL
,
requestId
);
dag_
.
reset
(
dag
);
return
code
;
}
...
...
@@ -78,9 +78,9 @@ protected:
SQueryDag
*
dag
=
nullptr
;
uint64_t
requestId
=
20
;
SSchema
*
schema
=
NULL
;
u
int32_t
numOfOutput
=
0
;
int32_t
numOfOutput
=
0
;
code
=
qCreateQueryDag
(
query
,
&
dag
,
requestId
);
code
=
qCreateQueryDag
(
query
,
&
dag
,
&
schema
,
&
numOfOutput
,
nullptr
,
requestId
);
dag_
.
reset
(
dag
);
return
code
;
}
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
069d715f
...
...
@@ -171,17 +171,17 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
069d715f
...
...
@@ -146,12 +146,12 @@ typedef struct SSchJob {
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_ELOG(param, ...) qError("QID:
0x
%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:
0x
%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) qWarn("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:
0x
%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:
0x
%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) qWarn("QID:
0x
%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
069d715f
...
...
@@ -650,15 +650,11 @@ _return:
SCH_RET
(
code
);
}
int32_t
schProcessOnDataFetched
(
SSchJob
*
job
)
{
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
tsem_post
(
&
job
->
rspSem
);
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
)
{
bool
needRetry
=
false
;
...
...
@@ -882,7 +878,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
SCH_ERR_JRET
(
schProcessOnDataFetched
(
pJob
));
break
;
}
case
TDMT_VND_DROP_TASK
:
{
...
...
@@ -892,7 +887,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
default:
SCH_TASK_ELOG
(
"unknown rsp msg, type:%d, status:%d"
,
msgType
,
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -935,8 +929,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
}
pTask
=
*
task
;
SCH_TASK_DLOG
(
"rsp msg received, type:%d, code:%x"
,
msgType
,
rspCode
);
SCH_TASK_DLOG
(
"rsp msg received, type:%s, code:%s"
,
TMSG_INFO
(
msgType
),
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
));
...
...
@@ -1562,8 +1555,8 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if
(
NULL
==
pJob
||
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
...
...
@@ -1609,7 +1602,6 @@ _return:
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
res
,
*
pData
,
NULL
))
{
continue
;
}
...
...
@@ -1628,8 +1620,7 @@ _return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"fetch done, code:%x"
,
code
);
SCH_JOB_DLOG
(
"fetch done, code:%s"
,
tstrerror
(
code
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_RET
(
code
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录