Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e5606ccf
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e5606ccf
编写于
2月 28, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/config
上级
9a9e87ec
c35a3fa3
变更
33
展开全部
隐藏空白更改
内联
并排
Showing
33 changed file
with
3124 addition
and
1318 deletion
+3124
-1318
2.0/src/query/inc/qExecutor.h
2.0/src/query/inc/qExecutor.h
+1
-1
2.0/src/query/src/qExecutor.c
2.0/src/query/src/qExecutor.c
+2
-2
include/common/tep.h
include/common/tep.h
+8
-9
include/libs/executor/executor.h
include/libs/executor/executor.h
+13
-2
include/libs/function/function.h
include/libs/function/function.h
+3
-1
include/libs/planner/plannerOp.h
include/libs/planner/plannerOp.h
+1
-0
include/util/tarray.h
include/util/tarray.h
+2
-2
include/util/tpagedbuf.h
include/util/tpagedbuf.h
+36
-12
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+3
-1
source/common/src/tep.c
source/common/src/tep.c
+52
-15
source/common/test/commonTests.cpp
source/common/test/commonTests.cpp
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+1
-1
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+5
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+54
-48
source/libs/executor/inc/tlinearhash.h
source/libs/executor/inc/tlinearhash.h
+44
-0
source/libs/executor/inc/tsort.h
source/libs/executor/inc/tsort.h
+137
-0
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+3
-4
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+22
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+716
-941
source/libs/executor/src/tlinearhash.c
source/libs/executor/src/tlinearhash.c
+427
-0
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+679
-0
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+293
-46
source/libs/executor/test/lhashTests.cpp
source/libs/executor/test/lhashTests.cpp
+65
-0
source/libs/executor/test/sortTests.cpp
source/libs/executor/test/sortTests.cpp
+280
-0
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+4
-3
source/libs/function/src/tpercentile.c
source/libs/function/src/tpercentile.c
+4
-4
source/libs/parser/src/queryInfoUtil.c
source/libs/parser/src/queryInfoUtil.c
+7
-3
source/libs/scalar/inc/filterInt.h
source/libs/scalar/inc/filterInt.h
+1
-1
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+7
-6
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+1
-1
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+208
-168
source/util/test/pageBufferTest.cpp
source/util/test/pageBufferTest.cpp
+41
-41
未找到文件。
2.0/src/query/inc/qExecutor.h
浏览文件 @
e5606ccf
...
...
@@ -589,7 +589,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRunt
SOperatorInfo
*
createAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createProjectOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
);
SOperatorInfo
*
create
Time
IntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSWindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createFillOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
bool
multigroupResult
);
...
...
2.0/src/query/src/qExecutor.c
浏览文件 @
e5606ccf
...
...
@@ -2166,7 +2166,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case
OP_TimeWindow
:
{
pRuntimeEnv
->
proot
=
create
Time
IntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
...
...
@@ -6756,7 +6756,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return
pOperator
;
}
SOperatorInfo
*
create
Time
IntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
...
...
include/common/tep.h
浏览文件 @
e5606ccf
...
...
@@ -18,9 +18,6 @@ typedef struct SBlockOrderInfo {
int32_t
order
;
int32_t
colIndex
;
SColumnInfoData
*
pColData
;
// int32_t type;
// int32_t bytes;
// bool hasNull;
}
SBlockOrderInfo
;
int
taosGetFqdnPortFromEp
(
const
char
*
ep
,
SEp
*
pEp
);
...
...
@@ -67,16 +64,16 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
}
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (
(p1_)->pData + (p1_)->varmeta.offset[(r_)])
\
: (
(p1_)->pData + ((r_) * (p1_)->info.bytes)
))
#define colDataGet
Data
(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (
p1_)->pData + (p1_)->varmeta.offset[(r_)]
\
: (
p1_)->pData + ((r_) * (p1_)->info.bytes
))
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
);
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
numOfRow1
,
const
SColumnInfoData
*
pSource
,
uint32_t
numOfRow2
);
int32_t
blockDataUpdateTsWindow
(
SSDataBlock
*
pDataBlock
);
int32_t
colDataGet
Size
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
);
void
colDataTrim
(
SColumnInfoData
*
pColumnInfoData
);
int32_t
colDataGet
Length
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
);
void
colDataTrim
(
SColumnInfoData
*
pColumnInfoData
);
size_t
colDataGetNumOfCols
(
const
SSDataBlock
*
pBlock
);
size_t
colDataGetNumOfRows
(
const
SSDataBlock
*
pBlock
);
...
...
@@ -93,13 +90,15 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double
blockDataGetSerialRowSize
(
const
SSDataBlock
*
pBlock
);
size_t
blockDataGetSerialMetaSize
(
const
SSDataBlock
*
pBlock
);
size_t
blockDataNumOfRowsForSerialize
(
const
SSDataBlock
*
pBlock
,
int32_t
blockSize
);
SSchema
*
blockDataExtractSchema
(
const
SSDataBlock
*
pBlock
,
int32_t
*
numOfCols
);
int32_t
blockDataSort
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataSort_rv
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataEnsureCapacity
(
SSDataBlock
*
pDataBlock
,
uint32_t
numOfRows
);
void
blockDataClearup
(
SSDataBlock
*
pDataBlock
,
bool
hasVarCol
);
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
);
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
);
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
#ifdef __cplusplus
...
...
include/libs/executor/executor.h
浏览文件 @
e5606ccf
...
...
@@ -31,6 +31,7 @@ typedef struct SReadHandle {
void
*
reader
;
void
*
meta
;
}
SReadHandle
;
/**
* Create the exec task for streaming mode
* @param pMsg
...
...
@@ -40,12 +41,22 @@ typedef struct SReadHandle {
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
void
*
streamReadHandle
);
/**
*
*
Set the input data block for the stream scan.
* @param tinfo
* @param input
* @return
*/
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
);
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
);
/**
* Update the table id list, add or remove.
*
* @param tinfo
* @param id
* @param isAdd
* @return
*/
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
SArray
*
tableIdList
,
bool
isAdd
);
/**
* Create the exec task object according to task json
...
...
include/libs/function/function.h
浏览文件 @
e5606ccf
...
...
@@ -138,8 +138,10 @@ extern SFunctionFpSet fpSet[1];
// sql function runtime context
typedef
struct
SqlFunctionCtx
{
int32_t
startRow
;
int32_t
size
;
// number of rows
void
*
pInput
;
// input data buffer
SColumnInfoData
*
pInput
;
uint32_t
order
;
// asc|desc
int16_t
inputType
;
int16_t
inputBytes
;
...
...
include/libs/planner/plannerOp.h
浏览文件 @
e5606ccf
...
...
@@ -47,5 +47,6 @@ OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO
(
AllMultiTableTimeInterval
)
OP_ENUM_MACRO
(
Order
)
OP_ENUM_MACRO
(
Exchange
)
OP_ENUM_MACRO
(
SortedMerge
)
//OP_ENUM_MACRO(TableScan)
include/util/tarray.h
浏览文件 @
e5606ccf
...
...
@@ -42,8 +42,8 @@ extern "C" {
typedef
struct
SArray
{
size_t
size
;
size
_t
capacity
;
size
_t
elemSize
;
uint32
_t
capacity
;
uint32
_t
elemSize
;
void
*
pData
;
}
SArray
;
...
...
include/util/tpagedbuf.h
浏览文件 @
e5606ccf
...
...
@@ -29,10 +29,9 @@ typedef struct SPageInfo SPageInfo;
typedef
struct
SDiskbasedBuf
SDiskbasedBuf
;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define DEFAULT_PAGE_SIZE (16384L)
typedef
struct
SFilePage
{
int
64
_t
num
;
int
32
_t
num
;
char
data
[];
}
SFilePage
;
...
...
@@ -54,8 +53,7 @@ typedef struct SDiskbasedBufStatis {
* @param handle
* @return
*/
int32_t
createDiskbasedBuffer
(
SDiskbasedBuf
**
pBuf
,
int32_t
pagesize
,
int32_t
inMemBufSize
,
uint64_t
qId
,
const
char
*
dir
);
int32_t
createDiskbasedBuf
(
SDiskbasedBuf
**
pBuf
,
int32_t
pagesize
,
int32_t
inMemBufSize
,
uint64_t
qId
,
const
char
*
dir
);
/**
*
...
...
@@ -64,7 +62,7 @@ int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t in
* @param pageId
* @return
*/
SFilePage
*
getNewDataBuf
(
SDiskbasedBuf
*
pBuf
,
int32_t
groupId
,
int32_t
*
pageId
);
void
*
getNewBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
groupId
,
int32_t
*
pageId
);
/**
*
...
...
@@ -80,7 +78,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId);
* @param id
* @return
*/
SFilePage
*
getBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
id
);
void
*
getBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
id
);
/**
* release the referenced buf pages
...
...
@@ -108,13 +106,13 @@ size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
* @param pBuf
* @return
*/
size_t
getNumOf
Result
BufGroupId
(
const
SDiskbasedBuf
*
pBuf
);
size_t
getNumOfBufGroupId
(
const
SDiskbasedBuf
*
pBuf
);
/**
* destroy result buffer
* @param pBuf
*/
void
destroy
Result
Buf
(
SDiskbasedBuf
*
pBuf
);
void
destroy
Diskbased
Buf
(
SDiskbasedBuf
*
pBuf
);
/**
*
...
...
@@ -137,6 +135,11 @@ int32_t getPageId(const SPageInfo* pPgInfo);
*/
int32_t
getBufPageSize
(
const
SDiskbasedBuf
*
pBuf
);
/**
*
* @param pBuf
* @return
*/
int32_t
getNumOfInMemBufPages
(
const
SDiskbasedBuf
*
pBuf
);
/**
...
...
@@ -148,22 +151,43 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
/**
* Set the buffer page is dirty, and needs to be flushed to disk when swap out.
* @param pPage
Info
* @param pPage
* @param dirty
*/
void
setBufPageDirty
(
SFilePage
*
pPageInfo
,
bool
dirty
);
void
setBufPageDirty
(
void
*
pPage
,
bool
dirty
);
/**
* Set the compress/ no-compress flag for paged buffer, when flushing data in disk.
* @param pBuf
*/
void
setBufPageCompressOnDisk
(
SDiskbasedBuf
*
pBuf
,
bool
comp
);
/**
* Set the pageId page buffer is not need
* @param pBuf
* @param pageId
*/
void
dBufSetBufPageRecycled
(
SDiskbasedBuf
*
pBuf
,
void
*
pPage
);
/**
* Print the statistics when closing this buffer
* @param pBuf
*/
void
printStatisBeforeClose
(
SDiskbasedBuf
*
pBuf
);
void
dBufSetPrintInfo
(
SDiskbasedBuf
*
pBuf
);
/**
* return buf statistics.
* Return buf statistics.
* @param pBuf
* @return
*/
SDiskbasedBufStatis
getDBufStatis
(
const
SDiskbasedBuf
*
pBuf
);
/**
* Print the buffer statistics information
* @param pBuf
*/
void
dBufPrintStatis
(
const
SDiskbasedBuf
*
pBuf
);
#ifdef __cplusplus
}
#endif
...
...
source/client/test/clientTests.cpp
浏览文件 @
e5606ccf
...
...
@@ -53,8 +53,10 @@ TEST(testCase, driverInit_Test) {
// taos_init();
}
#if
0
#if
1
TEST
(
testCase
,
connect_Test
)
{
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
printf
(
"failed to connect to server, reason:%s
\n
"
,
taos_errstr
(
NULL
));
...
...
source/common/src/tep.c
浏览文件 @
e5606ccf
...
...
@@ -63,7 +63,7 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
int32_t
colDataGet
Size
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
int32_t
colDataGet
Length
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
ASSERT
(
pColumnInfoData
!=
NULL
);
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
return
pColumnInfoData
->
varmeta
.
length
;
...
...
@@ -127,6 +127,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
case
TSDB_DATA_TYPE_USMALLINT
:
{
*
(
int16_t
*
)
p
=
*
(
int16_t
*
)
pData
;
break
;}
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
{
*
(
int32_t
*
)
p
=
*
(
int32_t
*
)
pData
;
break
;}
case
TSDB_DATA_TYPE_TIMESTAMP
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
{
*
(
int64_t
*
)
p
=
*
(
int64_t
*
)
pData
;
break
;}
default:
...
...
@@ -249,8 +250,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
}
ASSERT
(
pColInfoData
->
nullbitmap
==
NULL
);
pDataBlock
->
info
.
window
.
skey
=
*
(
TSKEY
*
)
colDataGet
(
pColInfoData
,
0
);
pDataBlock
->
info
.
window
.
ekey
=
*
(
TSKEY
*
)
colDataGet
(
pColInfoData
,
(
pDataBlock
->
info
.
rows
-
1
));
pDataBlock
->
info
.
window
.
skey
=
*
(
TSKEY
*
)
colDataGet
Data
(
pColInfoData
,
0
);
pDataBlock
->
info
.
window
.
ekey
=
*
(
TSKEY
*
)
colDataGet
Data
(
pColInfoData
,
(
pDataBlock
->
info
.
rows
-
1
));
return
0
;
}
...
...
@@ -262,8 +263,8 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
SColumnInfoData
*
pCol2
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
SColumnInfoData
*
pCol1
=
taosArrayGet
(
pSrc
->
pDataBlock
,
i
);
uint32_t
oldLen
=
colDataGet
Size
(
pCol2
,
pDest
->
info
.
rows
);
uint32_t
newLen
=
colDataGet
Size
(
pCol1
,
pSrc
->
info
.
rows
);
uint32_t
oldLen
=
colDataGet
Length
(
pCol2
,
pDest
->
info
.
rows
);
uint32_t
newLen
=
colDataGet
Length
(
pCol1
,
pSrc
->
info
.
rows
);
int32_t
newSize
=
oldLen
+
newLen
;
char
*
tmp
=
realloc
(
pCol2
->
pData
,
newSize
);
...
...
@@ -287,7 +288,7 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
total
+=
colDataGet
Size
(
pColInfoData
,
pBlock
->
info
.
rows
);
total
+=
colDataGet
Length
(
pColInfoData
,
pBlock
->
info
.
rows
);
if
(
IS_VAR_DATA_TYPE
(
pColInfoData
->
info
.
type
))
{
total
+=
sizeof
(
int32_t
)
*
pBlock
->
info
.
rows
;
...
...
@@ -336,7 +337,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
if
(
isNull
)
{
// do nothing
}
else
{
char
*
p
=
colDataGet
(
pColInfoData
,
j
);
char
*
p
=
colDataGet
Data
(
pColInfoData
,
j
);
size
+=
varDataTLen
(
p
);
}
...
...
@@ -401,7 +402,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
for
(
int32_t
j
=
startIndex
;
j
<
(
startIndex
+
rowCount
);
++
j
)
{
bool
isNull
=
colDataIsNull
(
pColData
,
pBlock
->
info
.
rows
,
j
,
pBlock
->
pBlockAgg
);
char
*
p
=
colDataGet
(
pColData
,
j
);
char
*
p
=
colDataGet
Data
(
pColData
,
j
);
colDataAppend
(
pDstCol
,
j
-
startIndex
,
p
,
isNull
);
}
...
...
@@ -411,7 +412,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
return
pDst
;
}
/**
*
* +------------------+---------------+--------------------+
...
...
@@ -444,7 +444,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
pStart
+=
BitmapLen
(
pBlock
->
info
.
rows
);
}
uint32_t
dataSize
=
colDataGet
Size
(
pCol
,
numOfRows
);
uint32_t
dataSize
=
colDataGet
Length
(
pCol
,
numOfRows
);
*
(
int32_t
*
)
pStart
=
dataSize
;
pStart
+=
sizeof
(
int32_t
);
...
...
@@ -522,6 +522,22 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return
sizeof
(
int32_t
)
+
pBlock
->
info
.
numOfCols
*
sizeof
(
int32_t
);
}
SSchema
*
blockDataExtractSchema
(
const
SSDataBlock
*
pBlock
,
int32_t
*
numOfCols
)
{
SSchema
*
pSchema
=
calloc
(
pBlock
->
info
.
numOfCols
,
sizeof
(
SSchema
));
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
pSchema
[
i
].
bytes
=
pColInfoData
->
info
.
bytes
;
pSchema
[
i
].
type
=
pColInfoData
->
info
.
type
;
pSchema
[
i
].
colId
=
pColInfoData
->
info
.
colId
;
}
if
(
numOfCols
!=
NULL
)
{
*
numOfCols
=
pBlock
->
info
.
numOfCols
;
}
return
pSchema
;
}
double
blockDataGetSerialRowSize
(
const
SSDataBlock
*
pBlock
)
{
ASSERT
(
pBlock
!=
NULL
);
double
rowSize
=
0
;
...
...
@@ -577,8 +593,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
}
}
void
*
left1
=
colDataGet
(
pColInfoData
,
left
);
void
*
right1
=
colDataGet
(
pColInfoData
,
right
);
void
*
left1
=
colDataGet
Data
(
pColInfoData
,
left
);
void
*
right1
=
colDataGet
Data
(
pColInfoData
,
right
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
...
...
@@ -617,7 +633,7 @@ static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, co
return
code
;
}
}
else
{
char
*
p
=
colDataGet
(
pSrc
,
tupleIndex
);
char
*
p
=
colDataGet
Data
(
pSrc
,
tupleIndex
);
code
=
colDataAppend
(
pDst
,
numOfRows
,
p
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -956,8 +972,8 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
// }
// }
// void* left1 = colDataGet(pColInfoData, left);
// void* right1 = colDataGet(pColInfoData, right);
// void* left1 = colDataGet
Data
(pColInfoData, left);
// void* right1 = colDataGet
Data
(pColInfoData, right);
// switch(pColInfoData->info.type) {
// case TSDB_DATA_TYPE_INT: {
...
...
@@ -1098,4 +1114,25 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
tfree
(
pBlock
->
pBlockAgg
);
tfree
(
pBlock
);
return
NULL
;
}
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
)
{
int32_t
numOfCols
=
pDataBlock
->
info
.
numOfCols
;
SSDataBlock
*
pBlock
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
pBlock
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
numOfCols
=
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
0
};
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
colInfo
.
info
=
p
->
info
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfo
);
}
return
pBlock
;
}
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
)
{
return
pageSize
/
(
blockDataGetSerialRowSize
(
pBlock
)
+
blockDataGetSerialMetaSize
(
pBlock
));
}
\ No newline at end of file
source/common/test/commonTests.cpp
浏览文件 @
e5606ccf
...
...
@@ -162,7 +162,7 @@ TEST(testCase, Datablock_test) {
ASSERT_EQ
(
colDataGetNumOfCols
(
b
),
2
);
ASSERT_EQ
(
colDataGetNumOfRows
(
b
),
40
);
char
*
pData
=
colDataGet
(
p1
,
3
);
char
*
pData
=
colDataGet
Data
(
p1
,
3
);
printf
(
"the second row of binary:%s, length:%d
\n
"
,
(
char
*
)
varDataVal
(
pData
),
varDataLen
(
pData
));
SArray
*
pOrderInfo
=
taosArrayInit
(
3
,
sizeof
(
SBlockOrderInfo
));
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
e5606ccf
...
...
@@ -216,13 +216,15 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
static
FORCE_INLINE
int
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
),
NULL
,
0
);
// pHandle->tbUid = tbUid;
}
return
0
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
e5606ccf
...
...
@@ -88,7 +88,7 @@ typedef struct STableCheckInfo {
int32_t
compSize
;
int32_t
numOfBlocks
:
29
;
// number of qualified data blocks not the original blocks
uint8_t
chosen
:
2
;
// indicate which iterator should move forward
bool
initBuf
;
// whether to initialize the in-memory skip list iterator or not
bool
initBuf
:
1
;
// whether to initialize the in-memory skip list iterator or not
SSkipListIterator
*
iter
;
// mem buffer skip list iterator
SSkipListIterator
*
iiter
;
// imem buffer skip list iterator
}
STableCheckInfo
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
e5606ccf
...
...
@@ -68,9 +68,10 @@ typedef struct SResultRow {
}
SResultRow
;
typedef
struct
SResultRowInfo
{
SResultRow
*
pCurResult
;
// current active result row info
SResultRow
**
pResult
;
// result list
int16_t
type
:
8
;
// data type for hash key
int32_t
size
:
24
;
// number of result set
//
int16_t type:8; // data type for hash key
int32_t
size
;
// number of result set
int32_t
capacity
;
// max capacity
int32_t
curPos
;
// current active result row index of pResult list
}
SResultRowInfo
;
...
...
@@ -95,7 +96,7 @@ struct SUdfInfo;
int32_t
getOutputInterResultBufSize
(
struct
STaskAttr
*
pQueryAttr
);
size_t
getResultRowSize
(
SArray
*
pExprInfo
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
...
...
@@ -105,7 +106,7 @@ void closeAllResultRows(SResultRowInfo* pResultRowInfo);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
void
clearResultRow
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
e5606ccf
...
...
@@ -15,6 +15,7 @@
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H
#include "tsort.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -444,16 +445,32 @@ typedef struct SOptrBasicInfo {
int32_t
capacity
;
}
SOptrBasicInfo
;
typedef
struct
SOptrBasicInfo
STableIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
typedef
struct
SAggSupporter
{
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
SHashObj
*
pResultRowListSet
;
// used to check if current ResultRowInfo has ResultRow object or not
SArray
*
pResultRowArrayList
;
// The array list that contains the Result rows
char
*
keyBuf
;
// window key buffer
SResultRowPool
*
pool
;
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
}
SAggSupporter
;
typedef
struct
STableIntervalOperatorInfo
{
SOptrBasicInfo
binfo
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SGroupResInfo
groupResInfo
;
SInterval
interval
;
STimeWindow
win
;
int32_t
precision
;
bool
timeWindowInterpo
;
char
**
pRow
;
SAggSupporter
aggSup
;
STableQueryInfo
*
pCurrent
;
int32_t
order
;
}
STableIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SAggSupporter
aggSup
;
STableQueryInfo
*
current
;
uint32_t
groupId
;
SGroupResInfo
groupResInfo
;
...
...
@@ -549,49 +566,42 @@ typedef struct SDistinctOperatorInfo {
SArray
*
pDistinctDataInfo
;
}
SDistinctOperatorInfo
;
struct
SGlobalMerger
;
typedef
struct
SMultiwayMergeInfo
{
struct
SGlobalMerger
*
pMerge
;
SOptrBasicInfo
binfo
;
int32_t
bufCapacity
;
int64_t
seed
;
char
**
prevRow
;
SArray
*
orderColumnList
;
int32_t
resultRowFactor
;
bool
hasGroupColData
;
char
**
currentGroupColData
;
SArray
*
groupColumnList
;
bool
hasDataBlockForNewGroup
;
SSDataBlock
*
pExistBlock
;
SArray
*
udfInfo
;
bool
hasPrev
;
bool
multiGroupResults
;
}
SMultiwayMergeInfo
;
typedef
struct
SMsortComparParam
{
struct
SExternalMemSource
**
pSources
;
int32_t
numOfSources
;
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
bool
nullFirst
;
}
SMsortComparParam
;
typedef
struct
SSortedMergeOperatorInfo
{
SOptrBasicInfo
binfo
;
bool
hasVarCol
;
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
bool
nullFirst
;
int32_t
numOfSources
;
SSortHandle
*
pSortHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
int32_t
resultRowFactor
;
bool
hasGroupVal
;
SDiskbasedBuf
*
pTupleStore
;
// keep the final results
int32_t
numOfResPerPage
;
char
**
groupVal
;
SArray
*
groupInfo
;
SAggSupporter
aggSup
;
}
SSortedMergeOperatorInfo
;
typedef
struct
SOrderOperatorInfo
{
int32_t
sourceId
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SSDataBlock
*
pDataBlock
;
bool
hasVarCol
;
// has variable length column, such as binary/varchar/nchar
int32_t
numOfCompleted
;
SDiskbasedBuf
*
pSortInternalBuf
;
S
MultiwayMergeTreeInfo
*
pMergeTre
e
;
SArray
*
pSources
;
// SArray<SExternalMemSource*>
SArray
*
orderInfo
;
bool
nullFirst
;
S
SortHandle
*
pSortHandl
e
;
int32_t
bufPageSize
;
int32_t
numOfRowsInRes
;
SMsortComparParam
cmpParam
;
// TODO extact struct
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
uint64_t
totalSize
;
// total load bytes from remote
...
...
@@ -608,8 +618,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
SOperatorInfo
*
createProjectOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createLimitOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
);
SOperatorInfo
*
create
TimeIntervalOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
create
IntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SArray
*
pExprInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSWindowOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
...
...
@@ -641,9 +651,8 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo
*
createJoinOperatorInfo
(
SOperatorInfo
**
pdownstream
,
int32_t
numOfDownstream
,
SSchema
*
pSchema
,
int32_t
numOfOutput
);
SOperatorInfo
*
createOrderOperatorInfo
(
SOperatorInfo
*
downstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
);
SOperatorInfo
*
createMergeSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrder
*
pOrderVal
);
SOperatorInfo
*
createOrderOperatorInfo
(
SOperatorInfo
*
downstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
...
...
@@ -691,9 +700,6 @@ int32_t checkForQueryBuf(size_t numOfTables);
bool
checkNeedToCompressQueryCol
(
SQInfo
*
pQInfo
);
void
setQueryStatus
(
STaskRuntimeEnv
*
pRuntimeEnv
,
int8_t
status
);
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
);
// void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
);
size_t
getResultSize
(
SQInfo
*
pQInfo
,
int64_t
*
numOfRows
);
...
...
source/libs/executor/inc/tlinearhash.h
0 → 100644
浏览文件 @
e5606ccf
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TLINEARHASH_H
#define TDENGINE_TLINEARHASH_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "thash.h"
enum
{
LINEAR_HASH_STATIS
=
0x1
,
LINEAR_HASH_DATA
=
0x2
,
};
typedef
struct
SLHashObj
SLHashObj
;
SLHashObj
*
tHashInit
(
int32_t
inMemPages
,
int32_t
pageSize
,
_hash_fn_t
fn
,
int32_t
numOfTuplePerPage
);
void
*
tHashCleanup
(
SLHashObj
*
pHashObj
);
int32_t
tHashPut
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
);
char
*
tHashGet
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
int32_t
tHashRemove
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
void
tHashPrint
(
const
SLHashObj
*
pHashObj
,
int32_t
type
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TLINEARHASH_H
source/libs/executor/inc/tsort.h
0 → 100644
浏览文件 @
e5606ccf
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSORT_H
#define TDENGINE_TSORT_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "common.h"
#include "os.h"
enum
{
SORT_MULTISOURCE_MERGE
=
0x1
,
SORT_SINGLESOURCE_SORT
=
0x2
,
};
typedef
struct
SMultiMergeSource
{
int32_t
type
;
int32_t
rowIndex
;
SSDataBlock
*
pBlock
;
}
SMultiMergeSource
;
typedef
struct
SExternalMemSource
{
SMultiMergeSource
src
;
SArray
*
pageIdList
;
int32_t
pageIndex
;
}
SExternalMemSource
;
typedef
struct
SGenericSource
{
SMultiMergeSource
src
;
void
*
param
;
}
SGenericSource
;
typedef
struct
SMsortComparParam
{
void
**
pSources
;
int32_t
numOfSources
;
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
bool
nullFirst
;
}
SMsortComparParam
;
typedef
struct
SSortHandle
SSortHandle
;
typedef
struct
STupleHandle
STupleHandle
;
typedef
SSDataBlock
*
(
*
_sort_fetch_block_fn_t
)(
void
*
param
);
typedef
int32_t
(
*
_sort_merge_compar_fn_t
)(
const
void
*
p1
,
const
void
*
p2
,
void
*
param
);
/**
*
* @param type
* @return
*/
SSortHandle
*
tsortCreateSortHandle
(
SArray
*
pOrderInfo
,
bool
nullFirst
,
int32_t
type
,
int32_t
pageSize
,
int32_t
numOfPages
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
const
char
*
idstr
);
/**
*
* @param pSortHandle
*/
void
tsortDestroySortHandle
(
SSortHandle
*
pSortHandle
);
/**
*
* @param pHandle
* @return
*/
int32_t
tsortOpen
(
SSortHandle
*
pHandle
);
/**
*
* @param pHandle
* @return
*/
int32_t
tsortClose
(
SSortHandle
*
pHandle
);
/**
*
* @return
*/
int32_t
tsortSetFetchRawDataFp
(
SSortHandle
*
pHandle
,
_sort_fetch_block_fn_t
fp
);
/**
*
* @param pHandle
* @param fp
* @return
*/
int32_t
tsortSetComparFp
(
SSortHandle
*
pHandle
,
_sort_merge_compar_fn_t
fp
);
/**
*
* @param pHandle
* @param pSource
* @return success or failed
*/
int32_t
tsortAddSource
(
SSortHandle
*
pSortHandle
,
void
*
pSource
);
/**
*
* @param pHandle
* @return
*/
STupleHandle
*
tsortNextTuple
(
SSortHandle
*
pHandle
);
/**
*
* @param pHandle
* @param colIndex
* @return
*/
bool
tsortIsNullVal
(
STupleHandle
*
pVHandle
,
int32_t
colIndex
);
/**
*
* @param pHandle
* @param colIndex
* @return
*/
void
*
tsortGetValue
(
STupleHandle
*
pVHandle
,
int32_t
colIndex
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSORT_H
source/libs/executor/src/executil.c
浏览文件 @
e5606ccf
...
...
@@ -53,8 +53,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
return
size
;
}
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
)
{
pResultRowInfo
->
type
=
type
;
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
curPos
=
-
1
;
pResultRowInfo
->
capacity
=
size
;
...
...
@@ -93,7 +92,7 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRow
*
pWindowRes
=
pResultRowInfo
->
pResult
[
i
];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
,
pResultRowInfo
->
type
);
clearResultRow
(
pRuntimeEnv
,
pWindowRes
);
int32_t
groupIndex
=
0
;
int64_t
uid
=
0
;
...
...
@@ -136,7 +135,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
=
true
;
}
void
clearResultRow
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
)
{
void
clearResultRow
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
)
{
if
(
pResultRow
==
NULL
)
{
return
;
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
e5606ccf
...
...
@@ -93,3 +93,25 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return
pTaskInfo
;
}
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
SArray
*
tableIdList
,
bool
isAdd
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
// traverse to the streamscan node to add this table id
SOperatorInfo
*
pInfo
=
pTaskInfo
->
pRoot
;
while
(
pInfo
->
operatorType
!=
OP_StreamScan
)
{
pInfo
=
pInfo
->
pDownstream
[
0
];
}
SStreamBlockScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
int32_t
code
=
tqReadHandleSetTbUidList
(
pScanInfo
->
readerHandle
,
tableIdList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
else
{
assert
(
0
);
}
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/libs/executor/src/executorimpl.c
浏览文件 @
e5606ccf
此差异已折叠。
点击以展开。
source/libs/executor/src/tlinearhash.c
0 → 100644
浏览文件 @
e5606ccf
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tlinearhash.h"
#include "tcfg.h"
#include "taoserror.h"
#include "tpagedbuf.h"
#define LHASH_CAP_RATIO 0.85
// Always located in memory
typedef
struct
SLHashBucket
{
SArray
*
pPageIdList
;
int32_t
size
;
// the number of element in this entry
}
SLHashBucket
;
typedef
struct
SLHashObj
{
SDiskbasedBuf
*
pBuf
;
_hash_fn_t
hashFn
;
int32_t
tuplesPerPage
;
SLHashBucket
**
pBucket
;
// entry list
int32_t
numOfAlloc
;
// number of allocated bucket ptr slot
int32_t
bits
;
// the number of bits used in hash
int32_t
numOfBuckets
;
// the number of buckets
int64_t
size
;
// the number of total items
}
SLHashObj
;
/**
* the data struct for each hash node
* +-----------+-------+--------+
* | SLHashNode| key | data |
* +-----------+-------+--------+
*/
typedef
struct
SLHashNode
{
uint16_t
keyLen
;
uint16_t
dataLen
;
}
SLHashNode
;
#define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode))
#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
#define GET_LHASH_NODE_LEN(_n) (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)
static
int32_t
doAddNewBucket
(
SLHashObj
*
pHashObj
);
static
int32_t
doGetBucketIdFromHashVal
(
int32_t
hashv
,
int32_t
bits
)
{
return
hashv
&
((
1ul
<<
(
bits
))
-
1
);
}
static
int32_t
doGetAlternativeBucketId
(
int32_t
bucketId
,
int32_t
bits
,
int32_t
numOfBuckets
)
{
int32_t
v
=
bucketId
-
(
1ul
<<
(
bits
-
1
));
ASSERT
(
v
<
numOfBuckets
);
return
v
;
}
static
int32_t
doGetRelatedSplitBucketId
(
int32_t
bucketId
,
int32_t
bits
)
{
int32_t
splitBucketId
=
(
1ul
<<
(
bits
-
1
))
^
bucketId
;
return
splitBucketId
;
}
static
void
doCopyObject
(
char
*
p
,
const
void
*
key
,
int32_t
keyLen
,
const
void
*
data
,
int32_t
size
)
{
*
(
uint16_t
*
)
p
=
keyLen
;
p
+=
sizeof
(
uint16_t
);
*
(
uint16_t
*
)
p
=
size
;
p
+=
sizeof
(
uint16_t
);
memcpy
(
p
,
key
,
keyLen
);
p
+=
keyLen
;
memcpy
(
p
,
data
,
size
);
}
static
int32_t
doAddToBucket
(
SLHashObj
*
pHashObj
,
SLHashBucket
*
pBucket
,
int32_t
index
,
const
void
*
key
,
int32_t
keyLen
,
const
void
*
data
,
int32_t
size
)
{
int32_t
pageId
=
*
(
int32_t
*
)
taosArrayGetLast
(
pBucket
->
pPageIdList
);
SFilePage
*
pPage
=
getBufPage
(
pHashObj
->
pBuf
,
pageId
);
ASSERT
(
pPage
!=
NULL
);
// put to current buf page
size_t
nodeSize
=
sizeof
(
SLHashNode
)
+
keyLen
+
size
;
ASSERT
(
nodeSize
+
sizeof
(
SFilePage
)
<=
getBufPageSize
(
pHashObj
->
pBuf
));
if
(
pPage
->
num
+
nodeSize
>
getBufPageSize
(
pHashObj
->
pBuf
))
{
releaseBufPage
(
pHashObj
->
pBuf
,
pPage
);
// allocate the overflow buffer page to hold this k/v.
int32_t
newPageId
=
-
1
;
SFilePage
*
pNewPage
=
getNewBufPage
(
pHashObj
->
pBuf
,
0
,
&
newPageId
);
if
(
pNewPage
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosArrayPush
(
pBucket
->
pPageIdList
,
&
newPageId
);
doCopyObject
(
pNewPage
->
data
,
key
,
keyLen
,
data
,
size
);
pNewPage
->
num
=
sizeof
(
SFilePage
)
+
nodeSize
;
setBufPageDirty
(
pNewPage
,
true
);
releaseBufPage
(
pHashObj
->
pBuf
,
pNewPage
);
}
else
{
char
*
p
=
(
char
*
)
pPage
+
pPage
->
num
;
doCopyObject
(
p
,
key
,
keyLen
,
data
,
size
);
pPage
->
num
+=
nodeSize
;
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pHashObj
->
pBuf
,
pPage
);
}
pBucket
->
size
+=
1
;
// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
return
TSDB_CODE_SUCCESS
;
}
static
void
doRemoveFromBucket
(
SFilePage
*
pPage
,
SLHashNode
*
pNode
,
SLHashBucket
*
pBucket
)
{
ASSERT
(
pPage
!=
NULL
&&
pNode
!=
NULL
&&
pBucket
->
size
>=
1
);
int32_t
len
=
GET_LHASH_NODE_LEN
(
pNode
);
char
*
p
=
(
char
*
)
pNode
+
len
;
char
*
pEnd
=
(
char
*
)
pPage
+
pPage
->
num
;
memmove
(
pNode
,
p
,
(
pEnd
-
p
));
pPage
->
num
-=
len
;
if
(
pPage
->
num
==
0
)
{
// this page is empty, could be recycle in the future.
}
setBufPageDirty
(
pPage
,
true
);
pBucket
->
size
-=
1
;
}
static
void
doCompressBucketPages
(
SLHashObj
*
pHashObj
,
SLHashBucket
*
pBucket
)
{
size_t
numOfPages
=
taosArrayGetSize
(
pBucket
->
pPageIdList
);
if
(
numOfPages
<=
1
)
{
return
;
}
int32_t
*
firstPage
=
taosArrayGet
(
pBucket
->
pPageIdList
,
0
);
SFilePage
*
pFirst
=
getBufPage
(
pHashObj
->
pBuf
,
*
firstPage
);
int32_t
*
pageId
=
taosArrayGetLast
(
pBucket
->
pPageIdList
);
SFilePage
*
pLast
=
getBufPage
(
pHashObj
->
pBuf
,
*
pageId
);
if
(
pLast
->
num
<=
sizeof
(
SFilePage
))
{
// this is empty
dBufSetBufPageRecycled
(
pHashObj
->
pBuf
,
pLast
);
releaseBufPage
(
pHashObj
->
pBuf
,
pFirst
);
taosArrayRemove
(
pBucket
->
pPageIdList
,
numOfPages
-
1
);
return
;
}
char
*
pStart
=
pLast
->
data
;
int32_t
nodeSize
=
GET_LHASH_NODE_LEN
(
pStart
);
while
(
1
)
{
if
(
pFirst
->
num
+
nodeSize
<
getBufPageSize
(
pHashObj
->
pBuf
))
{
char
*
p
=
((
char
*
)
pFirst
)
+
pFirst
->
num
;
SLHashNode
*
pNode
=
(
SLHashNode
*
)
pStart
;
doCopyObject
(
p
,
GET_LHASH_NODE_KEY
(
pStart
),
pNode
->
keyLen
,
GET_LHASH_NODE_DATA
(
pStart
),
pNode
->
dataLen
);
setBufPageDirty
(
pFirst
,
true
);
setBufPageDirty
(
pLast
,
true
);
ASSERT
(
pLast
->
num
>=
nodeSize
+
sizeof
(
SFilePage
));
pFirst
->
num
+=
nodeSize
;
pLast
->
num
-=
nodeSize
;
pStart
+=
nodeSize
;
if
(
pLast
->
num
<=
sizeof
(
SFilePage
))
{
// this is empty
dBufSetBufPageRecycled
(
pHashObj
->
pBuf
,
pLast
);
releaseBufPage
(
pHashObj
->
pBuf
,
pFirst
);
taosArrayRemove
(
pBucket
->
pPageIdList
,
numOfPages
-
1
);
break
;
}
nodeSize
=
GET_LHASH_NODE_LEN
(
pStart
);
}
else
{
// move to the front of pLast page
if
(
pStart
!=
pLast
->
data
)
{
memmove
(
pLast
->
data
,
pStart
,
(((
char
*
)
pLast
)
+
pLast
->
num
-
pStart
));
setBufPageDirty
(
pLast
,
true
);
}
releaseBufPage
(
pHashObj
->
pBuf
,
pLast
);
releaseBufPage
(
pHashObj
->
pBuf
,
pFirst
);
break
;
}
}
}
static
int32_t
doAddNewBucket
(
SLHashObj
*
pHashObj
)
{
if
(
pHashObj
->
numOfBuckets
+
1
>
pHashObj
->
numOfAlloc
)
{
int32_t
newLen
=
pHashObj
->
numOfAlloc
*
1
.
25
;
if
(
newLen
==
pHashObj
->
numOfAlloc
)
{
newLen
+=
4
;
}
char
*
p
=
realloc
(
pHashObj
->
pBucket
,
POINTER_BYTES
*
newLen
);
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
memset
(
p
+
POINTER_BYTES
*
pHashObj
->
numOfBuckets
,
0
,
newLen
-
pHashObj
->
numOfBuckets
);
pHashObj
->
pBucket
=
(
SLHashBucket
**
)
p
;
pHashObj
->
numOfAlloc
=
newLen
;
}
SLHashBucket
*
pBucket
=
calloc
(
1
,
sizeof
(
SLHashBucket
));
pHashObj
->
pBucket
[
pHashObj
->
numOfBuckets
]
=
pBucket
;
pBucket
->
pPageIdList
=
taosArrayInit
(
2
,
sizeof
(
int32_t
));
if
(
pBucket
->
pPageIdList
==
NULL
||
pBucket
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
pageId
=
-
1
;
SFilePage
*
p
=
getNewBufPage
(
pHashObj
->
pBuf
,
0
,
&
pageId
);
p
->
num
=
sizeof
(
SFilePage
);
setBufPageDirty
(
p
,
true
);
releaseBufPage
(
pHashObj
->
pBuf
,
p
);
taosArrayPush
(
pBucket
->
pPageIdList
,
&
pageId
);
pHashObj
->
numOfBuckets
+=
1
;
// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
return
TSDB_CODE_SUCCESS
;
}
SLHashObj
*
tHashInit
(
int32_t
inMemPages
,
int32_t
pageSize
,
_hash_fn_t
fn
,
int32_t
numOfTuplePerPage
)
{
SLHashObj
*
pHashObj
=
calloc
(
1
,
sizeof
(
SLHashObj
));
if
(
pHashObj
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
int32_t
code
=
createDiskbasedBuf
(
&
pHashObj
->
pBuf
,
pageSize
,
inMemPages
*
pageSize
,
0
,
"/tmp"
);
if
(
code
!=
0
)
{
terrno
=
code
;
return
NULL
;
}
setBufPageCompressOnDisk
(
pHashObj
->
pBuf
,
false
);
/**
* The number of bits in the hash value, which is used to decide the exact bucket where the object should be located in.
* The initial value is 0.
*/
pHashObj
->
bits
=
0
;
pHashObj
->
hashFn
=
fn
;
pHashObj
->
tuplesPerPage
=
numOfTuplePerPage
;
pHashObj
->
numOfAlloc
=
4
;
// initial allocated array list
pHashObj
->
pBucket
=
calloc
(
pHashObj
->
numOfAlloc
,
POINTER_BYTES
);
code
=
doAddNewBucket
(
pHashObj
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyDiskbasedBuf
(
pHashObj
->
pBuf
);
tfree
(
pHashObj
);
terrno
=
code
;
return
NULL
;
}
return
pHashObj
;
}
void
*
tHashCleanup
(
SLHashObj
*
pHashObj
)
{
destroyDiskbasedBuf
(
pHashObj
->
pBuf
);
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
numOfBuckets
;
++
i
)
{
taosArrayDestroy
(
pHashObj
->
pBucket
[
i
]
->
pPageIdList
);
tfree
(
pHashObj
->
pBucket
[
i
]);
}
tfree
(
pHashObj
->
pBucket
);
tfree
(
pHashObj
);
return
NULL
;
}
int32_t
tHashPut
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
)
{
ASSERT
(
pHashObj
!=
NULL
&&
key
!=
NULL
);
if
(
pHashObj
->
bits
==
0
)
{
SLHashBucket
*
pBucket
=
pHashObj
->
pBucket
[
0
];
doAddToBucket
(
pHashObj
,
pBucket
,
0
,
key
,
keyLen
,
data
,
size
);
}
else
{
int32_t
hashVal
=
pHashObj
->
hashFn
(
key
,
keyLen
);
int32_t
v
=
doGetBucketIdFromHashVal
(
hashVal
,
pHashObj
->
bits
);
if
(
v
>=
pHashObj
->
numOfBuckets
)
{
int32_t
newBucketId
=
doGetAlternativeBucketId
(
v
,
pHashObj
->
bits
,
pHashObj
->
numOfBuckets
);
// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
v
=
newBucketId
;
}
SLHashBucket
*
pBucket
=
pHashObj
->
pBucket
[
v
];
int32_t
code
=
doAddToBucket
(
pHashObj
,
pBucket
,
v
,
key
,
keyLen
,
data
,
size
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
pHashObj
->
size
+=
1
;
// Too many records, needs to bucket split
if
((
pHashObj
->
numOfBuckets
*
LHASH_CAP_RATIO
*
pHashObj
->
tuplesPerPage
)
<
pHashObj
->
size
)
{
int32_t
newBucketId
=
pHashObj
->
numOfBuckets
;
int32_t
code
=
doAddNewBucket
(
pHashObj
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
int32_t
numOfBits
=
ceil
(
log
(
pHashObj
->
numOfBuckets
)
/
log
(
2
));
if
(
numOfBits
>
pHashObj
->
bits
)
{
// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
ASSERT
(
numOfBits
==
pHashObj
->
bits
+
1
);
pHashObj
->
bits
=
numOfBits
;
}
int32_t
splitBucketId
=
doGetRelatedSplitBucketId
(
newBucketId
,
pHashObj
->
bits
);
// load all data in this bucket and check if the data needs to relocated into the new bucket
SLHashBucket
*
pBucket
=
pHashObj
->
pBucket
[
splitBucketId
];
// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pBucket
->
pPageIdList
);
++
i
)
{
int32_t
pageId
=
*
(
int32_t
*
)
taosArrayGet
(
pBucket
->
pPageIdList
,
i
);
SFilePage
*
p
=
getBufPage
(
pHashObj
->
pBuf
,
pageId
);
char
*
pStart
=
p
->
data
;
while
(
pStart
-
((
char
*
)
p
)
<
p
->
num
)
{
SLHashNode
*
pNode
=
(
SLHashNode
*
)
pStart
;
ASSERT
(
pNode
->
keyLen
>
0
&&
pNode
->
dataLen
>=
0
);
char
*
k
=
GET_LHASH_NODE_KEY
(
pNode
);
int32_t
hashv
=
pHashObj
->
hashFn
(
k
,
pNode
->
keyLen
);
int32_t
v1
=
doGetBucketIdFromHashVal
(
hashv
,
pHashObj
->
bits
);
if
(
v1
!=
splitBucketId
)
{
// place it into the new bucket
ASSERT
(
v1
==
newBucketId
);
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket
*
pNewBucket
=
pHashObj
->
pBucket
[
newBucketId
];
doAddToBucket
(
pHashObj
,
pNewBucket
,
newBucketId
,
(
void
*
)
GET_LHASH_NODE_KEY
(
pNode
),
pNode
->
keyLen
,
GET_LHASH_NODE_KEY
(
pNode
),
pNode
->
dataLen
);
doRemoveFromBucket
(
p
,
pNode
,
pBucket
);
}
else
{
// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
int32_t
nodeSize
=
GET_LHASH_NODE_LEN
(
pStart
);
pStart
+=
nodeSize
;
}
}
releaseBufPage
(
pHashObj
->
pBuf
,
p
);
}
doCompressBucketPages
(
pHashObj
,
pBucket
);
}
return
TSDB_CODE_SUCCESS
;
}
char
*
tHashGet
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
ASSERT
(
pHashObj
!=
NULL
&&
key
!=
NULL
&&
keyLen
>
0
);
int32_t
hashv
=
pHashObj
->
hashFn
(
key
,
keyLen
);
int32_t
bucketId
=
doGetBucketIdFromHashVal
(
hashv
,
pHashObj
->
bits
);
if
(
bucketId
>=
pHashObj
->
numOfBuckets
)
{
bucketId
=
doGetAlternativeBucketId
(
bucketId
,
pHashObj
->
bits
,
pHashObj
->
numOfBuckets
);
}
SLHashBucket
*
pBucket
=
pHashObj
->
pBucket
[
bucketId
];
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pBucket
->
pPageIdList
);
++
i
)
{
int32_t
pageId
=
*
(
int32_t
*
)
taosArrayGet
(
pBucket
->
pPageIdList
,
i
);
SFilePage
*
p
=
getBufPage
(
pHashObj
->
pBuf
,
pageId
);
char
*
pStart
=
p
->
data
;
while
(
pStart
-
p
->
data
<
p
->
num
)
{
SLHashNode
*
pNode
=
(
SLHashNode
*
)
pStart
;
char
*
k
=
GET_LHASH_NODE_KEY
(
pNode
);
if
(
pNode
->
keyLen
==
keyLen
&&
(
memcmp
(
key
,
k
,
keyLen
)
==
0
))
{
releaseBufPage
(
pHashObj
->
pBuf
,
p
);
return
GET_LHASH_NODE_DATA
(
pNode
);
}
else
{
pStart
+=
GET_LHASH_NODE_LEN
(
pStart
);
}
}
releaseBufPage
(
pHashObj
->
pBuf
,
p
);
}
return
NULL
;
}
int32_t
tHashRemove
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
// todo
}
void
tHashPrint
(
const
SLHashObj
*
pHashObj
,
int32_t
type
)
{
printf
(
"==================== linear hash ====================
\n
"
);
printf
(
"total bucket:%d, size:%ld, ratio:%.2f
\n
"
,
pHashObj
->
numOfBuckets
,
pHashObj
->
size
,
LHASH_CAP_RATIO
);
dBufSetPrintInfo
(
pHashObj
->
pBuf
);
if
(
type
==
LINEAR_HASH_DATA
)
{
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
numOfBuckets
;
++
i
)
{
// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
}
}
else
{
dBufPrintStatis
(
pHashObj
->
pBuf
);
}
}
\ No newline at end of file
source/libs/executor/src/tsort.c
0 → 100644
浏览文件 @
e5606ccf
此差异已折叠。
点击以展开。
source/libs/executor/test/executorTests.cpp
浏览文件 @
e5606ccf
此差异已折叠。
点击以展开。
source/libs/executor/test/lhashTests.cpp
0 → 100644
浏览文件 @
e5606ccf
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#include "executorimpl.h"
#include "tlinearhash.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
TEST
(
testCase
,
linear_hash_Tests
)
{
srand
(
time
(
NULL
));
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
);
#if 0
SLHashObj* pHashObj = tHashInit(256, 4096, fn, 320);
for(int32_t i = 0; i < 5000000; ++i) {
int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
assert(code == 0);
}
// tHashPrint(pHashObj, LINEAR_HASH_STATIS);
// for(int32_t i = 0; i < 10000; ++i) {
// char* v = tHashGet(pHashObj, &i, sizeof(i));
// if (v != NULL) {
//// printf("find value: %d, key:%d\n", *(int32_t*) v, i);
// } else {
// printf("failed to found key:%d in hash\n", i);
// }
// }
tHashPrint(pHashObj, LINEAR_HASH_STATIS);
tHashCleanup(pHashObj);
#endif
#if 0
SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK);
for(int32_t i = 0; i < 1000000; ++i) {
taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
}
for(int32_t i = 0; i < 10000; ++i) {
void* v = taosHashGet(pHashObj, &i, sizeof(i));
}
taosHashCleanup(pHashObj);
#endif
}
\ No newline at end of file
source/libs/executor/test/sortTests.cpp
0 → 100644
浏览文件 @
e5606ccf
此差异已折叠。
点击以展开。
source/libs/function/src/taggfunction.c
浏览文件 @
e5606ccf
...
...
@@ -29,9 +29,10 @@
#include "tcompression.h"
//#include "queryLog.h"
#include "tudf.h"
#include "tep.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (
GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes
)
#define GET_INPUT_DATA(x, y) (
(char*) colDataGetData((x)->pInput, (y))
)
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
...
...
@@ -3818,7 +3819,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
skey
=
ekey
;
}
}
assignVal
(
pCtx
->
pOutput
,
pCtx
->
pInput
,
pCtx
->
resDataInfo
.
bytes
,
pCtx
->
inputType
);
//
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType);
}
else
if
(
type
==
TSDB_FILL_NEXT
)
{
TSKEY
ekey
=
skey
;
char
*
val
=
NULL
;
...
...
@@ -4395,7 +4396,7 @@ SFunctionFpSet fpSet[1] = {
.
addInput
=
count_function
,
.
finalize
=
doFinalizer
,
.
combine
=
count_func_merge
,
}
}
,
};
SAggFunctionInfo
aggFunc
[
35
]
=
{{
...
...
source/libs/function/src/tpercentile.c
浏览文件 @
e5606ccf
...
...
@@ -222,7 +222,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
}
pBucket
->
numOfSlots
=
DEFAULT_NUM_OF_SLOT
;
pBucket
->
bufPageSize
=
DEFAULT_PAGE_SIZE
*
4
;
// 4
k per page
pBucket
->
bufPageSize
=
16384
*
4
;
// 16
k per page
pBucket
->
type
=
dataType
;
pBucket
->
bytes
=
nElemSize
;
...
...
@@ -255,7 +255,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo
(
pBucket
);
int32_t
ret
=
createDiskbasedBuf
fer
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
512
,
1
,
tsTempDir
);
int32_t
ret
=
createDiskbasedBuf
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
512
,
1
,
"/tmp"
);
if
(
ret
!=
0
)
{
tMemBucketDestroy
(
pBucket
);
return
NULL
;
...
...
@@ -270,7 +270,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return
;
}
destroy
Result
Buf
(
pBucket
->
pBuffer
);
destroy
Diskbased
Buf
(
pBucket
->
pBuffer
);
tfree
(
pBucket
->
pSlots
);
tfree
(
pBucket
);
}
...
...
@@ -348,7 +348,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot
->
info
.
data
=
NULL
;
}
pSlot
->
info
.
data
=
getNew
DataBuf
(
pBucket
->
pBuffer
,
groupId
,
&
pageId
);
pSlot
->
info
.
data
=
getNew
BufPage
(
pBucket
->
pBuffer
,
groupId
,
&
pageId
);
pSlot
->
info
.
pageId
=
pageId
;
}
...
...
source/libs/parser/src/queryInfoUtil.c
浏览文件 @
e5606ccf
...
...
@@ -230,7 +230,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo) {
}
void
assignExprInfo
(
SExprInfo
*
dst
,
const
SExprInfo
*
src
)
{
assert
(
dst
!=
NULL
&&
src
!=
NULL
);
assert
(
dst
!=
NULL
&&
src
!=
NULL
/* && src->base.numOfCols > 0*/
);
*
dst
=
*
src
;
#if 0
...
...
@@ -241,8 +241,12 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
#endif
dst
->
pExpr
=
exprdup
(
src
->
pExpr
);
dst
->
base
.
pColumns
=
calloc
(
src
->
base
.
numOfCols
,
sizeof
(
SColumn
));
memcpy
(
dst
->
base
.
pColumns
,
src
->
base
.
pColumns
,
sizeof
(
SColumn
)
*
src
->
base
.
numOfCols
);
if
(
src
->
base
.
numOfCols
>
0
)
{
dst
->
base
.
pColumns
=
calloc
(
src
->
base
.
numOfCols
,
sizeof
(
SColumn
));
memcpy
(
dst
->
base
.
pColumns
,
src
->
base
.
pColumns
,
sizeof
(
SColumn
)
*
src
->
base
.
numOfCols
);
}
else
{
dst
->
base
.
pColumns
=
NULL
;
}
memset
(
dst
->
base
.
param
,
0
,
sizeof
(
SVariant
)
*
tListLen
(
dst
->
base
.
param
));
for
(
int32_t
j
=
0
;
j
<
src
->
base
.
numOfParams
;
++
j
)
{
...
...
source/libs/scalar/inc/filterInt.h
浏览文件 @
e5606ccf
...
...
@@ -306,7 +306,7 @@ typedef struct SFilterInfo {
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnNode *)((fi)->desc))->colId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnNode *)((fi)->desc))->slotId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) (colDataGet(((SColumnInfoData *)(fi)->data), (ri)))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) (colDataGet
Data
(((SColumnInfoData *)(fi)->data), (ri)))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)
...
...
source/libs/scalar/src/filter.c
浏览文件 @
e5606ccf
...
...
@@ -19,6 +19,7 @@
#include "tcompare.h"
#include "filterInt.h"
#include "filter.h"
#include "tep.h"
OptrStr
gOptrStr
[]
=
{
{
0
,
"invalid"
},
...
...
@@ -2776,7 +2777,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
uint32_t
unitNum
=
*
(
unitIdx
++
);
for
(
uint32_t
u
=
0
;
u
<
unitNum
;
++
u
)
{
SFilterComUnit
*
cunit
=
&
info
->
cunits
[
*
(
unitIdx
+
u
)];
void
*
colData
=
colDataGet
((
SColumnInfoData
*
)
cunit
->
colData
,
i
);
void
*
colData
=
colDataGet
Data
((
SColumnInfoData
*
)
cunit
->
colData
,
i
);
//if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx);
...
...
@@ -2874,7 +2875,7 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
uint32_t
uidx
=
info
->
groups
[
0
].
unitIdxs
[
0
];
void
*
colData
=
colDataGet
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
i
);
void
*
colData
=
colDataGet
Data
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
i
);
if
(
info
->
cunits
[
uidx
].
dataType
==
TSDB_DATA_TYPE_JSON
){
if
(
!
colData
){
// for json->'key' is null
(
*
p
)[
i
]
=
1
;
...
...
@@ -2908,7 +2909,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
uint32_t
uidx
=
info
->
groups
[
0
].
unitIdxs
[
0
];
void
*
colData
=
colDataGet
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
i
);
void
*
colData
=
colDataGet
Data
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
i
);
if
(
info
->
cunits
[
uidx
].
dataType
==
TSDB_DATA_TYPE_JSON
){
if
(
!
colData
)
{
// for json->'key' is not null
...
...
@@ -2949,7 +2950,7 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
}
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
void
*
colData
=
colDataGet
((
SColumnInfoData
*
)
info
->
cunits
[
0
].
colData
,
i
);
void
*
colData
=
colDataGet
Data
((
SColumnInfoData
*
)
info
->
cunits
[
0
].
colData
,
i
);
if
(
colData
==
NULL
||
isNull
(
colData
,
info
->
cunits
[
0
].
dataType
))
{
all
=
false
;
...
...
@@ -2980,7 +2981,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
uint32_t
uidx
=
info
->
groups
[
0
].
unitIdxs
[
0
];
void
*
colData
=
colDataGet
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
i
);
void
*
colData
=
colDataGet
Data
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
i
);
if
(
colData
==
NULL
||
isNull
(
colData
,
info
->
cunits
[
uidx
].
dataType
))
{
(
*
p
)[
i
]
=
0
;
all
=
false
;
...
...
@@ -3031,7 +3032,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
for
(
uint32_t
u
=
0
;
u
<
group
->
unitNum
;
++
u
)
{
uint32_t
uidx
=
group
->
unitIdxs
[
u
];
SFilterComUnit
*
cunit
=
&
info
->
cunits
[
uidx
];
void
*
colData
=
colDataGet
((
SColumnInfoData
*
)(
cunit
->
colData
),
i
);
void
*
colData
=
colDataGet
Data
((
SColumnInfoData
*
)(
cunit
->
colData
),
i
);
//if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx);
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
e5606ccf
...
...
@@ -226,7 +226,7 @@ void* getVectorValueAddr_default(void *src, int32_t index) {
return
src
;
}
void
*
getVectorValueAddr_VAR
(
void
*
src
,
int32_t
index
)
{
return
colDataGet
((
SColumnInfoData
*
)
src
,
index
);
return
colDataGet
Data
((
SColumnInfoData
*
)
src
,
index
);
}
_getValueAddr_fn_t
getVectorValueAddrFn
(
int32_t
srcType
)
{
...
...
source/util/src/tpagedbuf.c
浏览文件 @
e5606ccf
此差异已折叠。
点击以展开。
source/util/test/pageBufferTest.cpp
浏览文件 @
e5606ccf
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录