Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
aace32e6
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
aace32e6
编写于
12月 18, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
8a82da8f
4658a963
变更
11
展开全部
隐藏空白更改
内联
并排
Showing
11 changed file
with
494 addition
and
232 deletion
+494
-232
include/libs/planner/planner.h
include/libs/planner/planner.h
+5
-2
source/libs/index/CMakeLists.txt
source/libs/index/CMakeLists.txt
+1
-0
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+6
-3
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+6
-4
source/libs/index/inc/index_fst.h
source/libs/index/inc/index_fst.h
+0
-1
source/libs/index/src/index.c
source/libs/index/src/index.c
+33
-7
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+59
-24
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+2
-0
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+10
-1
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+368
-190
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+4
-0
未找到文件。
include/libs/planner/planner.h
浏览文件 @
aace32e6
...
...
@@ -128,6 +128,7 @@ typedef struct SSubplan {
}
SSubplan
;
typedef
struct
SQueryDag
{
uint64_t
queryId
;
SArray
*
pSubplans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryDag
;
...
...
@@ -136,7 +137,7 @@ typedef struct SQueryDag {
*/
int32_t
qCreateQueryDag
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
);
int32_t
qSetSu
planExecutionNode
(
SSubplan
*
subplan
,
SArray
*
node
s
);
int32_t
qSetSu
bplanExecutionNode
(
SSubplan
*
subplan
,
SArray
*
ep
s
);
int32_t
qExplainQuery
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
...
...
@@ -147,12 +148,14 @@ int32_t qSubPlanToString(const SSubplan* subplan, char** str);
int32_t
qStringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
void
qDestroySubplan
(
SSubplan
*
pSubplan
);
/**
* Destroy the physical plan.
* @param pQueryPhyNode
* @return
*/
void
qDestroyQueryDag
(
struct
SQueryDag
*
pDag
);
void
qDestroyQueryDag
(
SQueryDag
*
pDag
);
#ifdef __cplusplus
}
...
...
source/libs/index/CMakeLists.txt
浏览文件 @
aace32e6
...
...
@@ -9,6 +9,7 @@ target_link_libraries(
index
PUBLIC os
PUBLIC util
PUBLIC common
)
if
(
${
BUILD_WITH_LUCENE
}
)
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
aace32e6
...
...
@@ -37,9 +37,11 @@ struct SIndex {
#endif
void
*
cache
;
void
*
tindex
;
SHashObj
*
fieldObj
;
// <field name, field id>
uint64_t
suid
;
int
fieldId
;
SHashObj
*
fieldObj
;
// < field name, field id>
int64_t
suid
;
// current super table id, -1 is normal table
int
fieldId
;
// field id allocated to cache
int32_t
cVersion
;
// current version allocated to cache
pthread_mutex_t
mtx
;
};
...
...
@@ -58,6 +60,7 @@ struct SIndexMultiTermQuery {
// field and key;
typedef
struct
SIndexTerm
{
uint8_t
type
;
// term data type, str/interger/json
char
*
key
;
int32_t
nKey
;
char
*
val
;
...
...
source/libs/index/inc/index_cache.h
浏览文件 @
aace32e6
...
...
@@ -17,11 +17,12 @@
#include "index.h"
#include "tlockfree.h"
#include "tskiplist.h"
// ----------------- row structure in skiplist ---------------------
/* A data row, the format is like below:
*
|<--totalLen-->|<-- fieldId-->|<-- value len--->|<-- value
-->|<--version--->|<-- itermType -->|
*
*
content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid
-->|<--version--->|<-- itermType -->|
*
len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
*/
#ifdef __cplusplus
...
...
@@ -30,7 +31,7 @@ extern "C" {
typedef
struct
IndexCache
{
T_REF_DECLARE
()
int
cVersion
;
//
SSkipList
*
skiplist
;
}
IndexCache
;
...
...
@@ -39,7 +40,8 @@ IndexCache *indexCacheCreate();
void
indexCacheDestroy
(
IndexCache
*
cache
);
int
indexCachePut
(
IndexCache
*
cache
,
int32_t
fieldId
,
const
char
*
fieldVale
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operaType
);
int
indexCachePut
(
IndexCache
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
);
int
indexCacheGet
(
IndexCache
*
cache
,
uint64_t
*
rst
);
int
indexCacheSearch
(
IndexCache
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
);
...
...
source/libs/index/inc/index_fst.h
浏览文件 @
aace32e6
...
...
@@ -315,7 +315,6 @@ typedef struct StreamWithStateResult {
FstSlice
data
;
FstOutput
out
;
void
*
state
;
}
StreamWithStateResult
;
StreamWithStateResult
*
swsResultCreate
(
FstSlice
*
data
,
FstOutput
fOut
,
void
*
state
);
...
...
source/libs/index/src/index.c
浏览文件 @
aace32e6
...
...
@@ -23,7 +23,8 @@
typedef
struct
SIdxFieldInfo
{
int
id
;
// generated by index internal
int
fieldId
;
// generated by index internal
int
cVersion
;
int
type
;
// field type
}
SIdxFieldInfo
;
...
...
@@ -39,7 +40,7 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
}
SIndex
*
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
)
{
pthread_once
(
&
isInit
,
indexInit
);
SIndex
*
sIdx
=
malloc
(
sizeof
(
SIndex
));
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
#ifdef USE_LUCENE
index_t
*
index
=
index_open
(
path
);
...
...
@@ -49,6 +50,8 @@ SIndex *indexOpen(SIndexOpts *opts, const char *path) {
sIdx
->
cache
=
(
void
*
)
indexCacheCreate
();
sIdx
->
tindex
=
NULL
;
sIdx
->
fieldObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
fieldId
=
1
;
sIdx
->
cVersion
=
1
;
pthread_mutex_init
(
&
sIdx
->
mtx
,
NULL
);
return
sIdx
;
}
...
...
@@ -65,7 +68,7 @@ void indexClose(SIndex *sIdx) {
return
;
}
int
indexPut
(
SIndex
*
index
,
SArray
*
f
ield_v
als
,
int
uid
)
{
int
indexPut
(
SIndex
*
index
,
SArray
*
f
V
als
,
int
uid
)
{
#ifdef USE_LUCENE
index_document_t
*
doc
=
index_document_create
();
...
...
@@ -73,8 +76,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
char
buf
[
16
]
=
{
0
};
sprintf
(
buf
,
"%d"
,
uid
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
f
ield_v
als
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
f
ield_v
als
,
i
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
f
V
als
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
f
V
als
,
i
);
index_document_add
(
doc
,
(
const
char
*
)(
p
->
key
),
p
->
nKey
,
(
const
char
*
)(
p
->
val
),
p
->
nVal
,
1
);
}
index_document_add
(
doc
,
NULL
,
0
,
buf
,
strlen
(
buf
),
0
);
...
...
@@ -82,10 +85,33 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
index_put
(
index
->
index
,
doc
);
index_document_destroy
(
doc
);
#endif
//TODO(yihao): reduce the lock range
pthread_mutex_lock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdxFieldInfo
*
fi
=
taosHashGet
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
);
if
(
fi
==
NULL
)
{
SIdxFieldInfo
tfi
=
{.
fieldId
=
index
->
fieldId
,
.
type
=
p
->
type
};
index
->
cVersion
++
;
index
->
fieldId
++
;
taosHashPut
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
,
&
tfi
,
sizeof
(
tfi
));
}
else
{
//TODO, del
}
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdxFieldInfo
*
fi
=
taosHashGet
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
);
assert
(
fi
!=
NULL
);
int32_t
fieldId
=
fi
->
fieldId
;
int32_t
colType
=
fi
->
type
;
int32_t
version
=
index
->
cVersion
;
}
pthread_mutex_unlock
(
&
index
->
mtx
);
return
1
;
}
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
multiQuerys
,
SArray
*
result
)
{
#ifdef USE_LUCENE
...
...
@@ -152,7 +178,7 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery
*
p
=
(
SIndexMultiTermQuery
*
)
malloc
(
sizeof
(
SIndexMultiTermQuery
));
if
(
p
==
NULL
)
{
return
NULL
;
}
p
->
opera
=
opera
;
p
->
query
=
taosArrayInit
(
1
,
sizeof
(
SIndexTermQuery
));
p
->
query
=
taosArrayInit
(
4
,
sizeof
(
SIndexTermQuery
));
return
p
;
}
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
)
{
...
...
source/libs/index/src/index_cache.c
浏览文件 @
aace32e6
...
...
@@ -14,12 +14,18 @@
*/
#include "index_cache.h"
#include "tcompare.h"
#define MAX_INDEX_KEY_LEN 128 // test only, change later
static
char
*
getIndexKey
(
const
void
*
pData
)
{
return
NULL
;
}
static
int32_t
compareKey
(
const
void
*
l
,
const
void
*
r
)
{
char
*
lp
=
(
char
*
)
l
;
char
*
rp
=
(
char
*
)
r
;
// skip total len
// skip total len
, not compare
int32_t
ll
,
rl
;
// len
memcpy
(
&
ll
,
lp
,
sizeof
(
int32_t
));
memcpy
(
&
rl
,
rp
,
sizeof
(
int32_t
));
...
...
@@ -27,7 +33,7 @@ static int32_t compareKey(const void *l, const void *r) {
rp
+=
sizeof
(
int32_t
);
// compare field id
int
32
_t
lf
,
rf
;
// field id
int
16
_t
lf
,
rf
;
// field id
memcpy
(
&
lf
,
lp
,
sizeof
(
lf
));
memcpy
(
&
rf
,
rp
,
sizeof
(
rf
));
if
(
lf
!=
rf
)
{
...
...
@@ -36,14 +42,22 @@ static int32_t compareKey(const void *l, const void *r) {
lp
+=
sizeof
(
lf
);
rp
+=
sizeof
(
rf
);
// compare field value
// compare field type
int16_t
lft
,
rft
;
memcpy
(
&
lft
,
lp
,
sizeof
(
lft
));
memcpy
(
&
rft
,
rp
,
sizeof
(
rft
));
lp
+=
sizeof
(
lft
);
rp
+=
sizeof
(
rft
);
assert
(
rft
==
rft
);
// skip value len
int32_t
lfl
,
rfl
;
memcpy
(
&
lfl
,
lp
,
sizeof
(
lfl
));
memcpy
(
&
rfl
,
rp
,
sizeof
(
rfl
));
lp
+=
sizeof
(
lfl
);
rp
+=
sizeof
(
rfl
);
//
refator later
//
compare value
int32_t
i
,
j
;
for
(
i
=
0
,
j
=
0
;
i
<
lfl
&&
j
<
rfl
;
i
++
,
j
++
)
{
if
(
lp
[
i
]
==
rp
[
j
])
{
continue
;
}
...
...
@@ -54,58 +68,79 @@ static int32_t compareKey(const void *l, const void *r) {
lp
+=
lfl
;
rp
+=
rfl
;
// compare version
// skip uid
uint64_t
lu
,
ru
;
memcpy
(
&
lu
,
lp
,
sizeof
(
lu
));
memcpy
(
&
ru
,
rp
,
sizeof
(
ru
));
lp
+=
sizeof
(
lu
);
rp
+=
sizeof
(
ru
);
// compare version, desc order
int32_t
lv
,
rv
;
memcpy
(
&
lv
,
lp
,
sizeof
(
lv
));
memcpy
(
&
rv
,
rp
,
sizeof
(
rv
));
if
(
lv
!=
rv
)
{
return
lv
>
rv
?
-
1
:
1
;
}
}
lp
+=
sizeof
(
lv
);
rp
+=
sizeof
(
rv
);
// not care item type
return
0
;
}
IndexCache
*
indexCacheCreate
()
{
IndexCache
*
cache
=
calloc
(
1
,
sizeof
(
IndexCache
));
cache
->
skiplist
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
MAX_INDEX_KEY_LEN
,
compareKey
,
SL_ALLOW_DUP_KEY
,
getIndexKey
);
return
cache
;
}
void
indexCacheDestroy
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
tSkipListDestroy
(
cache
->
skiplist
);
free
(
cache
);
}
int
indexCachePut
(
IndexCache
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
int
indexCachePut
(
IndexCache
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
)
{
if
(
cache
==
NULL
)
{
return
-
1
;}
int32_t
version
=
T_REF_INC
(
cache
);
int32_t
total
=
sizeof
(
int32_t
)
+
sizeof
(
fieldId
)
+
4
+
fvlen
+
sizeof
(
version
)
+
sizeof
(
uid
)
+
sizeof
(
operType
);
// encode data
int32_t
total
=
sizeof
(
int32_t
)
+
sizeof
(
fieldId
)
+
sizeof
(
fieldType
)
+
sizeof
(
fvLen
)
+
fvLen
+
sizeof
(
version
)
+
sizeof
(
uid
)
+
sizeof
(
operType
);
char
*
buf
=
calloc
(
1
,
total
);
char
*
p
=
buf
;
memcpy
(
buf
,
&
total
,
sizeof
(
total
));
total
+=
total
;
memcpy
(
p
,
&
total
,
sizeof
(
total
));
p
+=
sizeof
(
total
);
memcpy
(
p
,
&
fieldId
,
sizeof
(
fieldId
));
p
+=
sizeof
(
fieldId
);
memcpy
(
buf
,
&
fieldId
,
sizeof
(
fieldId
));
buf
+=
sizeof
(
fieldId
);
memcpy
(
p
,
&
fieldType
,
sizeof
(
fieldType
));
p
+=
sizeof
(
fieldType
);
memcpy
(
p
,
&
fvLen
,
sizeof
(
fvLen
));
p
+=
sizeof
(
fvLen
);
memcpy
(
p
,
fieldValue
,
fvLen
);
p
+=
fvLen
;
memcpy
(
buf
,
&
fvlen
,
sizeof
(
fvlen
));
buf
+=
sizeof
(
fvlen
);
memcpy
(
buf
,
fieldValue
,
fvlen
);
buf
+=
fvlen
;
memcpy
(
p
,
&
version
,
sizeof
(
version
));
p
+=
sizeof
(
version
);
memcpy
(
buf
,
&
version
,
sizeof
(
version
));
buf
+=
sizeof
(
version
);
memcpy
(
p
,
&
uid
,
sizeof
(
uid
));
p
+=
sizeof
(
uid
);
memcpy
(
buf
,
&
uid
,
sizeof
(
uid
));
buf
+=
sizeof
(
uid
);
memcpy
(
p
,
&
operType
,
sizeof
(
operType
));
p
+=
sizeof
(
operType
);
memcpy
(
buf
,
&
operType
,
sizeof
(
operType
));
buf
+=
sizeof
(
operType
);
tSkipListPut
(
cache
->
skiplist
,
(
void
*
)
buf
);
// encode end
}
int
indexCacheDel
(
IndexCache
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
}
int
indexCacheSearch
(
IndexCache
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
)
{
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
aace32e6
...
...
@@ -116,6 +116,8 @@ void destroyQueryPlan(struct SQueryPlanNode* pQueryNode);
*/
void
*
destroyQueryPhyPlan
(
struct
SPhyNode
*
pQueryPhyNode
);
int32_t
opNameToOpType
(
const
char
*
name
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
aace32e6
...
...
@@ -179,7 +179,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
assert
(
false
);
}
if
(
pPlanNode
->
pChildren
!=
NULL
&&
taosArrayGetSize
(
pPlanNode
->
pChildren
)
>
0
)
{
node
->
pChildren
=
taosArrayInit
(
4
,
POINTER_BYTES
);
node
->
pChildren
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
size_t
size
=
taosArrayGetSize
(
pPlanNode
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhyNode
*
child
=
createPhyNode
(
pCxt
,
taosArrayGet
(
pPlanNode
->
pChildren
,
i
));
...
...
@@ -215,3 +215,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
*
pDag
=
context
.
pDag
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
opNameToOpType
(
const
char
*
name
)
{
for
(
int32_t
i
=
1
;
i
<
sizeof
(
gOpName
)
/
sizeof
(
gOpName
[
0
]);
++
i
)
{
if
(
strcmp
(
name
,
gOpName
[
i
]))
{
return
i
;
}
}
return
OP_Unknown
;
}
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
aace32e6
此差异已折叠。
点击以展开。
source/libs/planner/src/planner.c
浏览文件 @
aace32e6
...
...
@@ -16,6 +16,10 @@
#include "parser.h"
#include "plannerInt.h"
void
qDestroySubplan
(
SSubplan
*
pSubplan
)
{
// todo
}
void
qDestroyQueryDag
(
struct
SQueryDag
*
pDag
)
{
// todo
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录