Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
040093be
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
040093be
编写于
6月 15, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into szhou/feature/multi-tb-merge-scan
上级
562ec27c
e85b591e
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
240 addition
and
146 deletion
+240
-146
include/common/systable.h
include/common/systable.h
+1
-0
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+50
-49
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+1
-1
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+2
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+7
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+115
-39
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+3
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-8
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+8
-0
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+22
-12
tests/system-test/2-query/json_tag.py
tests/system-test/2-query/json_tag.py
+30
-35
未找到文件。
include/common/systable.h
浏览文件 @
040093be
...
...
@@ -52,6 +52,7 @@ extern "C" {
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_TRANS "trans"
#define TSDB_PERFS_TABLE_STREAMS "streams"
#define TSDB_PERFS_TABLE_APPS "apps"
typedef
struct
SSysDbTableSchema
{
const
char
*
name
;
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
040093be
...
...
@@ -261,55 +261,49 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
}
int32_t
ctgHandleForceUpdate
(
SCatalog
*
pCtg
,
SCtgJob
*
pJob
,
const
SCatalogReq
*
pReq
)
{
int32_t
dbNum
=
pJob
->
dbCfgNum
+
pJob
->
dbVgNum
+
pJob
->
dbInfoNum
;
if
(
dbNum
>
0
)
{
if
(
dbNum
>
pJob
->
dbCfgNum
&&
dbNum
>
pJob
->
dbVgNum
&&
dbNum
>
pJob
->
dbInfoNum
)
{
SHashObj
*
pDb
=
taosHashInit
(
dbNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
pDb
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbVgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbVgroup
,
i
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
int32_t
ctgHandleForceUpdate
(
SCatalog
*
pCtg
,
int32_t
taskNum
,
SCtgJob
*
pJob
,
const
SCatalogReq
*
pReq
)
{
SHashObj
*
pDb
=
taosHashInit
(
taskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
pDb
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbVgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbVgroup
,
i
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbCfgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbCfg
,
i
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbCfgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbCfg
,
i
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbInfoNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbInfo
,
i
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbInfoNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbInfo
,
i
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
char
*
dbFName
=
taosHashIterate
(
pDb
,
NULL
);
while
(
dbFName
)
{
ctgDropDbVgroupEnqueue
(
pCtg
,
dbFName
,
true
);
dbFName
=
taosHashIterate
(
pDb
,
dbFName
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
tbMetaNum
;
++
i
)
{
SName
*
name
=
taosArrayGet
(
pReq
->
pTableMeta
,
i
);
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
name
,
dbFName
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
tbHashNum
;
++
i
)
{
SName
*
name
=
taosArrayGet
(
pReq
->
pTableHash
,
i
);
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
name
,
dbFName
);
taosHashPut
(
pDb
,
dbFName
,
strlen
(
dbFName
),
dbFName
,
TSDB_DB_FNAME_LEN
);
}
taosHashCleanup
(
pDb
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbVgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbVgroup
,
i
);
CTG_ERR_RET
(
ctgDropDbVgroupEnqueue
(
pCtg
,
dbFName
,
true
));
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbCfgNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbCfg
,
i
);
CTG_ERR_RET
(
ctgDropDbVgroupEnqueue
(
pCtg
,
dbFName
,
true
));
}
for
(
int32_t
i
=
0
;
i
<
pJob
->
dbInfoNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pReq
->
pDbInfo
,
i
);
CTG_ERR_RET
(
ctgDropDbVgroupEnqueue
(
pCtg
,
dbFName
,
true
));
}
}
char
*
dbFName
=
taosHashIterate
(
pDb
,
NULL
);
while
(
dbFName
)
{
ctgDropDbVgroupEnqueue
(
pCtg
,
dbFName
,
true
);
dbFName
=
taosHashIterate
(
pDb
,
dbFName
);
}
taosHashCleanup
(
pDb
);
int32_t
tbNum
=
pJob
->
tbMetaNum
+
pJob
->
tbHashNum
;
if
(
tbNum
>
0
)
{
if
(
tbNum
>
pJob
->
tbMetaNum
&&
tbNum
>
pJob
->
tbHashNum
)
{
...
...
@@ -404,7 +398,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
}
if
(
pReq
->
forceUpdate
)
{
CTG_ERR_JRET
(
ctgHandleForceUpdate
(
pCtg
,
pJob
,
pReq
));
CTG_ERR_JRET
(
ctgHandleForceUpdate
(
pCtg
,
*
taskNum
,
pJob
,
pReq
));
}
int32_t
taskIdx
=
0
;
...
...
@@ -790,8 +784,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
_return:
if
(
dbCache
)
{
ctgRUnlockVgInfo
(
dbCache
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
}
ctgHandleTaskEnd
(
pTask
,
code
);
...
...
@@ -870,7 +863,7 @@ _return:
int32_t
ctgHandleGetTbIndexRsp
(
SCtgTask
*
pTask
,
int32_t
reqType
,
const
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgProcessRspMsg
(
&
pTask
->
msgCtx
.
out
,
reqType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
,
pTask
->
msgCtx
.
target
));
CTG_ERR_JRET
(
ctgProcessRspMsg
(
pTask
->
msgCtx
.
out
,
reqType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
,
pTask
->
msgCtx
.
target
));
STableIndex
*
pOut
=
(
STableIndex
*
)
pTask
->
msgCtx
.
out
;
SArray
*
pInfo
=
NULL
;
...
...
@@ -949,7 +942,6 @@ _return:
int32_t
ctgHandleGetUserRsp
(
SCtgTask
*
pTask
,
int32_t
reqType
,
const
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgUserCtx
*
ctx
=
(
SCtgUserCtx
*
)
pTask
->
taskCtx
;
SCatalog
*
pCtg
=
pTask
->
pJob
->
pCtg
;
bool
pass
=
false
;
...
...
@@ -1018,7 +1010,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
dbFName
,
&
dbCache
));
if
(
dbCache
)
{
SVgroupInfo
vgInfo
=
{
0
};
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgCache
.
vgInfo
,
ctx
->
pName
,
&
vgInfo
));
CTG_ERR_
J
RET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgCache
.
vgInfo
,
ctx
->
pName
,
&
vgInfo
));
ctgDebug
(
"will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d"
,
tNameGetTableName
(
ctx
->
pName
),
ctx
->
flag
);
...
...
@@ -1067,6 +1059,9 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
pCtx
->
dbFName
,
&
dbCache
));
if
(
NULL
!=
dbCache
)
{
CTG_ERR_JRET
(
ctgGenerateVgList
(
pCtg
,
dbCache
->
vgCache
.
vgInfo
->
vgHash
,
(
SArray
**
)
&
pTask
->
res
));
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
dbCache
=
NULL
;
CTG_ERR_JRET
(
ctgHandleTaskEnd
(
pTask
,
0
));
}
else
{
...
...
@@ -1101,6 +1096,9 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
CTG_ERR_JRET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgCache
.
vgInfo
,
pCtx
->
pName
,
(
SVgroupInfo
*
)
pTask
->
res
));
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
dbCache
=
NULL
;
CTG_ERR_JRET
(
ctgHandleTaskEnd
(
pTask
,
0
));
}
else
{
...
...
@@ -1176,6 +1174,9 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
pInfo
->
vgVer
=
dbCache
->
vgCache
.
vgInfo
->
vgVersion
;
pInfo
->
dbId
=
dbCache
->
dbId
;
pInfo
->
tbNum
=
dbCache
->
vgCache
.
vgInfo
->
numOfTable
;
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
dbCache
=
NULL
;
}
else
{
pInfo
->
vgVer
=
CTG_DEFAULT_INVALID_VERSION
;
}
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
040093be
...
...
@@ -19,7 +19,7 @@
#include "catalogInt.h"
extern
SCatalogMgmt
gCtgMgmt
;
SCtgDebug
gCTGDebug
=
{
0
};
SCtgDebug
gCTGDebug
=
{
.
lockEnable
=
true
,
.
apiEnable
=
true
};
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
ASSERT
(
*
(
int32_t
*
)
param
==
1
);
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
040093be
...
...
@@ -675,7 +675,8 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STableIndexInfo
*
pInfo
=
taosArrayGet
(
pIndex
,
i
);
taosArrayPush
(
*
pRes
,
pInfo
);
pInfo
=
taosArrayPush
(
*
pRes
,
pInfo
);
pInfo
->
expr
=
strdup
(
pInfo
->
expr
);
}
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
040093be
...
...
@@ -391,7 +391,9 @@ typedef struct SStreamBlockScanInfo {
void
*
streamBlockReader
;
// stream block reader handle
SArray
*
pColMatchInfo
;
//
SNode
*
pCondition
;
int32_t
tsArrayIndex
;
SArray
*
tsArray
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
SExprInfo
*
pPseudoExpr
;
...
...
@@ -582,6 +584,7 @@ typedef struct SPartitionOperatorInfo {
int32_t
*
columnOffset
;
// start position for each column data
void
*
pGroupIter
;
// group iterator
int32_t
pageIndex
;
// page index of current group
SSDataBlock
*
pUpdateRes
;
}
SPartitionOperatorInfo
;
typedef
struct
SWindowRowsSup
{
...
...
@@ -907,6 +910,7 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
getTableList
(
void
*
metaHandle
,
int32_t
tableType
,
uint64_t
tableUid
,
STableListInfo
*
pListInfo
,
SNode
*
pTagCond
);
int32_t
createMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
...
...
@@ -914,6 +918,9 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
uint64_t
taskId
,
SNode
*
pTagCond
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SArray
*
dataReaders
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
040093be
...
...
@@ -751,60 +751,127 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
return
true
;
}
static
SSDataBlock
*
doDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
SSDataBlock
*
pResult
=
NULL
;
pResult
=
doTableScan
(
pInfo
->
pOperatorDumy
);
if
(
pResult
==
NULL
)
{
if
(
prepareDataScan
(
pInfo
))
{
// scan next window data
pResult
=
doTableScan
(
pInfo
->
pOperatorDumy
);
static
void
copyOneRow
(
SSDataBlock
*
dest
,
SSDataBlock
*
source
,
int32_t
sourceRowId
)
{
for
(
int32_t
j
=
0
;
j
<
source
->
info
.
numOfCols
;
j
++
)
{
SColumnInfoData
*
pDestCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
dest
->
pDataBlock
,
j
);
SColumnInfoData
*
pSourceCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
source
->
pDataBlock
,
j
);
if
(
colDataIsNull_s
(
pSourceCol
,
sourceRowId
))
{
colDataAppendNULL
(
pDestCol
,
dest
->
info
.
rows
);
}
else
{
colDataAppend
(
pDestCol
,
dest
->
info
.
rows
,
colDataGetData
(
pSourceCol
,
sourceRowId
),
false
);
}
}
return
pResult
;
dest
->
info
.
rows
++
;
}
static
void
getUpdateDataBlock
(
SStreamBlockScanInfo
*
pInfo
,
bool
invertible
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pUpdateBlock
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pColDataInfo
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
ts
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
if
(
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
pBlock
->
info
.
uid
,
ts
[
i
]))
{
taosArrayPush
(
pInfo
->
tsArray
,
ts
+
i
);
static
uint64_t
getGroupId
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
rowId
)
{
uint64_t
*
groupId
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
pBlock
->
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupId
)
{
return
*
groupId
;
}
return
0
;
/* Todo(liuyao) for partition by column
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t resId = 0;
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (groupId) {
return *groupId;
} else if (len != 0) {
resId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
}
return resId;
*/
}
static
SSDataBlock
*
doDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
while
(
1
)
{
SSDataBlock
*
pResult
=
NULL
;
pResult
=
doTableScan
(
pInfo
->
pOperatorDumy
);
if
(
pResult
==
NULL
)
{
if
(
prepareDataScan
(
pInfo
))
{
// scan next window data
pResult
=
doTableScan
(
pInfo
->
pOperatorDumy
);
}
}
if
(
!
pResult
)
{
return
NULL
;
}
if
(
pResult
->
info
.
groupId
==
pInfo
->
groupId
)
{
return
pResult
;
}
}
if
(
!
pUpdateBlock
)
{
taosArrayClear
(
pInfo
->
tsArray
);
return
;
/* Todo(liuyao) for partition by column
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
if (id == pInfo->groupId) {
copyOneRow(pResult, pBlock, i);
}
}
return pResult;
*/
}
static
void
setUpdateData
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pUpdateBlock
)
{
blockDataCleanup
(
pUpdateBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
tsArray
);
if
(
size
>
0
&&
invertible
)
{
// Todo(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pBlock, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
if
(
pInfo
->
tsArrayIndex
<
size
)
{
SColumnInfoData
*
pCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pUpdateBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
blockDataEnsureCapacity
(
pUpdateBlock
,
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
TSKEY
*
pTs
=
(
TSKEY
*
)
taosArrayGet
(
pInfo
->
tsArray
,
i
);
colDataAppend
(
pCol
,
i
,
(
char
*
)
pTs
,
false
);
}
for
(
int32_t
i
=
0
;
i
<
pUpdateBlock
->
info
.
numOfCols
;
i
++
)
{
if
(
i
==
pInfo
->
primaryTsIndex
)
{
continue
;
ASSERT
(
pBlock
->
info
.
numOfCols
==
pUpdateBlock
->
info
.
numOfCols
);
int32_t
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
pInfo
->
tsArrayIndex
);
pInfo
->
groupId
=
getGroupId
(
pInfo
->
pOperatorDumy
,
pBlock
,
rowId
);
int32_t
i
=
0
;
for
(
;
i
<
size
;
i
++
)
{
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
i
+
pInfo
->
tsArrayIndex
);
uint64_t
id
=
getGroupId
(
pInfo
->
pOperatorDumy
,
pBlock
,
rowId
);
if
(
pInfo
->
groupId
!=
id
)
{
break
;
}
SColumnInfoData
*
pCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pUpdateBlock
->
pDataBlock
,
i
);
colDataAppendNNULL
(
pCol
,
0
,
size
);
copyOneRow
(
pUpdateBlock
,
pBlock
,
rowId
);
}
pUpdateBlock
->
info
.
rows
=
size
;
pUpdateBlock
->
info
.
rows
=
i
;
pInfo
->
tsArrayIndex
+=
i
;
pUpdateBlock
->
info
.
groupId
=
pInfo
->
groupId
;
pUpdateBlock
->
info
.
type
=
STREAM_REPROCESS
;
blockDataUpdateTsWindow
(
pUpdateBlock
,
0
);
}
// all rows have same group id
ASSERT
(
pInfo
->
tsArrayIndex
>=
size
);
if
(
size
>
0
&&
pInfo
->
tsArrayIndex
==
size
)
{
taosArrayClear
(
pInfo
->
tsArray
);
}
}
static
void
getUpdateDataBlock
(
SStreamBlockScanInfo
*
pInfo
,
bool
invertible
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pUpdateBlock
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pColDataInfo
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
ts
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
for
(
int32_t
rowId
=
0
;
rowId
<
pBlock
->
info
.
rows
;
rowId
++
)
{
if
(
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
pBlock
->
info
.
uid
,
ts
[
rowId
]))
{
taosArrayPush
(
pInfo
->
tsArray
,
&
rowId
);
}
}
if
(
!
pUpdateBlock
)
{
taosArrayClear
(
pInfo
->
tsArray
);
return
;
}
setUpdateData
(
pInfo
,
pBlock
,
pUpdateBlock
);
// Todo(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pBlock, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
}
static
SSDataBlock
*
doStreamBlockScan
(
SOperatorInfo
*
pOperator
)
{
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -834,7 +901,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
return
pInfo
->
pRes
;
}
else
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_UPDATERES
)
{
blockDataCleanup
(
pInfo
->
pRes
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
if
(
!
isStateWindow
(
pInfo
))
{
prepareDataScan
(
pInfo
);
...
...
@@ -849,7 +915,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER
)
{
SSDataBlock
*
pSDB
=
doDataScan
(
pInfo
);
if
(
pSDB
==
NULL
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
setUpdateData
(
pInfo
,
pInfo
->
pRes
,
pInfo
->
pUpdateRes
);
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
if
(
!
isStateWindow
(
pInfo
))
{
prepareDataScan
(
pInfo
);
}
return
pInfo
->
pUpdateRes
;
}
else
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
}
else
{
getUpdateDataBlock
(
pInfo
,
true
,
pSDB
,
NULL
);
return
pSDB
;
...
...
@@ -942,7 +1016,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if
(
rows
==
0
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
else
if
(
pInfo
->
pUpdateInfo
)
{
blockDataCleanup
(
pInfo
->
pUpdateRes
)
;
pInfo
->
tsArrayIndex
=
0
;
getUpdateDataBlock
(
pInfo
,
true
,
pInfo
->
pRes
,
pInfo
->
pUpdateRes
);
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pUpdateRes
->
info
.
type
==
STREAM_REPROCESS
)
{
...
...
@@ -1021,7 +1095,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
goto
_error
;
}
pInfo
->
tsArray
=
taosArrayInit
(
4
,
sizeof
(
TSKEY
));
pInfo
->
tsArray
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
if
(
pInfo
->
tsArray
==
NULL
)
{
goto
_error
;
}
...
...
@@ -1048,6 +1122,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo
->
pOperatorDumy
=
pTableScanDummy
;
pInfo
->
interval
=
pSTInfo
->
interval
;
pInfo
->
sessionSup
=
(
SessionWindowSupporter
){.
pStreamAggSup
=
NULL
,
.
gap
=
-
1
};
pInfo
->
groupId
=
0
;
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
pOperator
->
blocking
=
false
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
040093be
...
...
@@ -1985,7 +1985,7 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) {
blockDataCleanup
(
pBlock
);
}
static
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
)
{
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
)
{
ASSERT
(
pDest
->
info
.
capacity
>=
pSource
->
info
.
rows
);
clearUpdateDataBlock
(
pDest
);
SColumnInfoData
*
pDestCol
=
taosArrayGet
(
pDest
->
pDataBlock
,
0
);
...
...
@@ -1997,6 +1997,8 @@ static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_
colDataAppendNNULL
(
pCol
,
0
,
pSource
->
info
.
rows
);
}
pDest
->
info
.
rows
=
pSource
->
info
.
rows
;
pDest
->
info
.
groupId
=
pSource
->
info
.
groupId
;
pDest
->
info
.
type
=
pSource
->
info
.
type
;
blockDataUpdateTsWindow
(
pDest
,
0
);
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
040093be
...
...
@@ -2345,14 +2345,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
SResultRowEntryInfo
*
pSResInfo
=
GET_RES_INFO
(
pSourceCtx
);
SAPercentileInfo
*
pSBuf
=
GET_ROWCELL_INTERBUF
(
pSResInfo
);
ASSERT
(
pDBuf
->
algo
==
pSBuf
->
algo
);
if
(
pDBuf
->
algo
==
APERCT_ALGO_TDIGEST
)
{
tdigestMerge
(
pDBuf
->
pTDigest
,
pSBuf
->
pTDigest
);
}
else
{
SHistogramInfo
*
pTmp
=
tHistogramMerge
(
pDBuf
->
pHisto
,
pSBuf
->
pHisto
,
MAX_HISTOGRAM_BIN
);
memcpy
(
pDBuf
->
pHisto
,
pTmp
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
));
pDBuf
->
pHisto
->
elems
=
(
SHistBin
*
)((
char
*
)
pDBuf
->
pHisto
+
sizeof
(
SHistogramInfo
));
tHistogramDestroy
(
&
pTmp
);
}
apercentileTransferInfo
(
pSBuf
,
pDBuf
);
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
040093be
...
...
@@ -4128,12 +4128,14 @@ static const char* getSysDbName(ENodeType type) {
case
QUERY_NODE_SHOW_SNODES_STMT
:
case
QUERY_NODE_SHOW_LICENCE_STMT
:
case
QUERY_NODE_SHOW_CLUSTER_STMT
:
case
QUERY_NODE_SHOW_VARIABLE_STMT
:
return
TSDB_INFORMATION_SCHEMA_DB
;
case
QUERY_NODE_SHOW_CONNECTIONS_STMT
:
case
QUERY_NODE_SHOW_QUERIES_STMT
:
case
QUERY_NODE_SHOW_TOPICS_STMT
:
case
QUERY_NODE_SHOW_STREAMS_STMT
:
case
QUERY_NODE_SHOW_TRANSACTIONS_STMT
:
case
QUERY_NODE_SHOW_APPS_STMT
:
return
TSDB_PERFORMANCE_SCHEMA_DB
;
default:
break
;
...
...
@@ -4183,6 +4185,10 @@ static const char* getSysTableName(ENodeType type) {
return
TSDB_PERFS_TABLE_TOPICS
;
case
QUERY_NODE_SHOW_TRANSACTIONS_STMT
:
return
TSDB_PERFS_TABLE_TRANS
;
case
QUERY_NODE_SHOW_VARIABLE_STMT
:
return
TSDB_INS_TABLE_CONFIGS
;
case
QUERY_NODE_SHOW_APPS_STMT
:
return
TSDB_PERFS_TABLE_APPS
;
default:
break
;
}
...
...
@@ -5237,6 +5243,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case
QUERY_NODE_SHOW_CLUSTER_STMT
:
case
QUERY_NODE_SHOW_TOPICS_STMT
:
case
QUERY_NODE_SHOW_TRANSACTIONS_STMT
:
case
QUERY_NODE_SHOW_VARIABLE_STMT
:
case
QUERY_NODE_SHOW_APPS_STMT
:
code
=
rewriteShow
(
pCxt
,
pQuery
);
break
;
case
QUERY_NODE_CREATE_TABLE_STMT
:
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
040093be
...
...
@@ -104,10 +104,14 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return
false
;
}
if
(
NULL
==
pNode
->
pParent
||
(
QUERY_NODE_LOGIC_PLAN_WINDOW
!=
nodeType
(
pNode
->
pParent
)
&&
QUERY_NODE_LOGIC_PLAN_AGG
!=
nodeType
(
pNode
->
pParent
)))
{
QUERY_NODE_LOGIC_PLAN_AGG
!=
nodeType
(
pNode
->
pParent
)
&&
QUERY_NODE_LOGIC_PLAN_PARTITION
!=
nodeType
(
pNode
->
pParent
)))
{
return
false
;
}
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
))
{
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
)
||
(
QUERY_NODE_LOGIC_PLAN_PARTITION
==
nodeType
(
pNode
->
pParent
)
&&
pNode
->
pParent
->
pParent
&&
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
->
pParent
))
)
{
return
true
;
}
return
!
osdHaveNormalCol
(((
SAggLogicNode
*
)
pNode
->
pParent
)
->
pGroupKeys
);
...
...
@@ -217,16 +221,22 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
}
static
void
setScanWindowInfo
(
SScanLogicNode
*
pScan
)
{
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pScan
->
node
.
pParent
))
{
pScan
->
interval
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
interval
;
pScan
->
offset
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
offset
;
pScan
->
sliding
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
sliding
;
pScan
->
intervalUnit
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
intervalUnit
;
pScan
->
slidingUnit
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
slidingUnit
;
pScan
->
triggerType
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
triggerType
;
pScan
->
watermark
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
watermark
;
pScan
->
tsColId
=
((
SColumnNode
*
)((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
pTspk
)
->
colId
;
pScan
->
filesFactor
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
filesFactor
;
SLogicNode
*
pParent
=
pScan
->
node
.
pParent
;
if
(
QUERY_NODE_LOGIC_PLAN_PARTITION
==
nodeType
(
pParent
)
&&
pParent
->
pParent
&&
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pParent
->
pParent
))
{
pParent
=
pParent
->
pParent
;
}
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pParent
))
{
pScan
->
interval
=
((
SWindowLogicNode
*
)
pParent
)
->
interval
;
pScan
->
offset
=
((
SWindowLogicNode
*
)
pParent
)
->
offset
;
pScan
->
sliding
=
((
SWindowLogicNode
*
)
pParent
)
->
sliding
;
pScan
->
intervalUnit
=
((
SWindowLogicNode
*
)
pParent
)
->
intervalUnit
;
pScan
->
slidingUnit
=
((
SWindowLogicNode
*
)
pParent
)
->
slidingUnit
;
pScan
->
triggerType
=
((
SWindowLogicNode
*
)
pParent
)
->
triggerType
;
pScan
->
watermark
=
((
SWindowLogicNode
*
)
pParent
)
->
watermark
;
pScan
->
tsColId
=
((
SColumnNode
*
)((
SWindowLogicNode
*
)
pParent
)
->
pTspk
)
->
colId
;
pScan
->
filesFactor
=
((
SWindowLogicNode
*
)
pParent
)
->
filesFactor
;
}
}
...
...
tests/system-test/2-query/json_tag.py
浏览文件 @
040093be
...
...
@@ -243,9 +243,8 @@ class TDTestCase:
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag2'!='beijing'"
)
tdSql
.
checkRows
(
5
)
#open
#tdSql.query("select * from jsons1 where jtag->'tag2'=''")
#tdSql.checkRows(2)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag2'=''"
)
tdSql
.
checkRows
(
2
)
#
# # where json value is int
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'=5"
)
...
...
@@ -253,11 +252,10 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
1
,
2
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'=10"
)
tdSql
.
checkRows
(
0
)
# open
#tdSql.query("select * from jsons1 where jtag->'tag1'<54")
#tdSql.checkRows(3)
#tdSql.query("select * from jsons1 where jtag->'tag1'<=11")
#tdSql.checkRows(3)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'<54"
)
tdSql
.
checkRows
(
4
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'<=11"
)
tdSql
.
checkRows
(
4
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'>4"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'>=5"
)
...
...
@@ -270,31 +268,28 @@ class TDTestCase:
# # where json value is double
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'=1.232"
)
tdSql
.
checkRows
(
1
)
# open
#tdSql.query("select * from jsons1 where jtag->'tag1'<1.232")
#tdSql.checkRows(0)
#tdSql.query("select * from jsons1 where jtag->'tag1'<=1.232")
#tdSql.checkRows(1)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'<1.232"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'<=1.232"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'>1.23"
)
tdSql
.
checkRows
(
3
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'>=1.232"
)
tdSql
.
checkRows
(
3
)
# open
#tdSql.query("select * from jsons1 where jtag->'tag1'!=1.232")
#tdSql.checkRows(2)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'!=1.232"
)
tdSql
.
checkRows
(
6
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'!=3.232"
)
tdSql
.
checkRows
(
7
)
#tdSql.error("select * from jsons1 where jtag->'tag1'/0=3")
#tdSql.error("select * from jsons1 where jtag->'tag1'/5=1")
#
# # where json value is bool
#tdSql.query("select * from jsons1 where jtag->'tag1'=true")
# open
#tdSql.checkRows(0)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'=true"
)
tdSql
.
checkRows
(
0
)
#tdSql.query("select * from jsons1 where jtag->'tag1'=false")
#tdSql.checkRows(1)
#
tdSql.query("select * from jsons1 where jtag->'tag1'!=false")
#tdSql.checkRows(0
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'!=false"
)
tdSql
.
checkRows
(
3
)
#tdSql.error("select * from jsons1 where jtag->'tag1'>false")
#
# # where json value is null
...
...
@@ -303,18 +298,17 @@ class TDTestCase:
#tdSql.checkRows(1)
#
# # where json key is null
# open
#tdSql.query("select * from jsons1 where jtag->'tag_no_exist'=3")
#tdSql.checkRows(0)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag_no_exist'=3"
)
tdSql
.
checkRows
(
0
)
#
# # where json value is not exist
#
tdSql.query("select * from jsons1 where jtag->'tag1' is null")
#
tdSql.checkData(0, 0, 'jsons1_9')
#tdSql.checkRows(1
)
#
tdSql.query("select * from jsons1 where jtag->'tag4' is null")
#
tdSql.checkRows(9)
#
tdSql.query("select * from jsons1 where jtag->'tag3' is not null")
#tdSql.checkRows(4
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1' is null"
)
tdSql
.
checkData
(
0
,
0
,
'jsons1_9'
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag4' is null"
)
tdSql
.
checkRows
(
9
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag3' is not null"
)
tdSql
.
checkRows
(
3
)
#
# # test contains
tdSql
.
query
(
"select * from jsons1 where jtag contains 'tag1'"
)
...
...
@@ -344,10 +338,10 @@ class TDTestCase:
#
#
# # test with between and
#
tdSql.query("select * from jsons1 where jtag->'tag1' between 1 and 30")
#
tdSql.checkRows(3)
#
tdSql.query("select * from jsons1 where jtag->'tag1' between 'femail' and 'beijing'")
#
tdSql.checkRows(2)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1' between 1 and 30"
)
tdSql
.
checkRows
(
3
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1' between 'femail' and 'beijing'"
)
tdSql
.
checkRows
(
2
)
#
# # test with tbname/normal column
tdSql
.
query
(
"select * from jsons1 where tbname = 'jsons1_1'"
)
...
...
@@ -362,6 +356,7 @@ class TDTestCase:
#
# # test where condition like
# open
# syntax error
#tdSql.query("select *,tbname from jsons1 where jtag->'tag2' like 'bei%'")
#tdSql.checkRows(2)
#tdSql.query("select *,tbname from jsons1 where jtag->'tag1' like 'fe%' and jtag->'tag2' is not null")
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录