Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7ae87c10
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7ae87c10
编写于
3月 02, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add UT
上级
6a6f31c4
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
153 addition
and
44 deletion
+153
-44
source/libs/index/inc/index_comm.h
source/libs/index/inc/index_comm.h
+32
-0
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+3
-32
source/libs/index/src/index_comm.c
source/libs/index/src/index_comm.c
+48
-0
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+12
-1
source/libs/index/test/fstUT.cc
source/libs/index/test/fstUT.cc
+16
-0
source/libs/index/test/jsonDemo.cc
source/libs/index/test/jsonDemo.cc
+0
-0
source/libs/index/test/jsonUT.cc
source/libs/index/test/jsonUT.cc
+30
-1
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+12
-10
未找到文件。
source/libs/index/inc/index_comm.h
0 → 100644
浏览文件 @
7ae87c10
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_INDEX_COMM_H_
#define _TD_INDEX_COMM_H_
#ifdef __cplusplus
extern
"C"
{
#endif
extern
char
JSON_COLUMN
[];
extern
char
JSON_VALUE_DELIM
;
char
*
indexPackJsonData
(
SIndexTerm
*
itm
);
#ifdef __cplusplus
}
#endif
#endif
source/libs/index/src/index_cache.c
浏览文件 @
7ae87c10
...
...
@@ -14,6 +14,7 @@
*/
#include "index_cache.h"
#include "index_comm.h"
#include "index_util.h"
#include "tcompare.h"
#include "tsched.h"
...
...
@@ -24,9 +25,6 @@
#define MEM_THRESHOLD 1024 * 1024
#define MEM_ESTIMATE_RADIO 1.5
static
char
JSON_COLUMN
[]
=
"JSON"
;
static
char
JSON_VALUE_DELIM
=
'&'
;
static
void
indexMemRef
(
MemTable
*
tbl
);
static
void
indexMemUnRef
(
MemTable
*
tbl
);
...
...
@@ -211,33 +209,6 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
}
}
}
static
char
*
indexCachePackJsonData
(
SIndexTerm
*
itm
)
{
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t
ty
=
INDEX_TYPE_GET_TYPE
(
itm
->
colType
);
int32_t
sz
=
itm
->
nColName
+
itm
->
nColVal
+
sizeof
(
uint8_t
)
+
sizeof
(
JSON_VALUE_DELIM
)
*
2
+
1
;
char
*
buf
=
(
char
*
)
calloc
(
1
,
sz
);
char
*
p
=
buf
;
memcpy
(
p
,
itm
->
colName
,
itm
->
nColName
);
p
+=
itm
->
nColName
;
memcpy
(
p
,
&
JSON_VALUE_DELIM
,
sizeof
(
JSON_VALUE_DELIM
));
p
+=
sizeof
(
JSON_VALUE_DELIM
);
memcpy
(
p
,
&
ty
,
sizeof
(
ty
));
p
+=
sizeof
(
ty
);
memcpy
(
p
,
&
JSON_VALUE_DELIM
,
sizeof
(
JSON_VALUE_DELIM
));
p
+=
sizeof
(
JSON_VALUE_DELIM
);
memcpy
(
p
,
itm
->
colVal
,
itm
->
nColVal
);
return
buf
;
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
...
...
@@ -254,7 +225,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
// set up key
ct
->
colType
=
term
->
colType
;
if
(
hasJson
)
{
ct
->
colVal
=
index
Cache
PackJsonData
(
term
);
ct
->
colVal
=
indexPackJsonData
(
term
);
}
else
{
ct
->
colVal
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
(
term
->
nColVal
+
1
));
memcpy
(
ct
->
colVal
,
term
->
colVal
,
term
->
nColVal
);
...
...
@@ -333,7 +304,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
);
char
*
p
=
term
->
colVal
;
if
(
hasJson
)
{
p
=
index
Cache
PackJsonData
(
term
);
p
=
indexPackJsonData
(
term
);
}
CacheTerm
ct
=
{.
colVal
=
p
,
.
version
=
atomic_load_32
(
&
pCache
->
version
)};
...
...
source/libs/index/src/index_comm.c
0 → 100644
浏览文件 @
7ae87c10
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index.h"
#include "indexInt.h"
char
JSON_COLUMN
[]
=
"JSON"
;
char
JSON_VALUE_DELIM
=
'&'
;
char
*
indexPackJsonData
(
SIndexTerm
*
itm
)
{
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t
ty
=
INDEX_TYPE_GET_TYPE
(
itm
->
colType
);
int32_t
sz
=
itm
->
nColName
+
itm
->
nColVal
+
sizeof
(
uint8_t
)
+
sizeof
(
JSON_VALUE_DELIM
)
*
2
+
1
;
char
*
buf
=
(
char
*
)
calloc
(
1
,
sz
);
char
*
p
=
buf
;
memcpy
(
p
,
itm
->
colName
,
itm
->
nColName
);
p
+=
itm
->
nColName
;
memcpy
(
p
,
&
JSON_VALUE_DELIM
,
sizeof
(
JSON_VALUE_DELIM
));
p
+=
sizeof
(
JSON_VALUE_DELIM
);
memcpy
(
p
,
&
ty
,
sizeof
(
ty
));
p
+=
sizeof
(
ty
);
memcpy
(
p
,
&
JSON_VALUE_DELIM
,
sizeof
(
JSON_VALUE_DELIM
));
p
+=
sizeof
(
JSON_VALUE_DELIM
);
memcpy
(
p
,
itm
->
colVal
,
itm
->
nColVal
);
return
buf
;
}
source/libs/index/src/index_tfile.c
浏览文件 @
7ae87c10
...
...
@@ -15,6 +15,7 @@ p *
#include "index_tfile.h"
#include "index.h"
#include "index_comm.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_util.h"
...
...
@@ -186,13 +187,20 @@ void tfileReaderDestroy(TFileReader* reader) {
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
SIndexTerm
*
term
=
query
->
term
;
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
);
EIndexQueryType
qtype
=
query
->
qType
;
int
ret
=
-
1
;
// refactor to callback later
if
(
qtype
==
QUERY_TERM
)
{
uint64_t
offset
;
FstSlice
key
=
fstSliceCreate
(
term
->
colVal
,
term
->
nColVal
);
char
*
p
=
term
->
colVal
;
uint64_t
sz
=
term
->
nColVal
;
if
(
hasJson
)
{
p
=
indexPackJsonData
(
term
);
sz
=
strlen
(
p
);
}
FstSlice
key
=
fstSliceCreate
(
p
,
sz
);
if
(
fstGet
(
reader
->
fst
,
&
key
,
&
offset
))
{
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, found table info in tindex"
,
term
->
suid
,
term
->
colName
,
term
->
colVal
);
...
...
@@ -202,6 +210,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
term
->
colVal
);
}
fstSliceDestroy
(
&
key
);
if
(
hasJson
)
{
free
(
p
);
}
}
else
if
(
qtype
==
QUERY_PREFIX
)
{
// handle later
//
...
...
source/libs/index/test/fstUT.cc
浏览文件 @
7ae87c10
...
...
@@ -238,3 +238,19 @@ TEST_F(FstEnv, writeNormal) {
assert
(
fst
->
Search
(
ctx
,
rlt
)
==
true
);
}
TEST_F
(
FstEnv
,
WriteMillonrRecord
)
{}
TEST_F
(
FstEnv
,
writeAbNormal
)
{
fst
->
CreateWriter
();
std
::
string
str1
(
"voltage&
\b
&ab"
);
std
::
string
str2
(
"voltbge&
\b
&ab"
);
fst
->
Put
(
str1
,
1
);
fst
->
Put
(
str2
,
2
);
fst
->
DestroyWriter
();
fst
->
CreateReader
();
uint64_t
val
;
assert
(
fst
->
Get
(
"1"
,
&
val
)
==
false
);
assert
(
fst
->
Get
(
"voltage&
\b
&ab"
,
&
val
)
==
true
);
assert
(
val
==
1
);
}
source/libs/index/test/jsonDemo.cc
已删除
100644 → 0
浏览文件 @
6a6f31c4
source/libs/index/test/jsonUT.cc
浏览文件 @
7ae87c10
...
...
@@ -21,6 +21,8 @@ class JsonEnv : public ::testing::Test {
protected:
virtual
void
SetUp
()
{
taosRemoveDir
(
dir
.
c_str
());
taosMkDir
(
dir
.
c_str
());
opts
=
indexOptsCreate
();
int
ret
=
tIndexJsonOpen
(
opts
,
dir
.
c_str
(),
&
index
);
assert
(
ret
==
0
);
...
...
@@ -87,6 +89,33 @@ TEST_F(JsonEnv, testWrite) {
assert
(
100
==
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
}
TEST_F
(
JsonEnv
,
testWriteMillonData
)
{
{
std
::
string
colName
(
"voltagefdadfa"
);
std
::
string
colVal
(
"abxxxxxxxxxxxx"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
// SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"voltage"
);
std
::
string
colVal
(
"ab"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
tIndexJsonSearch
(
index
,
mq
,
result
);
assert
(
100
==
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
}
source/libs/transport/src/transSrv.c
浏览文件 @
7ae87c10
...
...
@@ -286,15 +286,17 @@ void uvOnWriteCb(uv_write_t* req, int status) {
transClearBuffer
(
&
conn
->
readBuf
);
if
(
status
==
0
)
{
tTrace
(
"server conn %p data already was written on stream"
,
conn
);
assert
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>=
1
);
SSrvMsg
*
msg
=
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
taosArrayRemove
(
conn
->
srvMsgs
,
0
);
destroySmsg
(
msg
);
// send second data, just use for push
if
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>
0
)
{
msg
=
(
SSrvMsg
*
)
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
uvStartSendRespInternal
(
msg
);
if
(
conn
->
srvMsgs
!=
NULL
)
{
assert
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>=
1
);
SSrvMsg
*
msg
=
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
taosArrayRemove
(
conn
->
srvMsgs
,
0
);
destroySmsg
(
msg
);
// send second data, just use for push
if
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>
0
)
{
msg
=
(
SSrvMsg
*
)
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
uvStartSendRespInternal
(
msg
);
}
}
}
else
{
tError
(
"server conn %p failed to write data, %s"
,
conn
,
uv_err_name
(
status
));
...
...
@@ -615,7 +617,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
SSrvMsg
*
msg
=
taosArrayGetP
(
conn
->
srvMsgs
,
i
);
destroySmsg
(
msg
);
}
taosArrayDestroy
(
conn
->
srvMsgs
);
conn
->
srvMsgs
=
taosArrayDestroy
(
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
if
(
clear
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录