Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c4301521
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c4301521
编写于
12月 15, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
c971df22
8b5e0b43
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
292 addition
and
242 deletion
+292
-242
include/dnode/vnode/tq/tq.h
include/dnode/vnode/tq/tq.h
+7
-7
include/libs/wal/wal.h
include/libs/wal/wal.h
+3
-3
source/dnode/vnode/tq/inc/tqMetaStore.h
source/dnode/vnode/tq/inc/tqMetaStore.h
+11
-11
source/dnode/vnode/tq/src/tq.c
source/dnode/vnode/tq/src/tq.c
+2
-3
source/dnode/vnode/tq/src/tqMetaStore.c
source/dnode/vnode/tq/src/tqMetaStore.c
+46
-47
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+8
-0
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+41
-0
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+6
-2
source/libs/wal/src/walSeek.c
source/libs/wal/src/walSeek.c
+8
-8
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+2
-3
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+158
-158
未找到文件。
include/dnode/vnode/tq/tq.h
浏览文件 @
c4301521
...
...
@@ -226,21 +226,21 @@ typedef struct TqMetaHandle {
int64_t
serializedSize
;
void
*
valueInUse
;
void
*
valueInTxn
;
}
TqMetaHandle
;
}
S
TqMetaHandle
;
typedef
struct
TqMetaList
{
TqMetaHandle
handle
;
S
TqMetaHandle
handle
;
struct
TqMetaList
*
next
;
//struct TqMetaList* inTxnPrev;
//struct TqMetaList* inTxnNext;
struct
TqMetaList
*
unpersistPrev
;
struct
TqMetaList
*
unpersistNext
;
}
TqMetaList
;
}
S
TqMetaList
;
typedef
struct
TqMetaStore
{
TqMetaList
*
bucket
[
TQ_BUCKET_SIZE
];
S
TqMetaList
*
bucket
[
TQ_BUCKET_SIZE
];
//a table head
TqMetaList
*
unpersistHead
;
S
TqMetaList
*
unpersistHead
;
//TODO:temporaral use, to be replaced by unified tfile
int
fileFd
;
//TODO:temporaral use, to be replaced by unified tfile
...
...
@@ -250,7 +250,7 @@ typedef struct TqMetaStore {
TqSerializeFun
pSerializer
;
TqDeserializeFun
pDeserializer
;
TqDeleteFun
pDeleter
;
}
TqMetaStore
;
}
S
TqMetaStore
;
typedef
struct
STQ
{
// the collection of group handle
...
...
@@ -259,7 +259,7 @@ typedef struct STQ {
STqCfg
*
tqConfig
;
TqLogReader
*
tqLogReader
;
TqMemRef
tqMemRef
;
TqMetaStore
*
tqMeta
;
S
TqMetaStore
*
tqMeta
;
}
STQ
;
// open in each vnode
...
...
include/libs/wal/wal.h
浏览文件 @
c4301521
...
...
@@ -88,17 +88,17 @@ typedef struct SWalVer {
typedef
struct
SWal
{
// cfg
SWalCfg
cfg
;
int32_t
fsyncSeq
;
//meta
SWalVer
vers
;
//file set
int64_t
writeLogTfd
;
int64_t
writeIdxTfd
;
int32_t
writeCur
;
SArray
*
fileInfoSet
;
//stat
istic
s
//stat
u
s
int64_t
totSize
;
int64_t
lastRollSeq
;
//ctl
int32_t
fsyncSeq
;
int64_t
refId
;
pthread_mutex_t
mutex
;
//path
...
...
source/dnode/vnode/tq/inc/tqMetaStore.h
浏览文件 @
c4301521
...
...
@@ -24,29 +24,29 @@ extern "C" {
#endif
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
S
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
TqSerializeFun
pSerializer
,
TqDeserializeFun
pDeserializer
,
TqDeleteFun
pDeleter
,
int32_t
tqConfigFlag
);
int32_t
tqStoreClose
(
TqMetaStore
*
);
int32_t
tqStoreClose
(
S
TqMetaStore
*
);
//int32_t tqStoreDelete(TqMetaStore*);
//int32_t tqStoreCommitAll(TqMetaStore*);
int32_t
tqStorePersist
(
TqMetaStore
*
);
int32_t
tqStorePersist
(
S
TqMetaStore
*
);
//clean deleted idx and data from persistent file
int32_t
tqStoreCompact
(
TqMetaStore
*
);
int32_t
tqStoreCompact
(
S
TqMetaStore
*
);
void
*
tqHandleGet
(
TqMetaStore
*
,
int64_t
key
);
void
*
tqHandleGet
(
S
TqMetaStore
*
,
int64_t
key
);
//make it unpersist
void
*
tqHandleTouchGet
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleMovePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
void
*
tqHandleTouchGet
(
S
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleMovePut
(
S
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
S
TqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
//delete committed kv pair
//notice that a delete action still needs to be committed
int32_t
tqHandleDel
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleDel
(
S
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
S
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
S
TqMetaStore
*
,
int64_t
key
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/tq/src/tq.c
浏览文件 @
c4301521
...
...
@@ -49,8 +49,8 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl
pTq
->
path
=
strdup
(
path
);
pTq
->
tqConfig
=
tqConfig
;
pTq
->
tqLogReader
=
tqLogReader
;
//
pTq->tqMemRef.pAlloctorFactory = allocFac;
//
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
pTq
->
tqMemRef
.
pAlloctorFactory
=
allocFac
;
pTq
->
tqMemRef
.
pAllocator
=
allocFac
->
create
(
allocFac
);
if
(
pTq
->
tqMemRef
.
pAllocator
==
NULL
)
{
//TODO
}
...
...
@@ -202,7 +202,6 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) {
return
totSize
;
}
TqGroupHandle
*
tqGetGroupHandle
(
STQ
*
pTq
,
int64_t
cId
)
{
return
NULL
;
}
...
...
source/dnode/vnode/tq/src/tqMetaStore.c
浏览文件 @
c4301521
...
...
@@ -22,11 +22,10 @@
#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx"
static
int32_t
tqHandlePutCommitted
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
void
*
tqHandleGetUncommitted
(
STqMetaStore
*
,
int64_t
key
);
static
int32_t
tqHandlePutCommitted
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
,
int64_t
key
);
static
inline
void
tqLinkUnpersist
(
TqMetaStore
*
pMeta
,
TqMetaList
*
pNode
)
{
static
inline
void
tqLinkUnpersist
(
STqMetaStore
*
pMeta
,
STqMetaList
*
pNode
)
{
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
...
...
@@ -68,18 +67,18 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
return
lseek
(
fd
,
offset
,
SEEK_SET
);
}
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
S
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
TqSerializeFun
serializer
,
TqDeserializeFun
deserializer
,
TqDeleteFun
deleter
,
int32_t
tqConfigFlag
)
{
TqMetaStore
*
pMeta
=
malloc
(
sizeof
(
TqMetaStore
));
STqMetaStore
*
pMeta
=
malloc
(
sizeof
(
S
TqMetaStore
));
if
(
pMeta
==
NULL
)
{
//close
return
NULL
;
}
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
memset
(
pMeta
,
0
,
sizeof
(
S
TqMetaStore
));
//concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
...
...
@@ -105,14 +104,14 @@ TqMetaStore* tqStoreOpen(const char* path,
}
pMeta
->
idxFd
=
idxFd
;
pMeta
->
unpersistHead
=
malloc
(
sizeof
(
TqMetaList
));
pMeta
->
unpersistHead
=
malloc
(
sizeof
(
S
TqMetaList
));
if
(
pMeta
->
unpersistHead
==
NULL
)
{
ASSERT
(
false
);
//close file
//free memory
return
NULL
;
}
memset
(
pMeta
->
unpersistHead
,
0
,
sizeof
(
TqMetaList
));
memset
(
pMeta
->
unpersistHead
,
0
,
sizeof
(
S
TqMetaList
));
pMeta
->
unpersistHead
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistPrev
=
pMeta
->
unpersistHead
;
...
...
@@ -149,11 +148,11 @@ TqMetaStore* tqStoreOpen(const char* path,
ASSERT
(
idxBuf
.
head
.
writeOffset
==
idxRead
);
//loop read every entry
for
(
int
i
=
0
;
i
<
idxBuf
.
head
.
writeOffset
-
TQ_IDX_PAGE_HEAD_SIZE
;
i
+=
TQ_IDX_SIZE
)
{
TqMetaList
*
pNode
=
malloc
(
sizeof
(
TqMetaList
));
STqMetaList
*
pNode
=
malloc
(
sizeof
(
S
TqMetaList
));
if
(
pNode
==
NULL
)
{
//TODO: free memory and return error
}
memset
(
pNode
,
0
,
sizeof
(
TqMetaList
));
memset
(
pNode
,
0
,
sizeof
(
S
TqMetaList
));
memcpy
(
&
pNode
->
handle
,
&
idxBuf
.
buffer
[
i
],
TQ_IDX_SIZE
);
lseek
(
fileFd
,
pNode
->
handle
.
offset
,
SEEK_SET
);
...
...
@@ -199,7 +198,7 @@ TqMetaStore* tqStoreOpen(const char* path,
//put into list
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pBucketNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pBucketNode
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketNode
==
NULL
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
if
(
pBucketNode
->
handle
.
key
==
pNode
->
handle
.
key
)
{
...
...
@@ -212,7 +211,7 @@ TqMetaStore* tqStoreOpen(const char* path,
}
if
(
pBucketNode
->
next
)
{
ASSERT
(
pBucketNode
->
next
->
handle
.
key
==
pNode
->
handle
.
key
);
TqMetaList
*
pNodeFound
=
pBucketNode
->
next
;
S
TqMetaList
*
pNodeFound
=
pBucketNode
->
next
;
pNode
->
next
=
pNodeFound
->
next
;
pBucketNode
->
next
=
pNode
;
pBucketNode
=
pNodeFound
;
...
...
@@ -239,7 +238,7 @@ TqMetaStore* tqStoreOpen(const char* path,
return
pMeta
;
}
int32_t
tqStoreClose
(
TqMetaStore
*
pMeta
)
{
int32_t
tqStoreClose
(
S
TqMetaStore
*
pMeta
)
{
//commit data and idx
tqStorePersist
(
pMeta
);
ASSERT
(
pMeta
->
unpersistHead
&&
pMeta
->
unpersistHead
->
next
==
NULL
);
...
...
@@ -247,7 +246,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
close
(
pMeta
->
idxFd
);
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
while
(
pNode
)
{
ASSERT
(
pNode
->
unpersistNext
==
NULL
);
ASSERT
(
pNode
->
unpersistPrev
==
NULL
);
...
...
@@ -259,7 +258,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInUse
);
}
TqMetaList
*
next
=
pNode
->
next
;
S
TqMetaList
*
next
=
pNode
->
next
;
free
(
pNode
);
pNode
=
next
;
}
...
...
@@ -270,12 +269,12 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
return
0
;
}
int32_t
tqStoreDelete
(
TqMetaStore
*
pMeta
)
{
int32_t
tqStoreDelete
(
S
TqMetaStore
*
pMeta
)
{
close
(
pMeta
->
fileFd
);
close
(
pMeta
->
idxFd
);
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
...
...
@@ -286,7 +285,7 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInUse
);
}
TqMetaList
*
next
=
pNode
->
next
;
S
TqMetaList
*
next
=
pNode
->
next
;
free
(
pNode
);
pNode
=
next
;
}
...
...
@@ -299,11 +298,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
}
//TODO: wrap in tfile
int32_t
tqStorePersist
(
TqMetaStore
*
pMeta
)
{
int32_t
tqStorePersist
(
S
TqMetaStore
*
pMeta
)
{
TqIdxPageBuf
idxBuf
;
int64_t
*
bufPtr
=
(
int64_t
*
)
idxBuf
.
buffer
;
TqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
TqMetaList
*
pNode
=
pHead
->
unpersistNext
;
S
TqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
S
TqMetaList
*
pNode
=
pHead
->
unpersistNext
;
TqSerializedHead
*
pSHead
=
malloc
(
sizeof
(
TqSerializedHead
));
if
(
pSHead
==
NULL
)
{
//TODO: memory error
...
...
@@ -384,11 +383,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
pNode
->
handle
.
valueInTxn
==
NULL
)
{
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
->
next
;
}
else
{
TqMetaList
*
pBucketNode
=
pBucketHead
;
S
TqMetaList
*
pBucketNode
=
pBucketHead
;
while
(
pBucketNode
->
next
!=
NULL
&&
pBucketNode
->
next
!=
pNode
)
{
pBucketNode
=
pBucketNode
->
next
;
...
...
@@ -415,9 +414,9 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
return
0
;
}
static
int32_t
tqHandlePutCommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
static
int32_t
tqHandlePutCommitted
(
S
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
...
...
@@ -432,12 +431,12 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
STqMetaList
*
pNewNode
=
malloc
(
sizeof
(
S
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
memset
(
pNewNode
,
0
,
sizeof
(
S
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInUse
=
value
;
//put into unpersist list
...
...
@@ -448,9 +447,9 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
return
0
;
}
void
*
tqHandleGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
void
*
tqHandleGet
(
S
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
...
...
@@ -466,9 +465,9 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return
NULL
;
}
void
*
tqHandleTouchGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
void
*
tqHandleTouchGet
(
S
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
...
...
@@ -485,9 +484,9 @@ void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) {
return
NULL
;
}
static
inline
int32_t
tqHandlePutImpl
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
static
inline
int32_t
tqHandlePutImpl
(
S
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
...
...
@@ -506,12 +505,12 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
STqMetaList
*
pNewNode
=
malloc
(
sizeof
(
S
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
memset
(
pNewNode
,
0
,
sizeof
(
S
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
...
...
@@ -520,11 +519,11 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val
return
0
;
}
int32_t
tqHandleMovePut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int32_t
tqHandleMovePut
(
S
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
return
tqHandlePutImpl
(
pMeta
,
key
,
value
);
}
int32_t
tqHandleCopyPut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
int32_t
tqHandleCopyPut
(
S
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
void
*
vmem
=
malloc
(
vsize
);
if
(
vmem
==
NULL
)
{
//TODO: memory error
...
...
@@ -534,9 +533,9 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
return
tqHandlePutImpl
(
pMeta
,
key
,
vmem
);
}
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
static
void
*
tqHandleGetUncommitted
(
S
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
...
...
@@ -552,9 +551,9 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
return
NULL
;
}
int32_t
tqHandleCommit
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int32_t
tqHandleCommit
(
S
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
==
NULL
)
{
...
...
@@ -575,9 +574,9 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
return
-
2
;
}
int32_t
tqHandleAbort
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int32_t
tqHandleAbort
(
S
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
...
...
@@ -596,9 +595,9 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
return
-
2
;
}
int32_t
tqHandleDel
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int32_t
tqHandleDel
(
S
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
S
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
...
...
@@ -616,6 +615,6 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
}
//TODO: clean deleted idx and data from persistent file
int32_t
tqStoreCompact
(
TqMetaStore
*
pMeta
)
{
int32_t
tqStoreCompact
(
S
TqMetaStore
*
pMeta
)
{
return
0
;
}
source/libs/wal/inc/walInt.h
浏览文件 @
c4301521
...
...
@@ -105,6 +105,10 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
return
taosCalcChecksum
(
0
,
(
uint8_t
*
)
body
,
len
);
}
static
inline
int64_t
walGetVerIdxOffset
(
SWal
*
pWal
,
int64_t
ver
)
{
return
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
sizeof
(
WalIdxEntry
);
}
static
inline
void
walResetVer
(
SWalVer
*
pVer
)
{
pVer
->
firstVer
=
-
1
;
pVer
->
verInSnapshotting
=
-
1
;
...
...
@@ -117,6 +121,10 @@ int walLoadMeta(SWal* pWal);
int
walSaveMeta
(
SWal
*
pWal
);
int
walRollFileInfo
(
SWal
*
pWal
);
int
walCheckAndRepairMeta
(
SWal
*
pWal
);
int
walCheckAndRepairIdx
(
SWal
*
pWal
);
char
*
walMetaSerialize
(
SWal
*
pWal
);
int
walMetaDeserialize
(
SWal
*
pWal
,
const
char
*
bytes
);
//meta section end
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
c4301521
...
...
@@ -40,6 +40,47 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
int
walCheckAndRepairMeta
(
SWal
*
pWal
)
{
// load log files, get first/snapshot/last version info
const
char
*
logPattern
=
"^[0-9]+.log$"
;
const
char
*
idxPattern
=
"^[0-9]+.idx$"
;
regex_t
logRegPattern
;
regex_t
idxRegPattern
;
SArray
*
pLogArray
=
taosArrayInit
(
8
,
sizeof
(
int64_t
));
regcomp
(
&
logRegPattern
,
logPattern
,
REG_EXTENDED
);
regcomp
(
&
idxRegPattern
,
idxPattern
,
REG_EXTENDED
);
DIR
*
dir
=
opendir
(
pWal
->
path
);
if
(
dir
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
struct
dirent
*
ent
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
char
*
name
=
basename
(
ent
->
d_name
);
int
code
=
regexec
(
&
logRegPattern
,
name
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
int64_t
firstVer
;
sscanf
(
name
,
"%"
PRId64
".log"
,
&
firstVer
);
taosArrayPush
(
pLogArray
,
&
firstVer
);
}
}
// load meta
// if not match, or meta missing
// rebuild meta
return
0
;
}
int
walCheckAndRepairIdx
(
SWal
*
pWal
)
{
// iterate all idx files
// check first and last entry of each idx file valid
return
0
;
}
int
walRollFileInfo
(
SWal
*
pWal
)
{
int64_t
ts
=
taosGetTimestampSec
();
...
...
source/libs/wal/src/walMgmt.c
浏览文件 @
c4301521
...
...
@@ -90,6 +90,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
//open meta
walResetVer
(
&
pWal
->
vers
);
pWal
->
writeLogTfd
=
-
1
;
pWal
->
writeIdxTfd
=
-
1
;
pWal
->
writeCur
=
-
1
;
...
...
@@ -101,7 +102,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
//init status
walResetVer
(
&
pWal
->
vers
);
pWal
->
totSize
=
0
;
pWal
->
lastRollSeq
=
-
1
;
...
...
@@ -123,7 +123,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return
NULL
;
}
if
(
walLoadMeta
(
pWal
)
<
0
)
{
if
(
walLoadMeta
(
pWal
)
<
0
&&
walCheckAndRepairMeta
(
pWal
)
<
0
)
{
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
pthread_mutex_destroy
(
&
pWal
->
mutex
);
taosArrayDestroy
(
pWal
->
fileInfoSet
);
...
...
@@ -131,6 +131,10 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return
NULL
;
}
if
(
walCheckAndRepairIdx
(
pWal
)
<
0
)
{
}
wDebug
(
"vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d"
,
pWal
->
cfg
.
vgId
,
pWal
,
pWal
->
cfg
.
level
,
pWal
->
cfg
.
fsyncPeriod
);
return
pWal
;
...
...
source/libs/wal/src/walSeek.c
浏览文件 @
c4301521
...
...
@@ -27,20 +27,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t
logTfd
=
pWal
->
writeLogTfd
;
//seek position
int64_t
offset
=
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
sizeof
(
WalIdxEntry
);
code
=
tfLseek
(
idxTfd
,
offset
,
SEEK_SET
);
int64_t
idxOff
=
walGetVerIdxOffset
(
pWal
,
ver
);
code
=
tfLseek
(
idxTfd
,
idxOff
,
SEEK_SET
);
if
(
code
!=
0
)
{
return
-
1
;
}
int64_t
readBuf
[
2
];
code
=
tfRead
(
idxTfd
,
readBuf
,
sizeof
(
readBuf
));
WalIdxEntry
entry
;
//TODO:deserialize
code
=
tfRead
(
idxTfd
,
&
entry
,
sizeof
(
WalIdxEntry
));
if
(
code
!=
0
)
{
return
-
1
;
}
//TODO:deserialize
ASSERT
(
readBuf
[
0
]
==
ver
);
code
=
tfLseek
(
logTfd
,
readBuf
[
1
],
SEEK_CUR
);
if
(
code
!=
0
)
{
ASSERT
(
entry
.
ver
==
ver
);
code
=
tfLseek
(
logTfd
,
entry
.
offset
,
SEEK_CUR
);
if
(
code
<
0
)
{
return
-
1
;
}
return
code
;
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
c4301521
...
...
@@ -68,13 +68,12 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildIdxName
(
pWal
,
walGetCurFileFirstVer
(
pWal
),
fnameStr
);
int64_t
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
//change to deserialize function
//TODO:change to deserialize function
if
(
idxTfd
<
0
)
{
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
-
1
;
}
int
idxOff
=
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
sizeof
(
WalIdxEntry
);
int
64_t
idxOff
=
walGetVerIdxOffset
(
pWal
,
ver
);
code
=
tfLseek
(
idxTfd
,
idxOff
,
SEEK_SET
);
if
(
code
<
0
)
{
pthread_mutex_unlock
(
&
pWal
->
mutex
);
...
...
src/query/src/qExecutor.c
浏览文件 @
c4301521
...
...
@@ -5142,22 +5142,22 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
assert
(
repeatTime
>
0
);
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
0
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
pInfo
->
current
=
0
;
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
0
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
pInfo
->
current
=
0
;
// pInfo->prevGroupId = -1;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableScanOperator"
;
pOperator
->
operatorType
=
OP_TableScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQueryAttr
->
numOfCols
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doTableScan
;
pOperator
->
name
=
"TableScanOperator"
;
pOperator
->
operatorType
=
OP_TableScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQueryAttr
->
numOfCols
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doTableScan
;
return
pOperator
;
}
...
...
@@ -5174,14 +5174,14 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pRuntimeEnv
->
enableGroupData
=
true
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
OP_TableSeqScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQueryAttr
->
numOfCols
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doTableScanImpl
;
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
OP_TableSeqScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQueryAttr
->
numOfCols
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doTableScanImpl
;
return
pOperator
;
}
...
...
@@ -5199,14 +5199,14 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
taosArrayPush
(
pInfo
->
block
.
pDataBlock
,
&
infoData
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableBlockInfoScanOperator"
;
pOperator
->
operatorType
=
OP_TableBlockInfoScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQueryAttr
->
numOfCols
;
pOperator
->
exec
=
doBlockInfoScan
;
pOperator
->
name
=
"TableBlockInfoScanOperator"
;
pOperator
->
operatorType
=
OP_TableBlockInfoScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQueryAttr
->
numOfCols
;
pOperator
->
exec
=
doBlockInfoScan
;
return
pOperator
;
}
...
...
@@ -5271,11 +5271,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
assert
(
repeatTime
>
0
);
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
reverseTime
;
pInfo
->
current
=
0
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
reverseTime
;
pInfo
->
current
=
0
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
SOperatorInfo
*
pOptr
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOptr
->
name
=
"DataBlocksOptimizedScanOperator"
;
...
...
@@ -5429,14 +5429,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
setDefaultOutputBuf
(
pRuntimeEnv
,
&
pInfo
->
binfo
,
pInfo
->
seed
,
MERGE_STAGE
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"GlobalAggregate"
;
pOperator
->
operatorType
=
OP_GlobalAggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
name
=
"GlobalAggregate"
;
pOperator
->
operatorType
=
OP_GlobalAggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doGlobalAggregate
;
pOperator
->
cleanup
=
destroyGlobalAggOperatorInfo
;
...
...
@@ -5473,16 +5473,16 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"MultiwaySortOperator"
;
pOperator
->
operatorType
=
OP_MultiwayMergeSort
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
exec
=
doMultiwayMergeSort
;
pOperator
->
cleanup
=
destroyGlobalAggOperatorInfo
;
pOperator
->
name
=
"MultiwaySortOperator"
;
pOperator
->
operatorType
=
OP_MultiwayMergeSort
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
exec
=
doMultiwayMergeSort
;
pOperator
->
cleanup
=
destroyGlobalAggOperatorInfo
;
return
pOperator
;
}
...
...
@@ -6543,14 +6543,14 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
setDefaultOutputBuf
(
pRuntimeEnv
,
&
pInfo
->
binfo
,
pInfo
->
seed
,
MASTER_SCAN
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableAggregate"
;
pOperator
->
operatorType
=
OP_Aggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
name
=
"TableAggregate"
;
pOperator
->
operatorType
=
OP_Aggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doAggregate
;
pOperator
->
cleanup
=
destroyAggOperatorInfo
;
...
...
@@ -6638,14 +6638,14 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
tableGroup
,
TSDB_DATA_TYPE_INT
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"MultiTableAggregate"
;
pOperator
->
operatorType
=
OP_MultiTableAggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
name
=
"MultiTableAggregate"
;
pOperator
->
operatorType
=
OP_MultiTableAggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doSTableAggregate
;
pOperator
->
cleanup
=
destroyAggOperatorInfo
;
...
...
@@ -6668,14 +6668,14 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
setDefaultOutputBuf
(
pRuntimeEnv
,
pBInfo
,
pInfo
->
seed
,
MASTER_SCAN
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
operatorType
=
OP_Project
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
operatorType
=
OP_Project
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doProjectOperation
;
pOperator
->
cleanup
=
destroyProjectOperatorInfo
;
...
...
@@ -6920,16 +6920,16 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
operatorType
=
OP_Groupby
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
hashGroupbyAggregate
;
pOperator
->
cleanup
=
destroyGroupbyOperatorInfo
;
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
operatorType
=
OP_Groupby
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
hashGroupbyAggregate
;
pOperator
->
cleanup
=
destroyGroupbyOperatorInfo
;
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
...
...
@@ -7163,16 +7163,16 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
pInfo
->
curPos
=
0
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"SeqTableTagScan"
;
pOperator
->
operatorType
=
OP_TagScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
exec
=
doTagScan
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
cleanup
=
destroyTagScanOperatorInfo
;
pOperator
->
name
=
"SeqTableTagScan"
;
pOperator
->
operatorType
=
OP_TagScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
exec
=
doTagScan
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
cleanup
=
destroyTagScanOperatorInfo
;
return
pOperator
;
}
...
...
@@ -7302,17 +7302,17 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"DistinctOperator"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
operatorType
=
OP_Distinct
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
hashDistinct
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
cleanup
=
destroyDistinctOperatorInfo
;
pOperator
->
name
=
"DistinctOperator"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
operatorType
=
OP_Distinct
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
hashDistinct
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
cleanup
=
destroyDistinctOperatorInfo
;
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
...
...
@@ -7587,20 +7587,20 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfOutput
;
++
i
)
{
param
->
pExpr
[
i
]
=
pExprMsg
;
pExprMsg
->
colInfo
.
colIndex
=
htons
(
pExprMsg
->
colInfo
.
colIndex
);
pExprMsg
->
colInfo
.
colId
=
htons
(
pExprMsg
->
colInfo
.
colId
);
pExprMsg
->
colInfo
.
flag
=
htons
(
pExprMsg
->
colInfo
.
flag
);
pExprMsg
->
colBytes
=
htons
(
pExprMsg
->
colBytes
);
pExprMsg
->
colType
=
htons
(
pExprMsg
->
colType
);
pExprMsg
->
colInfo
.
colIndex
=
htons
(
pExprMsg
->
colInfo
.
colIndex
);
pExprMsg
->
colInfo
.
colId
=
htons
(
pExprMsg
->
colInfo
.
colId
);
pExprMsg
->
colInfo
.
flag
=
htons
(
pExprMsg
->
colInfo
.
flag
);
pExprMsg
->
colBytes
=
htons
(
pExprMsg
->
colBytes
);
pExprMsg
->
colType
=
htons
(
pExprMsg
->
colType
);
pExprMsg
->
resType
=
htons
(
pExprMsg
->
resType
);
pExprMsg
->
resBytes
=
htons
(
pExprMsg
->
resBytes
);
pExprMsg
->
interBytes
=
htonl
(
pExprMsg
->
interBytes
);
pExprMsg
->
resType
=
htons
(
pExprMsg
->
resType
);
pExprMsg
->
resBytes
=
htons
(
pExprMsg
->
resBytes
);
pExprMsg
->
interBytes
=
htonl
(
pExprMsg
->
interBytes
);
pExprMsg
->
functionId
=
htons
(
pExprMsg
->
functionId
);
pExprMsg
->
numOfParams
=
htons
(
pExprMsg
->
numOfParams
);
pExprMsg
->
resColId
=
htons
(
pExprMsg
->
resColId
);
pExprMsg
->
flist
.
numOfFilters
=
htons
(
pExprMsg
->
flist
.
numOfFilters
);
pExprMsg
->
functionId
=
htons
(
pExprMsg
->
functionId
);
pExprMsg
->
numOfParams
=
htons
(
pExprMsg
->
numOfParams
);
pExprMsg
->
resColId
=
htons
(
pExprMsg
->
resColId
);
pExprMsg
->
flist
.
numOfFilters
=
htons
(
pExprMsg
->
flist
.
numOfFilters
);
pMsg
+=
sizeof
(
SSqlExpr
);
for
(
int32_t
j
=
0
;
j
<
pExprMsg
->
numOfParams
;
++
j
)
{
...
...
@@ -7639,15 +7639,15 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
param
->
pSecExpr
[
i
]
=
pExprMsg
;
pExprMsg
->
colInfo
.
colIndex
=
htons
(
pExprMsg
->
colInfo
.
colIndex
);
pExprMsg
->
colInfo
.
colId
=
htons
(
pExprMsg
->
colInfo
.
colId
);
pExprMsg
->
colInfo
.
flag
=
htons
(
pExprMsg
->
colInfo
.
flag
);
pExprMsg
->
resType
=
htons
(
pExprMsg
->
resType
);
pExprMsg
->
resBytes
=
htons
(
pExprMsg
->
resBytes
);
pExprMsg
->
colBytes
=
htons
(
pExprMsg
->
colBytes
);
pExprMsg
->
colType
=
htons
(
pExprMsg
->
colType
);
pExprMsg
->
colInfo
.
colId
=
htons
(
pExprMsg
->
colInfo
.
colId
);
pExprMsg
->
colInfo
.
flag
=
htons
(
pExprMsg
->
colInfo
.
flag
);
pExprMsg
->
resType
=
htons
(
pExprMsg
->
resType
);
pExprMsg
->
resBytes
=
htons
(
pExprMsg
->
resBytes
);
pExprMsg
->
colBytes
=
htons
(
pExprMsg
->
colBytes
);
pExprMsg
->
colType
=
htons
(
pExprMsg
->
colType
);
pExprMsg
->
functionId
=
htons
(
pExprMsg
->
functionId
);
pExprMsg
->
numOfParams
=
htons
(
pExprMsg
->
numOfParams
);
pExprMsg
->
functionId
=
htons
(
pExprMsg
->
functionId
);
pExprMsg
->
numOfParams
=
htons
(
pExprMsg
->
numOfParams
);
pMsg
+=
sizeof
(
SSqlExpr
);
...
...
@@ -8422,40 +8422,40 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
SQueryAttr
*
pQueryAttr
=
&
pQInfo
->
query
;
pQInfo
->
runtimeEnv
.
pQueryAttr
=
pQueryAttr
;
pQueryAttr
->
tableGroupInfo
=
*
pTableGroupInfo
;
pQueryAttr
->
numOfCols
=
numOfCols
;
pQueryAttr
->
numOfOutput
=
numOfOutput
;
pQueryAttr
->
limit
.
limit
=
pQueryMsg
->
limit
;
pQueryAttr
->
limit
.
offset
=
pQueryMsg
->
offset
;
pQueryAttr
->
order
.
order
=
pQueryMsg
->
order
;
pQueryAttr
->
order
.
orderColId
=
pQueryMsg
->
orderColId
;
pQueryAttr
->
pExpr1
=
pExprs
;
pQueryAttr
->
pExpr2
=
pSecExprs
;
pQueryAttr
->
numOfExpr2
=
pQueryMsg
->
secondStageOutput
;
pQueryAttr
->
pGroupbyExpr
=
pGroupbyExpr
;
pQueryAttr
->
tableGroupInfo
=
*
pTableGroupInfo
;
pQueryAttr
->
numOfCols
=
numOfCols
;
pQueryAttr
->
numOfOutput
=
numOfOutput
;
pQueryAttr
->
limit
.
limit
=
pQueryMsg
->
limit
;
pQueryAttr
->
limit
.
offset
=
pQueryMsg
->
offset
;
pQueryAttr
->
order
.
order
=
pQueryMsg
->
order
;
pQueryAttr
->
order
.
orderColId
=
pQueryMsg
->
orderColId
;
pQueryAttr
->
pExpr1
=
pExprs
;
pQueryAttr
->
pExpr2
=
pSecExprs
;
pQueryAttr
->
numOfExpr2
=
pQueryMsg
->
secondStageOutput
;
pQueryAttr
->
pGroupbyExpr
=
pGroupbyExpr
;
memcpy
(
&
pQueryAttr
->
interval
,
&
pQueryMsg
->
interval
,
sizeof
(
pQueryAttr
->
interval
));
pQueryAttr
->
fillType
=
pQueryMsg
->
fillType
;
pQueryAttr
->
numOfTags
=
pQueryMsg
->
numOfTags
;
pQueryAttr
->
tagColList
=
pTagCols
;
pQueryAttr
->
fillType
=
pQueryMsg
->
fillType
;
pQueryAttr
->
numOfTags
=
pQueryMsg
->
numOfTags
;
pQueryAttr
->
tagColList
=
pTagCols
;
pQueryAttr
->
prjInfo
.
vgroupLimit
=
pQueryMsg
->
vgroupLimit
;
pQueryAttr
->
prjInfo
.
ts
=
(
pQueryMsg
->
order
==
TSDB_ORDER_ASC
)
?
INT64_MIN
:
INT64_MAX
;
pQueryAttr
->
sw
=
pQueryMsg
->
sw
;
pQueryAttr
->
vgId
=
vgId
;
pQueryAttr
->
stableQuery
=
pQueryMsg
->
stableQuery
;
pQueryAttr
->
topBotQuery
=
pQueryMsg
->
topBotQuery
;
pQueryAttr
->
groupbyColumn
=
pQueryMsg
->
groupbyColumn
;
pQueryAttr
->
hasTagResults
=
pQueryMsg
->
hasTagResults
;
pQueryAttr
->
prjInfo
.
ts
=
(
pQueryMsg
->
order
==
TSDB_ORDER_ASC
)
?
INT64_MIN
:
INT64_MAX
;
pQueryAttr
->
sw
=
pQueryMsg
->
sw
;
pQueryAttr
->
vgId
=
vgId
;
pQueryAttr
->
stableQuery
=
pQueryMsg
->
stableQuery
;
pQueryAttr
->
topBotQuery
=
pQueryMsg
->
topBotQuery
;
pQueryAttr
->
groupbyColumn
=
pQueryMsg
->
groupbyColumn
;
pQueryAttr
->
hasTagResults
=
pQueryMsg
->
hasTagResults
;
pQueryAttr
->
timeWindowInterpo
=
pQueryMsg
->
timeWindowInterpo
;
pQueryAttr
->
queryBlockDist
=
pQueryMsg
->
queryBlockDist
;
pQueryAttr
->
stabledev
=
pQueryMsg
->
stabledev
;
pQueryAttr
->
tsCompQuery
=
pQueryMsg
->
tsCompQuery
;
pQueryAttr
->
simpleAgg
=
pQueryMsg
->
simpleAgg
;
pQueryAttr
->
pointInterpQuery
=
pQueryMsg
->
pointInterpQuery
;
pQueryAttr
->
needReverseScan
=
pQueryMsg
->
needReverseScan
;
pQueryAttr
->
stateWindow
=
pQueryMsg
->
stateWindow
;
pQueryAttr
->
vgId
=
vgId
;
pQueryAttr
->
pFilters
=
pFilters
;
pQueryAttr
->
queryBlockDist
=
pQueryMsg
->
queryBlockDist
;
pQueryAttr
->
stabledev
=
pQueryMsg
->
stabledev
;
pQueryAttr
->
tsCompQuery
=
pQueryMsg
->
tsCompQuery
;
pQueryAttr
->
simpleAgg
=
pQueryMsg
->
simpleAgg
;
pQueryAttr
->
pointInterpQuery
=
pQueryMsg
->
pointInterpQuery
;
pQueryAttr
->
needReverseScan
=
pQueryMsg
->
needReverseScan
;
pQueryAttr
->
stateWindow
=
pQueryMsg
->
stateWindow
;
pQueryAttr
->
vgId
=
vgId
;
pQueryAttr
->
pFilters
=
pFilters
;
pQueryAttr
->
tableCols
=
calloc
(
numOfCols
,
sizeof
(
SSingleColumnFilterInfo
));
if
(
pQueryAttr
->
tableCols
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录