Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8708bf01
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8708bf01
编写于
1月 26, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/mnode
上级
2c3f35e7
36cbe6ad
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
57 addition
and
132 deletion
+57
-132
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+1
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+41
-94
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-3
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+5
-5
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+8
-27
未找到文件。
source/dnode/vnode/inc/tsdb.h
浏览文件 @
8708bf01
...
...
@@ -75,7 +75,6 @@ typedef struct STsdbQueryCond {
}
STsdbQueryCond
;
typedef
struct
{
void
*
pTable
;
TSKEY
lastKey
;
uint64_t
uid
;
}
STableKeyInfo
;
...
...
@@ -141,7 +140,7 @@ bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
*/
int32_t
tsdbQuerySTableByTagCond
(
STsdb
*
tsdb
,
uint64_t
uid
,
TSKEY
skey
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
);
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
,
uint64_t
taskId
);
/**
* get num of rows in mem table
*
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
8708bf01
...
...
@@ -81,7 +81,6 @@ enum {
CHECKINFO_CHOSEN_BOTH
=
2
//for update=2(merge case)
};
typedef
struct
STableCheckInfo
{
uint64_t
tableId
;
TSKEY
lastKey
;
...
...
@@ -209,31 +208,6 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
return
pLocalIdList
;
}
static
void
tsdbMayTakeMemSnapshot
(
STsdbReadHandle
*
pTsdbReadHandle
,
SArray
*
psTable
)
{
// assert(pTsdbReadHandle != NULL && pTsdbReadHandle->pMemRef != NULL);
//
// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemRef;
// if (pTsdbReadHandle->pMemRef->ref++ == 0) {
// tsdbTakeMemSnapshot(pTsdbReadHandle->pTsdb, &(pMemRef->snapshot), psTable);
// }
//
// taosArrayDestroy(psTable);
}
static
void
tsdbMayUnTakeMemSnapshot
(
STsdbReadHandle
*
pTsdbReadHandle
)
{
// assert(pTsdbReadHandle != NULL);
// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemRef;
// if (pMemRef == NULL) { // it has been freed
// return;
// }
//
// if (--pMemRef->ref == 0) {
// tsdbUnTakeMemSnapShot(pTsdbReadHandle->pTsdb, &(pMemRef->snapshot));
// }
//
// pTsdbReadHandle->pMemRef = NULL;
}
//int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle) {
// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
//
...
...
@@ -263,9 +237,9 @@ static void tsdbMayUnTakeMemSnapshot(STsdbReadHandle* pTsdbReadHandle) {
// return rows;
//}
static
SArray
*
createCheckInfoFromTableGroup
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableGroupInfo
*
pGroupList
,
SArray
**
psTable
)
{
size_t
size
OfGroup
=
taosArrayGetSize
(
pGroupList
->
pGroupList
);
assert
(
size
OfGroup
>=
1
);
static
SArray
*
createCheckInfoFromTableGroup
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableGroupInfo
*
pGroupList
)
{
size_t
num
OfGroup
=
taosArrayGetSize
(
pGroupList
->
pGroupList
);
assert
(
num
OfGroup
>=
1
);
// allocate buffer in order to load data blocks from file
SArray
*
pTableCheckInfo
=
taosArrayInit
(
pGroupList
->
numOfTables
,
sizeof
(
STableCheckInfo
));
...
...
@@ -273,14 +247,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
return
NULL
;
}
SArray
*
pTable
=
taosArrayInit
(
4
,
sizeof
(
STable
*
));
if
(
pTable
==
NULL
)
{
taosArrayDestroy
(
pTableCheckInfo
);
return
NULL
;
}
// todo apply the lastkey of table check to avoid to load header file
for
(
int32_t
i
=
0
;
i
<
size
OfGroup
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
OfGroup
;
++
i
)
{
SArray
*
group
=
*
(
SArray
**
)
taosArrayGet
(
pGroupList
->
pGroupList
,
i
);
size_t
gsize
=
taosArrayGetSize
(
group
);
...
...
@@ -289,12 +257,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
group
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
};
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
info
.
tableId
=
pKeyInfo
->
uid
;
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
,
.
tableId
=
pKeyInfo
->
uid
};
if
(
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
if
(
info
.
lastKey
==
INT64_MIN
||
info
.
lastKey
<
pTsdbReadHandle
->
window
.
skey
)
{
info
.
lastKey
=
pTsdbReadHandle
->
window
.
skey
;
...
...
@@ -310,13 +273,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
}
}
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
size_t
gsize
=
taosArrayGetSize
(
pTableCheckInfo
);
// for (int32_t i = 0; i < gsize; ++i) {
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
// }
*
psTable
=
pTable
;
// TODO group table according to the tag value.
taosArraySort
(
pTableCheckInfo
,
tsdbCheckInfoCompar
);
return
pTableCheckInfo
;
}
...
...
@@ -484,18 +442,17 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup
return
(
tsdbReadHandleT
*
)
pTsdbReadHandle
;
}
SArray
*
psTable
=
NULL
;
// todo apply the lastkey of table check to avoid to load header file
pTsdbReadHandle
->
pTableCheckInfo
=
createCheckInfoFromTableGroup
(
pTsdbReadHandle
,
groupList
,
&
psTable
);
pTsdbReadHandle
->
pTableCheckInfo
=
createCheckInfoFromTableGroup
(
pTsdbReadHandle
,
groupList
);
if
(
pTsdbReadHandle
->
pTableCheckInfo
==
NULL
)
{
// tsdbCleanupQueryHandle(pTsdbReadHandle);
taosArrayDestroy
(
psTable
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
tsdbDebug
(
"%p total numOfTable:%"
PRIzu
" in query, %s"
,
pTsdbReadHandle
,
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
),
pTsdbReadHandle
->
idStr
);
tsdbDebug
(
"%p total numOfTable:%"
PRIzu
" in this query, group %"
PRIzu
" %s"
,
pTsdbReadHandle
,
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
),
taosArrayGetSize
(
groupList
->
pGroupList
),
pTsdbReadHandle
->
idStr
);
return
(
tsdbReadHandleT
)
pTsdbReadHandle
;
}
...
...
@@ -2640,7 +2597,7 @@ static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
break
;
}
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
TSKEY_INITIAL_VAL
,
uid
=
id
};
STableKeyInfo
info
=
{.
lastKey
=
TSKEY_INITIAL_VAL
,
uid
=
id
};
taosArrayPush
(
list
,
&
info
);
}
...
...
@@ -3231,7 +3188,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
if
(
key
<
lastKey
)
{
key
=
lastKey
;
keyInfo
.
pTable
=
pInfo
->
pTable
;
//
keyInfo.pTable = pInfo->pTable;
keyInfo
.
lastKey
=
key
;
pInfo
->
lastKey
=
key
;
...
...
@@ -3245,29 +3202,19 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
}
}
// clear current group, unref unused table
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
pInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
pGroup
,
i
);
// keyInfo.pTable may be NULL here.
if
(
pInfo
->
pTable
!=
keyInfo
.
pTable
)
{
// tsdbUnRefTable(pInfo->pTable);
}
}
// more than one table in each group, only one table left for each group
if
(
keyInfo
.
pTable
!=
NULL
)
{
totalNumOfTable
++
;
if
(
taosArrayGetSize
(
pGroup
)
==
1
)
{
// do nothing
}
else
{
taosArrayClear
(
pGroup
);
taosArrayPush
(
pGroup
,
&
keyInfo
);
}
}
else
{
// mark all the empty groups, and remove it later
taosArrayDestroy
(
pGroup
);
taosArrayPush
(
emptyGroup
,
&
j
);
}
//
if (keyInfo.pTable != NULL) {
//
totalNumOfTable++;
//
if (taosArrayGetSize(pGroup) == 1) {
//
// do nothing
//
} else {
//
taosArrayClear(pGroup);
//
taosArrayPush(pGroup, &keyInfo);
//
}
//
} else { // mark all the empty groups, and remove it later
//
taosArrayDestroy(pGroup);
//
taosArrayPush(emptyGroup, &j);
//
}
}
// window does not being updated, so set the original
...
...
@@ -3457,11 +3404,13 @@ void filterPrepare(void* expr, void* param) {
}
}
#endif
static
int32_t
tableGroupComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
#if 0
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = ((STableKeyInfo*) p1)->
pTable
;
STable* pTable2 = ((STableKeyInfo*) p2)->
pTable
;
STable* pTable1 = ((STableKeyInfo*) p1)->
uid
;
STable* pTable2 = ((STableKeyInfo*) p2)->
uid
;
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
...
...
@@ -3509,10 +3458,9 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa
return ret;
}
}
#endif
return
0
;
}
#endif
static
int
tsdbCheckInfoCompar
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(((
STableCheckInfo
*
)
key1
)
->
tableId
<
((
STableCheckInfo
*
)
key2
)
->
tableId
)
{
...
...
@@ -3528,10 +3476,9 @@ static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
void
createTableGroupImpl
(
SArray
*
pGroups
,
SArray
*
pTableList
,
size_t
numOfTables
,
TSKEY
skey
,
STableGroupSupporter
*
pSupp
,
__ext_compar_fn_t
compareFn
)
{
STable
*
pTable
=
taosArrayGetP
(
pTableList
,
0
);
SArray
*
g
=
taosArrayInit
(
16
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info
=
{.
pTable
=
pTable
,
.
lastKey
=
skey
};
STableKeyInfo
info
=
{.
lastKey
=
skey
};
taosArrayPush
(
g
,
&
info
);
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
...
...
@@ -3542,13 +3489,13 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
assert
(
ret
==
0
||
ret
==
-
1
);
if
(
ret
==
0
)
{
STableKeyInfo
info1
=
{.
pTable
=
*
p
,
.
lastKey
=
skey
};
STableKeyInfo
info1
=
{.
lastKey
=
skey
};
taosArrayPush
(
g
,
&
info1
);
}
else
{
taosArrayPush
(
pGroups
,
&
g
);
// current group is ended, start a new group
g
=
taosArrayInit
(
16
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info1
=
{.
pTable
=
*
p
,
.
lastKey
=
skey
};
STableKeyInfo
info1
=
{.
lastKey
=
skey
};
taosArrayPush
(
g
,
&
info1
);
}
}
...
...
@@ -3581,8 +3528,8 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
sup
.
pTagSchema
=
pTagSchema
->
pSchema
;
sup
.
pCols
=
pCols
;
//
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
//
createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
taosqsort
(
pTableList
->
pData
,
size
,
sizeof
(
STableKeyInfo
),
&
sup
,
tableGroupComparFn
);
createTableGroupImpl
(
pTableGroup
,
pTableList
,
size
,
skey
,
&
sup
,
tableGroupComparFn
);
}
return
pTableGroup
;
...
...
@@ -3690,16 +3637,16 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
int32_t
tsdbQuerySTableByTagCond
(
STsdb
*
tsdb
,
uint64_t
uid
,
TSKEY
skey
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
)
{
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
,
uint64_t
taskId
)
{
STbCfg
*
pTbCfg
=
metaGetTbInfoByUid
(
tsdb
->
pMeta
,
uid
);
if
(
pTbCfg
==
NULL
)
{
tsdbError
(
"%p failed to get stable, uid:%"
PRIu64
",
reqId:0x%"
PRIx64
,
tsdb
,
ui
d
,
reqId
);
tsdbError
(
"%p failed to get stable, uid:%"
PRIu64
",
TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
tsdb
,
uid
,
taskI
d
,
reqId
);
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
goto
_error
;
}
if
(
pTbCfg
->
type
!=
META_SUPER_TABLE
)
{
tsdbError
(
"%p query normal tag not allowed, uid:%"
PRIu64
",
reId:0x%"
PRIx64
,
tsdb
,
ui
d
,
reqId
);
tsdbError
(
"%p query normal tag not allowed, uid:%"
PRIu64
",
TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
tsdb
,
uid
,
taskI
d
,
reqId
);
terrno
=
TSDB_CODE_OPS_NOT_SUPPORT
;
//basically, this error is caused by invalid sql issued by client
goto
_error
;
}
...
...
@@ -3718,8 +3665,8 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
pGroupInfo
->
numOfTables
=
(
uint32_t
)
taosArrayGetSize
(
res
);
pGroupInfo
->
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCols
,
skey
);
tsdbDebug
(
"%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu
"
,
tsdb
,
pGroupInfo
->
numOfTables
,
taosArrayGetSize
(
pGroupInfo
->
pGroupList
));
tsdbDebug
(
"%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu
, TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
tsdb
,
pGroupInfo
->
numOfTables
,
taosArrayGetSize
(
pGroupInfo
->
pGroupList
)
,
taskId
,
reqId
);
taosArrayDestroy
(
res
);
return
ret
;
...
...
@@ -3892,7 +3839,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
tfree
(
pTsdbReadHandle
->
statis
);
if
(
!
emptyQueryTimewindow
(
pTsdbReadHandle
))
{
tsdbMayUnTakeMemSnapshot
(
pTsdbReadHandle
);
//
tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
}
else
{
assert
(
pTsdbReadHandle
->
pTableCheckInfo
==
NULL
);
}
...
...
source/libs/executor/inc/executil.h
浏览文件 @
8708bf01
...
...
@@ -38,7 +38,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.
id
str)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
8708bf01
...
...
@@ -239,8 +239,7 @@ typedef struct STaskIdInfo {
uint64_t
queryId
;
// this is also a request id
uint64_t
subplanId
;
uint64_t
templateId
;
uint64_t
taskId
;
// this is a subplan id
char
*
idstr
;
char
*
str
;
}
STaskIdInfo
;
typedef
struct
SExecTaskInfo
{
...
...
@@ -639,7 +638,6 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
bool
checkNeedToCompressQueryCol
(
SQInfo
*
pQInfo
);
bool
doBuildResCheck
(
SQInfo
*
pQInfo
);
void
setQueryStatus
(
STaskRuntimeEnv
*
pRuntimeEnv
,
int8_t
status
);
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
8708bf01
...
...
@@ -18,20 +18,20 @@
#include "executorimpl.h"
#include "planner.h"
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
uint64_t
reqI
d
)
{
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
char
*
i
d
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
OP_StreamScan
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
qError
(
"failed to find stream scan operator to set the input data block,
reqId:0x%"
PRIx64
,
reqI
d
);
qError
(
"failed to find stream scan operator to set the input data block,
%s"
PRIx64
,
i
d
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
if
(
pOperator
->
numOfDownstream
>
1
)
{
// not handle this in join query
qError
(
"join not supported for stream block scan,
reqId:0x%"
PRIx64
,
reqI
d
);
qError
(
"join not supported for stream block scan,
%s"
PRIx64
,
i
d
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
reqI
d
);
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
i
d
);
}
else
{
SStreamBlockScanInfo
*
pInfo
=
pOperator
->
info
;
tqReadHandleSetMsg
(
pInfo
->
readerHandle
,
input
,
0
);
...
...
@@ -50,7 +50,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
input
,
pTaskInfo
->
id
.
queryId
);
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
input
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
}
else
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8708bf01
...
...
@@ -5146,7 +5146,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
size_t
totalSources
=
taosArrayGetSize
(
pExchangeInfo
->
pSources
);
if
(
pExchangeInfo
->
current
>=
totalSources
)
{
qDebug
(
"%s all %"
PRIzu
" source(s) are exhausted, total rows:%"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
pTaskInfo
->
id
.
idstr
,
totalSources
,
qDebug
(
"%s all %"
PRIzu
" source(s) are exhausted, total rows:%"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
)
,
totalSources
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
totalSize
,
pExchangeInfo
->
totalElapsed
/
1000
.
0
);
return
NULL
;
}
...
...
@@ -5208,7 +5208,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
int64_t
el
=
taosGetTimestampUs
()
-
startTs
;
pExchangeInfo
->
totalElapsed
+=
el
;
qDebug
(
"%s all %"
PRIzu
" sources are exhausted, total rows: %"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
pTaskInfo
->
id
.
idstr
,
totalSources
,
qDebug
(
"%s all %"
PRIzu
" sources are exhausted, total rows: %"
PRIu64
" bytes:%"
PRIu64
", elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
)
,
totalSources
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
totalSize
,
pExchangeInfo
->
totalElapsed
/
1000
.
0
);
return
NULL
;
}
else
{
...
...
@@ -7741,11 +7741,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
id
.
taskId
=
taskId
;
char
*
p
=
calloc
(
1
,
128
);
snprintf
(
p
,
128
,
"TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
queryId
);
pTaskInfo
->
id
.
id
str
=
strdup
(
p
);
pTaskInfo
->
id
.
str
=
strdup
(
p
);
return
pTaskInfo
;
}
...
...
@@ -7822,7 +7821,7 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
if
(
tableType
==
TSDB_SUPER_TABLE
)
{
code
=
tsdbQuerySTableByTagCond
(
readerHandle
,
uid
,
window
.
skey
,
NULL
,
0
,
0
,
NULL
,
&
groupInfo
,
NULL
,
0
,
queryId
);
tsdbQuerySTableByTagCond
(
readerHandle
,
uid
,
window
.
skey
,
NULL
,
0
,
0
,
NULL
,
&
groupInfo
,
NULL
,
0
,
queryId
,
taskId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -7832,7 +7831,7 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
SArray
*
pa
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
0
,
.
uid
=
uid
};
STableKeyInfo
info
=
{.
lastKey
=
0
,
.
uid
=
uid
};
taosArrayPush
(
pa
,
&
info
);
taosArrayPush
(
groupInfo
.
pGroupList
,
&
pa
);
}
...
...
@@ -8827,33 +8826,15 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
}
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"%s start to free execTask"
,
GET_TASKID
(
pTaskInfo
));
doDestroyTableQueryInfo
(
&
pTaskInfo
->
tableqinfoGroupInfo
);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
tfree
(
pTaskInfo
->
sql
);
tfree
(
pTaskInfo
->
id
.
str
);
qDebug
(
"%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
));
tfree
(
pTaskInfo
);
}
bool
doBuildResCheck
(
SQInfo
*
pQInfo
)
{
bool
buildRes
=
false
;
pthread_mutex_lock
(
&
pQInfo
->
lock
);
pQInfo
->
dataReady
=
QUERY_RESULT_READY
;
buildRes
=
needBuildResAfterQueryComplete
(
pQInfo
);
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
// put into task to be executed.
assert
(
pQInfo
->
owner
==
taosGetSelfPthreadId
());
pQInfo
->
owner
=
0
;
pthread_mutex_unlock
(
&
pQInfo
->
lock
);
// used in retrieve blocking model.
tsem_post
(
&
pQInfo
->
ready
);
return
buildRes
;
tfree
(
pTaskInfo
);
}
static
void
doSetTagValueToResultBuf
(
char
*
output
,
const
char
*
val
,
int16_t
type
,
int16_t
bytes
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录