Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4681611e
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4681611e
编写于
12月 19, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
77503092
e0e9eca7
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
428 addition
and
110 deletion
+428
-110
cmake/cmake.options
cmake/cmake.options
+7
-0
include/libs/index/index.h
include/libs/index/index.h
+17
-5
include/libs/planner/planner.h
include/libs/planner/planner.h
+7
-2
source/libs/index/CMakeLists.txt
source/libs/index/CMakeLists.txt
+5
-1
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+11
-12
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+4
-4
source/libs/index/src/index.c
source/libs/index/src/index.c
+69
-46
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+15
-17
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+1
-1
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+78
-8
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+2
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+17
-13
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+191
-0
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+4
-0
未找到文件。
cmake/cmake.options
浏览文件 @
4681611e
...
@@ -37,6 +37,7 @@ option(
...
@@ -37,6 +37,7 @@ option(
off
off
)
)
option(
option(
BUILD_WITH_NURAFT
BUILD_WITH_NURAFT
"If build with NuRaft"
"If build with NuRaft"
...
@@ -54,3 +55,9 @@ option(
...
@@ -54,3 +55,9 @@ option(
"If use doxygen build documents"
"If use doxygen build documents"
OFF
OFF
)
)
option(
BUILD_WITH_INVERTEDINDEX
"If use invertedIndex"
ON
)
include/libs/index/index.h
浏览文件 @
4681611e
...
@@ -24,6 +24,7 @@ extern "C" {
...
@@ -24,6 +24,7 @@ extern "C" {
#endif
#endif
typedef
struct
SIndex
SIndex
;
typedef
struct
SIndex
SIndex
;
typedef
struct
SIndexTerm
SIndexTerm
;
typedef
struct
SIndexOpts
SIndexOpts
;
typedef
struct
SIndexOpts
SIndexOpts
;
typedef
struct
SIndexMultiTermQuery
SIndexMultiTermQuery
;
typedef
struct
SIndexMultiTermQuery
SIndexMultiTermQuery
;
typedef
struct
SArray
SIndexMultiTerm
;
typedef
struct
SArray
SIndexMultiTerm
;
...
@@ -35,7 +36,7 @@ typedef enum {
...
@@ -35,7 +36,7 @@ typedef enum {
ADD_INDEX
,
// add index on specify column
ADD_INDEX
,
// add index on specify column
DROP_INDEX
,
// drop existed index
DROP_INDEX
,
// drop existed index
DROP_SATBLE
// drop stable
DROP_SATBLE
// drop stable
}
SIndex
ColumnType
;
}
SIndex
OperOnColumn
;
typedef
enum
{
MUST
=
0
,
SHOULD
=
1
,
NOT
=
2
}
EIndexOperatorType
;
typedef
enum
{
MUST
=
0
,
SHOULD
=
1
,
NOT
=
2
}
EIndexOperatorType
;
typedef
enum
{
QUERY_TERM
=
0
,
QUERY_PREFIX
=
1
,
QUERY_SUFFIX
=
2
,
QUERY_REGEX
=
3
}
EIndexQueryType
;
typedef
enum
{
QUERY_TERM
=
0
,
QUERY_PREFIX
=
1
,
QUERY_SUFFIX
=
2
,
QUERY_REGEX
=
3
}
EIndexQueryType
;
...
@@ -45,12 +46,12 @@ typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX =
...
@@ -45,12 +46,12 @@ typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX =
*/
*/
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
oper
);
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
oper
);
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
);
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
);
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
const
char
*
field
,
int32_t
nFields
,
const
char
*
value
,
int32_t
nValue
,
EIndexQueryType
type
);
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
SIndexTerm
*
term
,
EIndexQueryType
type
);
/*
/*
* @param:
* @param:
* @param:
* @param:
*/
*/
SIndex
*
indexOpen
(
SIndexOpts
*
opt
,
const
char
*
path
);
int
indexOpen
(
SIndexOpts
*
opt
,
const
char
*
path
,
SIndex
**
index
);
void
indexClose
(
SIndex
*
index
);
void
indexClose
(
SIndex
*
index
);
int
indexPut
(
SIndex
*
index
,
SIndexMultiTerm
*
terms
,
int
uid
);
int
indexPut
(
SIndex
*
index
,
SIndexMultiTerm
*
terms
,
int
uid
);
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
);
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
);
...
@@ -61,8 +62,8 @@ int indexRebuild(SIndex *index, SIndexOpts *opt);
...
@@ -61,8 +62,8 @@ int indexRebuild(SIndex *index, SIndexOpts *opt);
* @param
* @param
*/
*/
SIndexMultiTerm
*
indexMultiTermCreate
();
SIndexMultiTerm
*
indexMultiTermCreate
();
int
indexMultiTermAdd
(
SIndexMultiTerm
*
terms
,
const
char
*
field
,
int32_t
nFields
,
const
char
*
value
,
int32_t
nValue
);
int
indexMultiTermAdd
(
SIndexMultiTerm
*
terms
,
SIndexTerm
*
term
);
void
indexMultiTermDestroy
(
SIndexMultiTerm
*
terms
);
void
indexMultiTermDestroy
(
SIndexMultiTerm
*
terms
);
/*
/*
* @param:
* @param:
* @param:
* @param:
...
@@ -70,6 +71,17 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms);
...
@@ -70,6 +71,17 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms);
SIndexOpts
*
indexOptsCreate
();
SIndexOpts
*
indexOptsCreate
();
void
indexOptsDestroy
(
SIndexOpts
*
opts
);
void
indexOptsDestroy
(
SIndexOpts
*
opts
);
/*
* @param:
* @param:
*/
SIndexTerm
*
indexTermCreate
(
int64_t
suid
,
SIndexOperOnColumn
operType
,
uint8_t
colType
,
const
char
*
colName
,
int32_t
nColName
,
const
char
*
colVal
,
int32_t
nColVal
);
void
indexTermDestroy
(
SIndexTerm
*
p
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/libs/planner/planner.h
浏览文件 @
4681611e
...
@@ -108,7 +108,7 @@ typedef struct SProjectPhyNode {
...
@@ -108,7 +108,7 @@ typedef struct SProjectPhyNode {
typedef
struct
SExchangePhyNode
{
typedef
struct
SExchangePhyNode
{
SPhyNode
node
;
SPhyNode
node
;
uint64_t
srcTemplateId
;
// template id of datasource suplans
uint64_t
srcTemplateId
;
// template id of datasource suplans
SArray
*
pS
ourceEpSet
;
// SEpSet
, scheduler fill by calling qSetSuplanExecutionNode
SArray
*
pS
rcEndPoints
;
// SEpAddrMsg
, scheduler fill by calling qSetSuplanExecutionNode
}
SExchangePhyNode
;
}
SExchangePhyNode
;
typedef
struct
SSubplanId
{
typedef
struct
SSubplanId
{
...
@@ -129,6 +129,7 @@ typedef struct SSubplan {
...
@@ -129,6 +129,7 @@ typedef struct SSubplan {
typedef
struct
SQueryDag
{
typedef
struct
SQueryDag
{
uint64_t
queryId
;
uint64_t
queryId
;
int32_t
numOfSubplans
;
SArray
*
pSubplans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
SArray
*
pSubplans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
SQueryDag
;
}
SQueryDag
;
...
@@ -137,7 +138,11 @@ typedef struct SQueryDag {
...
@@ -137,7 +138,11 @@ typedef struct SQueryDag {
*/
*/
int32_t
qCreateQueryDag
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
);
int32_t
qCreateQueryDag
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
struct
SQueryDag
**
pDag
);
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
SArray
*
eps
);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @eps Execution location of this group of datasource subplans, is an array of SEpAddr structures
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
);
int32_t
qExplainQuery
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
int32_t
qExplainQuery
(
const
struct
SQueryStmtInfo
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
...
...
source/libs/index/CMakeLists.txt
浏览文件 @
4681611e
...
@@ -22,9 +22,13 @@ if (${BUILD_WITH_LUCENE})
...
@@ -22,9 +22,13 @@ if (${BUILD_WITH_LUCENE})
index
index
PUBLIC lucene++
PUBLIC lucene++
)
)
endif
(
${
BUILD_WITH_LUCENE
}
)
endif
(
${
BUILD_WITH_LUCENE
}
)
if
(
${
BUILD_WITH_INVERTEDINDEX
}
)
add_definitions
(
-DUSE_INVERTED_INDEX
)
endif
(
${
BUILD_WITH_INVERTEDINDEX
}
)
if
(
${
BUILD_TEST
}
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
endif
(
${
BUILD_TEST
}
)
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
4681611e
...
@@ -37,10 +37,10 @@ struct SIndex {
...
@@ -37,10 +37,10 @@ struct SIndex {
#endif
#endif
void
*
cache
;
void
*
cache
;
void
*
tindex
;
void
*
tindex
;
SHashObj
*
field
Obj
;
// < field name, field id>
SHashObj
*
col
Obj
;
// < field name, field id>
int64_t
suid
;
// current super table id, -1 is normal table
int64_t
suid
;
// current super table id, -1 is normal table
int
field
Id
;
// field id allocated to cache
int
col
Id
;
// field id allocated to cache
int32_t
cVersion
;
// current version allocated to cache
int32_t
cVersion
;
// current version allocated to cache
pthread_mutex_t
mtx
;
pthread_mutex_t
mtx
;
};
};
...
@@ -60,22 +60,21 @@ struct SIndexMultiTermQuery {
...
@@ -60,22 +60,21 @@ struct SIndexMultiTermQuery {
// field and key;
// field and key;
typedef
struct
SIndexTerm
{
typedef
struct
SIndexTerm
{
uint8_t
type
;
// term data type, str/interger/json
int64_t
suid
;
char
*
key
;
SIndexOperOnColumn
operType
;
// oper type, add/del/update
int32_t
nKey
;
uint8_t
colType
;
// term data type, str/interger/json
char
*
val
;
char
*
colName
;
int32_t
nVal
;
int32_t
nColName
;
char
*
colVal
;
int32_t
nColVal
;
}
SIndexTerm
;
}
SIndexTerm
;
typedef
struct
SIndexTermQuery
{
typedef
struct
SIndexTermQuery
{
SIndexTerm
*
field_value
;
SIndexTerm
*
term
;
EIndexQueryType
t
ype
;
EIndexQueryType
qT
ype
;
}
SIndexTermQuery
;
}
SIndexTermQuery
;
SIndexTerm
*
indexTermCreate
(
const
char
*
key
,
int32_t
nKey
,
const
char
*
val
,
int32_t
nVal
);
void
indexTermDestroy
(
SIndexTerm
*
p
);
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); }} while(0)
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); }} while(0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); }} while(0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); }} while(0)
...
...
source/libs/index/inc/index_cache.h
浏览文件 @
4681611e
...
@@ -38,13 +38,13 @@ typedef struct IndexCache {
...
@@ -38,13 +38,13 @@ typedef struct IndexCache {
//
//
IndexCache
*
indexCacheCreate
();
IndexCache
*
indexCacheCreate
();
void
indexCacheDestroy
(
IndexCache
*
cache
);
void
indexCacheDestroy
(
void
*
cache
);
int
indexCachePut
(
IndexCache
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
int
indexCachePut
(
void
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
);
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
);
int
indexCacheGet
(
IndexCache
*
cache
,
uint64_t
*
rst
);
int
indexCacheGet
(
void
*
cache
,
uint64_t
*
rst
);
int
indexCacheSearch
(
IndexCache
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
);
int
indexCacheSearch
(
void
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/index/src/index.c
浏览文件 @
4681611e
...
@@ -22,11 +22,10 @@
...
@@ -22,11 +22,10 @@
#endif
#endif
typedef
struct
SIdx
Field
Info
{
typedef
struct
SIdx
Col
Info
{
int
field
Id
;
// generated by index internal
int
col
Id
;
// generated by index internal
int
cVersion
;
int
cVersion
;
int
type
;
// field type
}
SIdxColInfo
;
}
SIdxFieldInfo
;
static
pthread_once_t
isInit
=
PTHREAD_ONCE_INIT
;
static
pthread_once_t
isInit
=
PTHREAD_ONCE_INIT
;
static
void
indexInit
();
static
void
indexInit
();
...
@@ -38,22 +37,25 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
...
@@ -38,22 +37,25 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
return
0
;
return
0
;
}
}
SIndex
*
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
)
{
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
pthread_once
(
&
isInit
,
indexInit
);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
#ifdef USE_LUCENE
#ifdef USE_LUCENE
index_t
*
index
=
index_open
(
path
);
index_t
*
index
=
index_open
(
path
);
sIdx
->
index
=
index
;
sIdx
->
index
=
index
;
#endif
#endif
sIdx
->
cache
=
(
void
*
)
indexCacheCreate
();
sIdx
->
cache
=
(
void
*
)
indexCacheCreate
();
sIdx
->
tindex
=
NULL
;
sIdx
->
tindex
=
NULL
;
sIdx
->
field
Obj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
col
Obj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
field
Id
=
1
;
sIdx
->
col
Id
=
1
;
sIdx
->
cVersion
=
1
;
sIdx
->
cVersion
=
1
;
pthread_mutex_init
(
&
sIdx
->
mtx
,
NULL
);
pthread_mutex_init
(
&
sIdx
->
mtx
,
NULL
);
return
sIdx
;
*
index
=
sIdx
;
return
0
;
}
}
void
indexClose
(
SIndex
*
sIdx
)
{
void
indexClose
(
SIndex
*
sIdx
)
{
...
@@ -61,14 +63,17 @@ void indexClose(SIndex *sIdx) {
...
@@ -61,14 +63,17 @@ void indexClose(SIndex *sIdx) {
index_close
(
sIdex
->
index
);
index_close
(
sIdex
->
index
);
sIdx
->
index
=
NULL
;
sIdx
->
index
=
NULL
;
#endif
#endif
#ifdef USE_INVERTED_INDEX
indexCacheDestroy
(
sIdx
->
cache
);
indexCacheDestroy
(
sIdx
->
cache
);
taosHashCleanup
(
sIdx
->
field
Obj
);
taosHashCleanup
(
sIdx
->
col
Obj
);
pthread_mutex_destroy
(
&
sIdx
->
mtx
);
pthread_mutex_destroy
(
&
sIdx
->
mtx
);
#endif
free
(
sIdx
);
free
(
sIdx
);
return
;
return
;
}
}
int
indexPut
(
SIndex
*
index
,
S
Array
*
fVals
,
int
uid
)
{
int
indexPut
(
SIndex
*
index
,
S
IndexMultiTerm
*
fVals
,
int
uid
)
{
#ifdef USE_LUCENE
#ifdef USE_LUCENE
index_document_t
*
doc
=
index_document_create
();
index_document_t
*
doc
=
index_document_create
();
...
@@ -86,32 +91,38 @@ int indexPut(SIndex *index, SArray* fVals, int uid) {
...
@@ -86,32 +91,38 @@ int indexPut(SIndex *index, SArray* fVals, int uid) {
index_document_destroy
(
doc
);
index_document_destroy
(
doc
);
#endif
#endif
#ifdef USE_INVERTED_INDEX
//TODO(yihao): reduce the lock range
//TODO(yihao): reduce the lock range
pthread_mutex_lock
(
&
index
->
mtx
);
pthread_mutex_lock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdx
FieldInfo
*
fi
=
taosHashGet
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
);
SIdx
ColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
if
(
fi
==
NULL
)
{
if
(
fi
==
NULL
)
{
SIdx
FieldInfo
tfi
=
{.
fieldId
=
index
->
fieldId
,
.
type
=
p
->
type
};
SIdx
ColInfo
tfi
=
{.
colId
=
index
->
colId
};
index
->
cVersion
++
;
index
->
cVersion
++
;
index
->
field
Id
++
;
index
->
col
Id
++
;
taosHashPut
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
,
&
tfi
,
sizeof
(
tfi
));
taosHashPut
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
,
&
tfi
,
sizeof
(
tfi
));
}
else
{
}
else
{
//TODO, del
//TODO, del
}
}
}
}
pthread_mutex_unlock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdx
FieldInfo
*
fi
=
taosHashGet
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
);
SIdx
ColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
assert
(
fi
!=
NULL
);
assert
(
fi
!=
NULL
);
int32_t
fieldId
=
fi
->
fieldId
;
int32_t
colId
=
fi
->
colId
;
int32_t
colType
=
fi
->
type
;
int32_t
version
=
index
->
cVersion
;
int32_t
version
=
index
->
cVersion
;
int
ret
=
indexCachePut
(
index
->
cache
,
colId
,
p
->
colType
,
p
->
colVal
,
p
->
nColVal
,
version
,
uid
,
p
->
operType
);
if
(
ret
!=
0
)
{
return
ret
;
}
}
}
pthread_mutex_unlock
(
&
index
->
mtx
);
#endif
return
1
;
return
0
;
}
}
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
multiQuerys
,
SArray
*
result
)
{
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
multiQuerys
,
SArray
*
result
)
{
#ifdef USE_LUCENE
#ifdef USE_LUCENE
...
@@ -148,16 +159,26 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
...
@@ -148,16 +159,26 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
free
(
fields
);
free
(
fields
);
free
(
keys
);
free
(
keys
);
free
(
types
);
free
(
types
);
#endif
#ifdef USE_INVERTED_INDEX
#endif
#endif
return
1
;
return
1
;
}
}
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
)
{
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
)
{
#ifdef USE_INVERTED_INDEX
#endif
return
1
;
return
1
;
}
}
int
indexRebuild
(
SIndex
*
index
,
SIndexOpts
*
opts
);
int
indexRebuild
(
SIndex
*
index
,
SIndexOpts
*
opts
)
{
#ifdef USE_INVERTED_INDEX
#endif
}
SIndexOpts
*
indexOptsCreate
()
{
SIndexOpts
*
indexOptsCreate
()
{
...
@@ -184,53 +205,55 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
...
@@ -184,53 +205,55 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
)
{
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pQuery
->
query
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pQuery
->
query
);
i
++
)
{
SIndexTermQuery
*
p
=
(
SIndexTermQuery
*
)
taosArrayGet
(
pQuery
->
query
,
i
);
SIndexTermQuery
*
p
=
(
SIndexTermQuery
*
)
taosArrayGet
(
pQuery
->
query
,
i
);
indexTermDestroy
(
p
->
field_value
);
indexTermDestroy
(
p
->
term
);
}
}
taosArrayDestroy
(
pQuery
->
query
);
taosArrayDestroy
(
pQuery
->
query
);
free
(
pQuery
);
free
(
pQuery
);
};
};
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
const
char
*
field
,
int32_t
nFields
,
const
char
*
value
,
int32_t
nValue
,
EIndexQueryType
type
){
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
SIndexTerm
*
term
,
EIndexQueryType
qType
){
SIndexTerm
*
t
=
indexTermCreate
(
field
,
nFields
,
value
,
nValue
);
SIndexTermQuery
q
=
{.
qType
=
qType
,
.
term
=
term
};
if
(
t
==
NULL
)
{
return
-
1
;}
SIndexTermQuery
q
=
{.
type
=
type
,
.
field_value
=
t
};
taosArrayPush
(
pQuery
->
query
,
&
q
);
taosArrayPush
(
pQuery
->
query
,
&
q
);
return
0
;
return
0
;
}
}
SIndexTerm
*
indexTermCreate
(
const
char
*
key
,
int32_t
nKey
,
const
char
*
val
,
int32_t
nVal
)
{
SIndexTerm
*
indexTermCreate
(
int64_t
suid
,
SIndexOperOnColumn
oper
,
uint8_t
colType
,
const
char
*
colName
,
int32_t
nColName
,
const
char
*
colVal
,
int32_t
nColVal
)
{
SIndexTerm
*
t
=
(
SIndexTerm
*
)
malloc
(
sizeof
(
SIndexTerm
));
SIndexTerm
*
t
=
(
SIndexTerm
*
)
calloc
(
1
,
(
sizeof
(
SIndexTerm
)));
t
->
key
=
(
char
*
)
calloc
(
nKey
+
1
,
1
);
if
(
t
==
NULL
)
{
return
NULL
;
}
memcpy
(
t
->
key
,
key
,
nKey
);
t
->
nKey
=
nKey
;
t
->
suid
=
suid
;
t
->
operType
=
oper
;
t
->
colType
=
colType
;
t
->
colName
=
(
char
*
)
calloc
(
1
,
nColName
+
1
);
memcpy
(
t
->
colName
,
colName
,
nColName
);
t
->
nColName
=
nColName
;
t
->
val
=
(
char
*
)
calloc
(
nVal
+
1
,
1
);
t
->
colVal
=
(
char
*
)
calloc
(
1
,
nColVal
+
1
);
memcpy
(
t
->
val
,
val
,
n
Val
);
memcpy
(
t
->
colVal
,
colVal
,
nCol
Val
);
t
->
n
Val
=
n
Val
;
t
->
n
ColVal
=
nCol
Val
;
return
t
;
return
t
;
}
}
void
indexTermDestroy
(
SIndexTerm
*
p
)
{
void
indexTermDestroy
(
SIndexTerm
*
p
)
{
free
(
p
->
key
);
free
(
p
->
colName
);
free
(
p
->
v
al
);
free
(
p
->
colV
al
);
free
(
p
);
free
(
p
);
}
}
S
Array
*
indexMultiTermCreate
()
{
S
IndexMultiTerm
*
indexMultiTermCreate
()
{
return
taosArrayInit
(
4
,
sizeof
(
SIndexTerm
*
));
return
taosArrayInit
(
4
,
sizeof
(
SIndexTerm
*
));
}
}
int
indexMultiTermAdd
(
SArray
*
array
,
const
char
*
field
,
int32_t
nField
,
const
char
*
val
,
int32_t
nVal
)
{
int
indexMultiTermAdd
(
SIndexMultiTerm
*
terms
,
SIndexTerm
*
term
)
{
SIndexTerm
*
term
=
indexTermCreate
(
field
,
nField
,
val
,
nVal
);
taosArrayPush
(
terms
,
&
term
);
if
(
term
==
NULL
)
{
return
-
1
;
}
taosArrayPush
(
array
,
&
term
);
return
0
;
return
0
;
}
}
void
indexMultiTermDestroy
(
S
Array
*
array
)
{
void
indexMultiTermDestroy
(
S
IndexMultiTerm
*
terms
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
array
);
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
terms
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
array
,
i
);
SIndexTerm
*
p
=
taosArrayGetP
(
terms
,
i
);
indexTermDestroy
(
p
);
indexTermDestroy
(
p
);
}
}
taosArrayDestroy
(
array
);
taosArrayDestroy
(
terms
);
}
}
void
indexInit
()
{
void
indexInit
()
{
...
...
source/libs/index/src/index_cache.c
浏览文件 @
4681611e
...
@@ -16,7 +16,7 @@
...
@@ -16,7 +16,7 @@
#include "index_cache.h"
#include "index_cache.h"
#include "tcompare.h"
#include "tcompare.h"
#define MAX_INDEX_KEY_LEN
128
// test only, change later
#define MAX_INDEX_KEY_LEN
256
// test only, change later
static
char
*
getIndexKey
(
const
void
*
pData
)
{
static
char
*
getIndexKey
(
const
void
*
pData
)
{
return
NULL
;
return
NULL
;
...
@@ -96,16 +96,19 @@ IndexCache *indexCacheCreate() {
...
@@ -96,16 +96,19 @@ IndexCache *indexCacheCreate() {
}
}
void
indexCacheDestroy
(
IndexCache
*
cache
)
{
void
indexCacheDestroy
(
void
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
IndexCache
*
pCache
=
cache
;
tSkipListDestroy
(
cache
->
skiplist
);
if
(
pCache
==
NULL
)
{
return
;
}
free
(
cache
);
tSkipListDestroy
(
pCache
->
skiplist
);
free
(
pCache
);
}
}
int
indexCachePut
(
IndexCache
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
int
indexCachePut
(
void
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
)
{
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
)
{
if
(
cache
==
NULL
)
{
return
-
1
;}
if
(
cache
==
NULL
)
{
return
-
1
;}
IndexCache
*
pCache
=
cache
;
// encode data
// encode data
int32_t
total
=
sizeof
(
int32_t
)
+
sizeof
(
fieldId
)
+
sizeof
(
fieldType
)
+
sizeof
(
fvLen
)
+
fvLen
+
sizeof
(
version
)
+
sizeof
(
uid
)
+
sizeof
(
operType
);
int32_t
total
=
sizeof
(
int32_t
)
+
sizeof
(
fieldId
)
+
sizeof
(
fieldType
)
+
sizeof
(
fvLen
)
+
fvLen
+
sizeof
(
version
)
+
sizeof
(
uid
)
+
sizeof
(
operType
);
...
@@ -135,20 +138,15 @@ int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const c
...
@@ -135,20 +138,15 @@ int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const c
memcpy
(
p
,
&
operType
,
sizeof
(
operType
));
memcpy
(
p
,
&
operType
,
sizeof
(
operType
));
p
+=
sizeof
(
operType
);
p
+=
sizeof
(
operType
);
tSkipListPut
(
c
ache
->
skiplist
,
(
void
*
)
buf
);
tSkipListPut
(
pC
ache
->
skiplist
,
(
void
*
)
buf
);
// encode end
// encode end
}
}
int
indexCacheDel
(
IndexCache
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
int
indexCacheDel
(
void
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
IndexCache
*
pCache
=
cache
;
return
0
;
}
}
int
indexCacheSearch
(
IndexCache
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
)
{
int
indexCacheSearch
(
void
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
)
{
return
0
;
return
0
;
}
}
source/libs/index/test/CMakeLists.txt
浏览文件 @
4681611e
add_executable
(
indexTest
""
)
add_executable
(
indexTest
""
)
target_sources
(
indexTest
target_sources
(
indexTest
PRIVATE
PRIVATE
"indexTests.c
pp
"
"indexTests.c
c
"
)
)
target_include_directories
(
indexTest
target_include_directories
(
indexTest
PUBLIC
PUBLIC
...
...
source/libs/index/test/indexTests.c
pp
→
source/libs/index/test/indexTests.c
c
浏览文件 @
4681611e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <gtest/gtest.h>
#include <string>
#include <string>
#include <iostream>
#include <iostream>
...
@@ -61,7 +75,7 @@ class FstReadMemory {
...
@@ -61,7 +75,7 @@ class FstReadMemory {
// add later
// add later
bool
Search
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>
&
result
)
{
bool
Search
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>
&
result
)
{
FstStreamBuilder
*
sb
=
fstSearch
(
_fst
,
ctx
);
FstStreamBuilder
*
sb
=
fstSearch
(
_fst
,
ctx
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
...
@@ -279,15 +293,71 @@ void validateFst() {
...
@@ -279,15 +293,71 @@ void validateFst() {
delete
m
;
delete
m
;
}
}
class
IndexEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
taosRemoveDir
(
path
);
opts
=
indexOptsCreate
();
int
ret
=
indexOpen
(
opts
,
path
,
&
index
);
assert
(
ret
==
0
);
}
virtual
void
TearDown
()
{
indexClose
(
index
);
indexOptsDestroy
(
opts
);
}
const
char
*
path
=
"/tmp/tindex"
;
SIndexOpts
*
opts
;
SIndex
*
index
;
};
TEST_F
(
IndexEnv
,
testPut
)
{
// single index column
{
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello world"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
int
tableId
=
i
;
int
ret
=
indexPut
(
index
,
terms
,
tableId
);
assert
(
ret
==
0
);
}
indexMultiTermDestroy
(
terms
);
}
// multi index column
{
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
{
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello world"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermAdd
(
terms
,
term
);
}
{
std
::
string
colName
(
"tag2"
),
colVal
(
"Hello world"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermAdd
(
terms
,
term
);
}
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
int
tableId
=
i
;
int
ret
=
indexPut
(
index
,
terms
,
tableId
);
assert
(
ret
==
0
);
}
indexMultiTermDestroy
(
terms
);
}
//
}
int
main
(
int
argc
,
char
**
argv
)
{
TEST_F
(
IndexEnv
,
testDel
)
{
checkFstPerf
();
//checkFstPrefixSearch();
return
1
;
}
}
//TEST(IndexFstBuilder, IndexFstInput) {
//
//}
source/libs/planner/inc/plannerInt.h
浏览文件 @
4681611e
...
@@ -100,8 +100,9 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
...
@@ -100,8 +100,9 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
);
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
);
int32_t
subPlanToString
(
const
SSubplan
*
pPhyNode
,
char
**
str
);
int32_t
subPlanToString
(
const
SSubplan
*
pPhyNode
,
char
**
str
);
int32_t
stringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
/**
/**
* Destroy the query plan object.
* Destroy the query plan object.
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
4681611e
...
@@ -19,6 +19,13 @@
...
@@ -19,6 +19,13 @@
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _
typedef
struct
SPlanContext
{
struct
SCatalog
*
pCatalog
;
struct
SQueryDag
*
pDag
;
SSubplan
*
pCurrentSubplan
;
SSubplanId
nextId
;
}
SPlanContext
;
static
const
char
*
gOpName
[]
=
{
static
const
char
*
gOpName
[]
=
{
"Unknown"
,
"Unknown"
,
#define INCLUDE_AS_NAME
#define INCLUDE_AS_NAME
...
@@ -26,12 +33,14 @@ static const char* gOpName[] = {
...
@@ -26,12 +33,14 @@ static const char* gOpName[] = {
#undef INCLUDE_AS_NAME
#undef INCLUDE_AS_NAME
};
};
typedef
struct
SPlanContext
{
int32_t
opNameToOpType
(
const
char
*
name
)
{
struct
SCatalog
*
pCatalog
;
for
(
int32_t
i
=
1
;
i
<
sizeof
(
gOpName
)
/
sizeof
(
gOpName
[
0
]);
++
i
)
{
struct
SQueryDag
*
pDag
;
if
(
strcmp
(
name
,
gOpName
[
i
]))
{
SSubplan
*
pCurrentSubplan
;
return
i
;
SSubplanId
nextId
;
}
}
SPlanContext
;
}
return
OP_Unknown
;
}
static
void
toDataBlockSchema
(
SQueryPlanNode
*
pPlanNode
,
SDataBlockSchema
*
dataBlockSchema
)
{
static
void
toDataBlockSchema
(
SQueryPlanNode
*
pPlanNode
,
SDataBlockSchema
*
dataBlockSchema
)
{
SWAP
(
dataBlockSchema
->
pSchema
,
pPlanNode
->
pSchema
,
SSchema
*
);
SWAP
(
dataBlockSchema
->
pSchema
,
pPlanNode
->
pSchema
,
SSchema
*
);
...
@@ -216,11 +225,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
...
@@ -216,11 +225,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
opNameToOpType
(
const
char
*
name
)
{
int32_t
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
)
{
for
(
int32_t
i
=
1
;
i
<
sizeof
(
gOpName
)
/
sizeof
(
gOpName
[
0
]);
++
i
)
{
//todo
if
(
strcmp
(
name
,
gOpName
[
i
]))
{
return
i
;
}
}
return
OP_Unknown
;
}
}
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
4681611e
...
@@ -463,6 +463,191 @@ static bool exprInfoFromJson(const cJSON* json, void* obj) {
...
@@ -463,6 +463,191 @@ static bool exprInfoFromJson(const cJSON* json, void* obj) {
return
res
;
return
res
;
}
}
static
const
char
*
jkTimeWindowStartKey
=
"StartKey"
;
static
const
char
*
jkTimeWindowEndKey
=
"EndKey"
;
static
bool
timeWindowToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
STimeWindow
*
win
=
(
const
STimeWindow
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkTimeWindowStartKey
,
win
->
skey
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkTimeWindowEndKey
,
win
->
ekey
);
}
return
res
;
}
static
bool
timeWindowFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
STimeWindow
*
win
=
(
STimeWindow
*
)
obj
;
win
->
skey
=
getNumber
(
json
,
jkTimeWindowStartKey
);
win
->
ekey
=
getNumber
(
json
,
jkTimeWindowEndKey
);
return
true
;
}
static
const
char
*
jkScanNodeTableId
=
"TableId"
;
static
const
char
*
jkScanNodeTableType
=
"TableType"
;
static
bool
scanNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SScanPhyNode
*
scan
=
(
const
SScanPhyNode
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableId
,
scan
->
uid
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableType
,
scan
->
tableType
);
}
return
res
;
}
static
bool
scanNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SScanPhyNode
*
scan
=
(
SScanPhyNode
*
)
obj
;
scan
->
uid
=
getNumber
(
json
,
jkScanNodeTableId
);
scan
->
tableType
=
getNumber
(
json
,
jkScanNodeTableType
);
return
true
;
}
static
const
char
*
jkTableScanNodeFlag
=
"Flag"
;
static
const
char
*
jkTableScanNodeWindow
=
"Window"
;
static
const
char
*
jkTableScanNodeTagsConditions
=
"TagsConditions"
;
static
bool
tableScanNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
STableScanPhyNode
*
scan
=
(
const
STableScanPhyNode
*
)
obj
;
bool
res
=
scanNodeToJson
(
obj
,
json
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkTableScanNodeFlag
,
scan
->
scanFlag
);
}
if
(
res
)
{
res
=
addObject
(
json
,
jkTableScanNodeWindow
,
timeWindowToJson
,
&
scan
->
window
);
}
if
(
res
)
{
res
=
addArray
(
json
,
jkTableScanNodeTagsConditions
,
exprInfoToJson
,
scan
->
pTagsConditions
);
}
return
res
;
}
static
bool
tableScanNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
STableScanPhyNode
*
scan
=
(
STableScanPhyNode
*
)
obj
;
bool
res
=
scanNodeFromJson
(
json
,
obj
);
if
(
res
)
{
scan
->
scanFlag
=
getNumber
(
json
,
jkTableScanNodeFlag
);
}
if
(
res
)
{
res
=
fromObject
(
json
,
jkTableScanNodeWindow
,
timeWindowFromJson
,
&
scan
->
window
,
true
);
}
if
(
res
)
{
res
=
fromArray
(
json
,
jkTableScanNodeTagsConditions
,
exprInfoFromJson
,
&
scan
->
pTagsConditions
,
sizeof
(
SExprInfo
));
}
return
res
;
}
static
const
char
*
jkEpAddrFqdn
=
"Fqdn"
;
static
const
char
*
jkEpAddrPort
=
"Port"
;
static
bool
epAddrToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SEpAddrMsg
*
ep
=
(
const
SEpAddrMsg
*
)
obj
;
bool
res
=
cJSON_AddStringToObject
(
json
,
jkEpAddrFqdn
,
ep
->
fqdn
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkEpAddrPort
,
ep
->
port
);
}
return
res
;
}
static
bool
epAddrFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SEpAddrMsg
*
ep
=
(
SEpAddrMsg
*
)
obj
;
copyString
(
json
,
jkEpAddrFqdn
,
ep
->
fqdn
);
ep
->
port
=
getNumber
(
json
,
jkEpAddrPort
);
return
true
;
}
static
const
char
*
jkExchangeNodeSrcTemplateId
=
"SrcTemplateId"
;
static
const
char
*
jkExchangeNodeSrcEndPoints
=
"SrcEndPoints"
;
static
bool
exchangeNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SExchangePhyNode
*
exchange
=
(
const
SExchangePhyNode
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkExchangeNodeSrcTemplateId
,
exchange
->
srcTemplateId
);
if
(
res
)
{
res
=
addArray
(
json
,
jkExchangeNodeSrcEndPoints
,
epAddrToJson
,
exchange
->
pSrcEndPoints
);
}
return
res
;
}
static
bool
exchangeNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SExchangePhyNode
*
exchange
=
(
SExchangePhyNode
*
)
obj
;
exchange
->
srcTemplateId
=
getNumber
(
json
,
jkExchangeNodeSrcTemplateId
);
return
fromArray
(
json
,
jkExchangeNodeSrcEndPoints
,
epAddrFromJson
,
&
exchange
->
pSrcEndPoints
,
sizeof
(
SEpAddrMsg
));
}
static
bool
specificPhyNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SPhyNode
*
phyNode
=
(
const
SPhyNode
*
)
obj
;
switch
(
phyNode
->
info
.
type
)
{
case
OP_TableScan
:
case
OP_DataBlocksOptScan
:
case
OP_TableSeqScan
:
return
tableScanNodeToJson
(
obj
,
json
);
case
OP_TagScan
:
case
OP_SystemTableScan
:
return
scanNodeToJson
(
obj
,
json
);
case
OP_Aggregate
:
break
;
// todo
case
OP_Project
:
return
true
;
case
OP_Groupby
:
case
OP_Limit
:
case
OP_SLimit
:
case
OP_TimeWindow
:
case
OP_SessionWindow
:
case
OP_StateWindow
:
case
OP_Fill
:
case
OP_MultiTableAggregate
:
case
OP_MultiTableTimeInterval
:
case
OP_Filter
:
case
OP_Distinct
:
case
OP_Join
:
case
OP_AllTimeWindow
:
case
OP_AllMultiTableTimeInterval
:
case
OP_Order
:
break
;
// todo
case
OP_Exchange
:
return
exchangeNodeToJson
(
obj
,
json
);
default:
break
;
}
return
false
;
}
static
bool
specificPhyNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SPhyNode
*
phyNode
=
(
SPhyNode
*
)
obj
;
switch
(
phyNode
->
info
.
type
)
{
case
OP_TableScan
:
case
OP_DataBlocksOptScan
:
case
OP_TableSeqScan
:
return
tableScanNodeFromJson
(
json
,
obj
);
case
OP_TagScan
:
case
OP_SystemTableScan
:
return
scanNodeFromJson
(
json
,
obj
);
case
OP_Aggregate
:
break
;
// todo
case
OP_Project
:
return
true
;
case
OP_Groupby
:
case
OP_Limit
:
case
OP_SLimit
:
case
OP_TimeWindow
:
case
OP_SessionWindow
:
case
OP_StateWindow
:
case
OP_Fill
:
case
OP_MultiTableAggregate
:
case
OP_MultiTableTimeInterval
:
case
OP_Filter
:
case
OP_Distinct
:
case
OP_Join
:
case
OP_AllTimeWindow
:
case
OP_AllMultiTableTimeInterval
:
case
OP_Order
:
break
;
// todo
case
OP_Exchange
:
return
exchangeNodeFromJson
(
json
,
obj
);
default:
break
;
}
return
false
;
}
static
const
char
*
jkPnodeName
=
"Name"
;
static
const
char
*
jkPnodeName
=
"Name"
;
static
const
char
*
jkPnodeTargets
=
"Targets"
;
static
const
char
*
jkPnodeTargets
=
"Targets"
;
static
const
char
*
jkPnodeConditions
=
"Conditions"
;
static
const
char
*
jkPnodeConditions
=
"Conditions"
;
...
@@ -484,6 +669,9 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
...
@@ -484,6 +669,9 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
if
(
res
)
{
if
(
res
)
{
res
=
addArray
(
jNode
,
jkPnodeChildren
,
phyNodeToJson
,
phyNode
->
pChildren
);
res
=
addArray
(
jNode
,
jkPnodeChildren
,
phyNodeToJson
,
phyNode
->
pChildren
);
}
}
if
(
res
)
{
res
=
addObject
(
jNode
,
phyNode
->
info
.
name
,
specificPhyNodeToJson
,
phyNode
);
}
return
res
;
return
res
;
}
}
...
@@ -501,6 +689,9 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) {
...
@@ -501,6 +689,9 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) {
if
(
res
)
{
if
(
res
)
{
res
=
fromArray
(
json
,
jkPnodeChildren
,
phyNodeFromJson
,
&
node
->
pChildren
,
sizeof
(
SSlotSchema
));
res
=
fromArray
(
json
,
jkPnodeChildren
,
phyNodeFromJson
,
&
node
->
pChildren
,
sizeof
(
SSlotSchema
));
}
}
if
(
res
)
{
res
=
fromObject
(
json
,
node
->
info
.
name
,
specificPhyNodeFromJson
,
node
,
true
);
}
return
res
;
return
res
;
}
}
...
...
source/libs/planner/src/planner.c
浏览文件 @
4681611e
...
@@ -46,6 +46,10 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
...
@@ -46,6 +46,10 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SArray
*
eps
)
{
return
setSubplanExecutionNode
(
subplan
,
templateId
,
eps
);
}
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
)
{
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
)
{
return
subPlanToString
(
subplan
,
str
);
return
subPlanToString
(
subplan
,
str
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录