Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8ee8785d
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1179
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8ee8785d
编写于
1月 25, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-11818] refactor, fix bug in select
上级
b97ab5db
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
40 addition
and
80 deletion
+40
-80
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+0
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+26
-44
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-3
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+5
-5
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+7
-26
未找到文件。
source/dnode/vnode/inc/tsdb.h
浏览文件 @
8ee8785d
...
...
@@ -75,7 +75,6 @@ typedef struct STsdbQueryCond {
}
STsdbQueryCond
;
typedef
struct
{
void
*
pTable
;
TSKEY
lastKey
;
uint64_t
uid
;
}
STableKeyInfo
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
8ee8785d
...
...
@@ -81,7 +81,6 @@ enum {
CHECKINFO_CHOSEN_BOTH
=
2
//for update=2(merge case)
};
typedef
struct
STableCheckInfo
{
uint64_t
tableId
;
TSKEY
lastKey
;
...
...
@@ -258,9 +257,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
group
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
};
info
.
tableId
=
pKeyInfo
->
uid
;
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
,
.
tableId
=
pKeyInfo
->
uid
};
if
(
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
if
(
info
.
lastKey
==
INT64_MIN
||
info
.
lastKey
<
pTsdbReadHandle
->
window
.
skey
)
{
info
.
lastKey
=
pTsdbReadHandle
->
window
.
skey
;
...
...
@@ -277,12 +274,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
}
// TODO group table according to the tag value.
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
// size_t gsize = taosArrayGetSize(pTableCheckInfo);
// for (int32_t i = 0; i < gsize; ++i) {
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
// }
taosArraySort
(
pTableCheckInfo
,
tsdbCheckInfoCompar
);
return
pTableCheckInfo
;
}
...
...
@@ -2605,7 +2597,7 @@ static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
break
;
}
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
TSKEY_INITIAL_VAL
,
uid
=
id
};
STableKeyInfo
info
=
{.
lastKey
=
TSKEY_INITIAL_VAL
,
uid
=
id
};
taosArrayPush
(
list
,
&
info
);
}
...
...
@@ -3196,7 +3188,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
if
(
key
<
lastKey
)
{
key
=
lastKey
;
keyInfo
.
pTable
=
pInfo
->
pTable
;
//
keyInfo.pTable = pInfo->pTable;
keyInfo
.
lastKey
=
key
;
pInfo
->
lastKey
=
key
;
...
...
@@ -3210,29 +3202,19 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
}
}
// clear current group, unref unused table
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
pInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
pGroup
,
i
);
// keyInfo.pTable may be NULL here.
if
(
pInfo
->
pTable
!=
keyInfo
.
pTable
)
{
// tsdbUnRefTable(pInfo->pTable);
}
}
// more than one table in each group, only one table left for each group
if
(
keyInfo
.
pTable
!=
NULL
)
{
totalNumOfTable
++
;
if
(
taosArrayGetSize
(
pGroup
)
==
1
)
{
// do nothing
}
else
{
taosArrayClear
(
pGroup
);
taosArrayPush
(
pGroup
,
&
keyInfo
);
}
}
else
{
// mark all the empty groups, and remove it later
taosArrayDestroy
(
pGroup
);
taosArrayPush
(
emptyGroup
,
&
j
);
}
//
if (keyInfo.pTable != NULL) {
//
totalNumOfTable++;
//
if (taosArrayGetSize(pGroup) == 1) {
//
// do nothing
//
} else {
//
taosArrayClear(pGroup);
//
taosArrayPush(pGroup, &keyInfo);
//
}
//
} else { // mark all the empty groups, and remove it later
//
taosArrayDestroy(pGroup);
//
taosArrayPush(emptyGroup, &j);
//
}
}
// window does not being updated, so set the original
...
...
@@ -3422,11 +3404,13 @@ void filterPrepare(void* expr, void* param) {
}
}
#endif
static
int32_t
tableGroupComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
#if 0
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = ((STableKeyInfo*) p1)->
pTable
;
STable* pTable2 = ((STableKeyInfo*) p2)->
pTable
;
STable* pTable1 = ((STableKeyInfo*) p1)->
uid
;
STable* pTable2 = ((STableKeyInfo*) p2)->
uid
;
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
...
...
@@ -3474,10 +3458,9 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa
return ret;
}
}
#endif
return
0
;
}
#endif
static
int
tsdbCheckInfoCompar
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(((
STableCheckInfo
*
)
key1
)
->
tableId
<
((
STableCheckInfo
*
)
key2
)
->
tableId
)
{
...
...
@@ -3493,10 +3476,9 @@ static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
void
createTableGroupImpl
(
SArray
*
pGroups
,
SArray
*
pTableList
,
size_t
numOfTables
,
TSKEY
skey
,
STableGroupSupporter
*
pSupp
,
__ext_compar_fn_t
compareFn
)
{
STable
*
pTable
=
taosArrayGetP
(
pTableList
,
0
);
SArray
*
g
=
taosArrayInit
(
16
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info
=
{.
pTable
=
pTable
,
.
lastKey
=
skey
};
STableKeyInfo
info
=
{.
lastKey
=
skey
};
taosArrayPush
(
g
,
&
info
);
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
...
...
@@ -3507,13 +3489,13 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
assert
(
ret
==
0
||
ret
==
-
1
);
if
(
ret
==
0
)
{
STableKeyInfo
info1
=
{.
pTable
=
*
p
,
.
lastKey
=
skey
};
STableKeyInfo
info1
=
{.
lastKey
=
skey
};
taosArrayPush
(
g
,
&
info1
);
}
else
{
taosArrayPush
(
pGroups
,
&
g
);
// current group is ended, start a new group
g
=
taosArrayInit
(
16
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info1
=
{.
pTable
=
*
p
,
.
lastKey
=
skey
};
STableKeyInfo
info1
=
{.
lastKey
=
skey
};
taosArrayPush
(
g
,
&
info1
);
}
}
...
...
@@ -3546,8 +3528,8 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
sup
.
pTagSchema
=
pTagSchema
->
pSchema
;
sup
.
pCols
=
pCols
;
//
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
//
createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
taosqsort
(
pTableList
->
pData
,
size
,
sizeof
(
STableKeyInfo
),
&
sup
,
tableGroupComparFn
);
createTableGroupImpl
(
pTableGroup
,
pTableList
,
size
,
skey
,
&
sup
,
tableGroupComparFn
);
}
return
pTableGroup
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
8ee8785d
...
...
@@ -38,7 +38,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.
id
str)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
8ee8785d
...
...
@@ -239,8 +239,7 @@ typedef struct STaskIdInfo {
uint64_t
queryId
;
// this is also a request id
uint64_t
subplanId
;
uint64_t
templateId
;
uint64_t
taskId
;
// this is a subplan id
char
*
idstr
;
char
*
str
;
}
STaskIdInfo
;
typedef
struct
SExecTaskInfo
{
...
...
@@ -639,7 +638,6 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
bool
checkNeedToCompressQueryCol
(
SQInfo
*
pQInfo
);
bool
doBuildResCheck
(
SQInfo
*
pQInfo
);
void
setQueryStatus
(
STaskRuntimeEnv
*
pRuntimeEnv
,
int8_t
status
);
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
8ee8785d
...
...
@@ -18,20 +18,20 @@
#include "executorimpl.h"
#include "planner.h"
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
uint64_t
reqI
d
)
{
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
char
*
i
d
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
OP_StreamScan
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
qError
(
"failed to find stream scan operator to set the input data block,
reqId:0x%"
PRIx64
,
reqI
d
);
qError
(
"failed to find stream scan operator to set the input data block,
%s"
PRIx64
,
i
d
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
if
(
pOperator
->
numOfDownstream
>
1
)
{
// not handle this in join query
qError
(
"join not supported for stream block scan,
reqId:0x%"
PRIx64
,
reqI
d
);
qError
(
"join not supported for stream block scan,
%s"
PRIx64
,
i
d
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
reqI
d
);
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
i
d
);
}
else
{
SStreamBlockScanInfo
*
pInfo
=
pOperator
->
info
;
tqReadHandleSetMsg
(
pInfo
->
readerHandle
,
input
,
0
);
...
...
@@ -50,7 +50,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
input
,
pTaskInfo
->
id
.
queryId
);
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
input
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
}
else
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8ee8785d
...
...
@@ -5146,7 +5146,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
size_t
totalSources
=
taosArrayGetSize
(
pExchangeInfo
->
pSources
);
if
(
pExchangeInfo
->
current
>=
totalSources
)
{
qDebug
(
"%s all %"
PRIzu
" source(s) are exhausted, total rows:%"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
pTaskInfo
->
id
.
idstr
,
totalSources
,
qDebug
(
"%s all %"
PRIzu
" source(s) are exhausted, total rows:%"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
)
,
totalSources
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
totalSize
,
pExchangeInfo
->
totalElapsed
/
1000
.
0
);
return
NULL
;
}
...
...
@@ -5208,7 +5208,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
int64_t
el
=
taosGetTimestampUs
()
-
startTs
;
pExchangeInfo
->
totalElapsed
+=
el
;
qDebug
(
"%s all %"
PRIzu
" sources are exhausted, total rows: %"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
pTaskInfo
->
id
.
idstr
,
totalSources
,
qDebug
(
"%s all %"
PRIzu
" sources are exhausted, total rows: %"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
)
,
totalSources
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
totalSize
,
pExchangeInfo
->
totalElapsed
/
1000
.
0
);
return
NULL
;
}
else
{
...
...
@@ -7741,11 +7741,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
id
.
taskId
=
taskId
;
char
*
p
=
calloc
(
1
,
128
);
snprintf
(
p
,
128
,
"TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
queryId
);
pTaskInfo
->
id
.
id
str
=
strdup
(
p
);
pTaskInfo
->
id
.
str
=
strdup
(
p
);
return
pTaskInfo
;
}
...
...
@@ -7832,7 +7831,7 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
SArray
*
pa
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
0
,
.
uid
=
uid
};
STableKeyInfo
info
=
{.
lastKey
=
0
,
.
uid
=
uid
};
taosArrayPush
(
pa
,
&
info
);
taosArrayPush
(
groupInfo
.
pGroupList
,
&
pa
);
}
...
...
@@ -8827,33 +8826,15 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
}
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"%s start to free execTask"
,
GET_TASKID
(
pTaskInfo
));
doDestroyTableQueryInfo
(
&
pTaskInfo
->
tableqinfoGroupInfo
);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
tfree
(
pTaskInfo
->
sql
);
tfree
(
pTaskInfo
->
id
.
str
);
qDebug
(
"%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
));
tfree
(
pTaskInfo
);
}
bool
doBuildResCheck
(
SQInfo
*
pQInfo
)
{
bool
buildRes
=
false
;
pthread_mutex_lock
(
&
pQInfo
->
lock
);
pQInfo
->
dataReady
=
QUERY_RESULT_READY
;
buildRes
=
needBuildResAfterQueryComplete
(
pQInfo
);
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
// put into task to be executed.
assert
(
pQInfo
->
owner
==
taosGetSelfPthreadId
());
pQInfo
->
owner
=
0
;
pthread_mutex_unlock
(
&
pQInfo
->
lock
);
// used in retrieve blocking model.
tsem_post
(
&
pQInfo
->
ready
);
return
buildRes
;
tfree
(
pTaskInfo
);
}
static
void
doSetTagValueToResultBuf
(
char
*
output
,
const
char
*
val
,
int16_t
type
,
int16_t
bytes
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录