Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Iotdb
提交
f061b284
I
Iotdb
项目概览
apache
/
Iotdb
10 个月 前同步成功
通知
25
Star
3344
Fork
916
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
I
Iotdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
f061b284
编写于
1月 06, 2022
作者:
L
liuminghui233
提交者:
GitHub
1月 06, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[IOTDB-1802] Add aligned timeseries APIs for C++ client (#4711)
上级
fe5d5915
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
552 addition
and
23 deletion
+552
-23
client-cpp/src/main/Session.cpp
client-cpp/src/main/Session.cpp
+254
-11
client-cpp/src/main/Session.h
client-cpp/src/main/Session.h
+50
-3
client-cpp/src/test/CMakeLists.txt
client-cpp/src/test/CMakeLists.txt
+1
-1
example/client-cpp-example/pom.xml
example/client-cpp-example/pom.xml
+4
-0
example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
...lient-cpp-example/src/AlignedTimeseriesSessionExample.cpp
+217
-0
example/client-cpp-example/src/CMakeLists.txt
example/client-cpp-example/src/CMakeLists.txt
+4
-0
example/client-cpp-example/src/SessionExample.cpp
example/client-cpp-example/src/SessionExample.cpp
+22
-8
未找到文件。
client-cpp/src/main/Session.cpp
浏览文件 @
f061b284
...
...
@@ -152,6 +152,10 @@ int Tablet::getValueByteSize() {
return
valueOccupation
;
}
void
Tablet
::
setAligned
(
bool
isAligned
)
{
this
->
isAligned
=
isAligned
;
}
string
SessionUtils
::
getTime
(
const
Tablet
&
tablet
)
{
MyStringBuffer
timeBuffer
;
for
(
int
i
=
0
;
i
<
tablet
.
rowSize
;
i
++
)
{
...
...
@@ -574,6 +578,7 @@ void Session::insertRecord(const string &deviceId, int64_t time,
req
.
__set_timestamp
(
time
);
req
.
__set_measurements
(
measurements
);
req
.
__set_values
(
values
);
req
.
__set_isAligned
(
false
);
TSStatus
respStatus
;
try
{
client
->
insertStringRecord
(
respStatus
,
req
);
...
...
@@ -596,6 +601,49 @@ void Session::insertRecord(const string &prefixPath, int64_t time,
string
buffer
;
putValuesIntoBuffer
(
types
,
values
,
buffer
);
req
.
__set_values
(
buffer
);
req
.
__set_isAligned
(
false
);
TSStatus
respStatus
;
try
{
client
->
insertRecord
(
respStatus
,
req
);
RpcUtils
::
verifySuccess
(
respStatus
);
}
catch
(
IoTDBConnectionException
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
}
void
Session
::
insertAlignedRecord
(
const
string
&
deviceId
,
int64_t
time
,
const
vector
<
string
>
&
measurements
,
const
vector
<
string
>
&
values
)
{
TSInsertStringRecordReq
req
;
req
.
__set_sessionId
(
sessionId
);
req
.
__set_prefixPath
(
deviceId
);
req
.
__set_timestamp
(
time
);
req
.
__set_measurements
(
measurements
);
req
.
__set_values
(
values
);
req
.
__set_isAligned
(
true
);
TSStatus
respStatus
;
try
{
client
->
insertStringRecord
(
respStatus
,
req
);
RpcUtils
::
verifySuccess
(
respStatus
);
}
catch
(
IoTDBConnectionException
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
}
void
Session
::
insertAlignedRecord
(
const
string
&
prefixPath
,
int64_t
time
,
const
vector
<
string
>
&
measurements
,
const
vector
<
TSDataType
::
TSDataType
>
&
types
,
const
vector
<
char
*>
&
values
)
{
TSInsertRecordReq
req
;
req
.
__set_sessionId
(
sessionId
);
req
.
__set_prefixPath
(
prefixPath
);
req
.
__set_timestamp
(
time
);
req
.
__set_measurements
(
measurements
);
string
buffer
;
putValuesIntoBuffer
(
types
,
values
,
buffer
);
req
.
__set_values
(
buffer
);
req
.
__set_isAligned
(
true
);
TSStatus
respStatus
;
try
{
client
->
insertRecord
(
respStatus
,
req
);
...
...
@@ -620,6 +668,7 @@ void Session::insertRecords(const vector <string> &deviceIds,
request
.
__set_timestamps
(
times
);
request
.
__set_measurementsList
(
measurementsList
);
request
.
__set_valuesList
(
valuesList
);
request
.
__set_isAligned
(
false
);
try
{
TSStatus
respStatus
;
...
...
@@ -653,6 +702,67 @@ void Session::insertRecords(const vector <string> &deviceIds,
bufferList
.
push_back
(
buffer
);
}
request
.
__set_valuesList
(
bufferList
);
request
.
__set_isAligned
(
false
);
try
{
TSStatus
respStatus
;
client
->
insertRecords
(
respStatus
,
request
);
RpcUtils
::
verifySuccess
(
respStatus
);
}
catch
(
IoTDBConnectionException
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
}
void
Session
::
insertAlignedRecords
(
const
vector
<
string
>
&
deviceIds
,
const
vector
<
int64_t
>
&
times
,
const
vector
<
vector
<
string
>>
&
measurementsList
,
const
vector
<
vector
<
string
>>
&
valuesList
)
{
size_t
len
=
deviceIds
.
size
();
if
(
len
!=
times
.
size
()
||
len
!=
measurementsList
.
size
()
||
len
!=
valuesList
.
size
())
{
logic_error
e
(
"deviceIds, times, measurementsList and valuesList's size should be equal"
);
throw
exception
(
e
);
}
TSInsertStringRecordsReq
request
;
request
.
__set_sessionId
(
sessionId
);
request
.
__set_prefixPaths
(
deviceIds
);
request
.
__set_timestamps
(
times
);
request
.
__set_measurementsList
(
measurementsList
);
request
.
__set_valuesList
(
valuesList
);
request
.
__set_isAligned
(
true
);
try
{
TSStatus
respStatus
;
client
->
insertStringRecords
(
respStatus
,
request
);
RpcUtils
::
verifySuccess
(
respStatus
);
}
catch
(
IoTDBConnectionException
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
}
void
Session
::
insertAlignedRecords
(
const
vector
<
string
>
&
deviceIds
,
const
vector
<
int64_t
>
&
times
,
const
vector
<
vector
<
string
>>
&
measurementsList
,
const
vector
<
vector
<
TSDataType
::
TSDataType
>>
&
typesList
,
const
vector
<
vector
<
char
*>>
&
valuesList
)
{
size_t
len
=
deviceIds
.
size
();
if
(
len
!=
times
.
size
()
||
len
!=
measurementsList
.
size
()
||
len
!=
valuesList
.
size
())
{
logic_error
e
(
"deviceIds, times, measurementsList and valuesList's size should be equal"
);
throw
exception
(
e
);
}
TSInsertRecordsReq
request
;
request
.
__set_sessionId
(
sessionId
);
request
.
__set_prefixPaths
(
deviceIds
);
request
.
__set_timestamps
(
times
);
request
.
__set_measurementsList
(
measurementsList
);
vector
<
string
>
bufferList
;
for
(
size_t
i
=
0
;
i
<
valuesList
.
size
();
i
++
)
{
string
buffer
;
putValuesIntoBuffer
(
typesList
[
i
],
valuesList
[
i
],
buffer
);
bufferList
.
push_back
(
buffer
);
}
request
.
__set_valuesList
(
bufferList
);
request
.
__set_isAligned
(
true
);
try
{
TSStatus
respStatus
;
...
...
@@ -707,6 +817,62 @@ void Session::insertRecordsOfOneDevice(const string &deviceId,
bufferList
.
push_back
(
buffer
);
}
request
.
__set_valuesList
(
bufferList
);
request
.
__set_isAligned
(
false
);
try
{
TSStatus
respStatus
;
client
->
insertRecordsOfOneDevice
(
respStatus
,
request
);
RpcUtils
::
verifySuccess
(
respStatus
);
}
catch
(
const
exception
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
}
void
Session
::
insertAlignedRecordsOfOneDevice
(
const
string
&
deviceId
,
vector
<
int64_t
>
&
times
,
vector
<
vector
<
string
>>
&
measurementsList
,
vector
<
vector
<
TSDataType
::
TSDataType
>>
&
typesList
,
vector
<
vector
<
char
*>>
&
valuesList
)
{
insertAlignedRecordsOfOneDevice
(
deviceId
,
times
,
measurementsList
,
typesList
,
valuesList
,
false
);
}
void
Session
::
insertAlignedRecordsOfOneDevice
(
const
string
&
deviceId
,
vector
<
int64_t
>
&
times
,
vector
<
vector
<
string
>>
&
measurementsList
,
vector
<
vector
<
TSDataType
::
TSDataType
>>
&
typesList
,
vector
<
vector
<
char
*>>
&
valuesList
,
bool
sorted
)
{
if
(
sorted
)
{
if
(
!
checkSorted
(
times
))
{
throw
BatchExecutionException
(
"Times in InsertOneDeviceRecords are not in ascending order"
);
}
}
else
{
int
*
index
=
new
int
[
times
.
size
()];
for
(
size_t
i
=
0
;
i
<
times
.
size
();
i
++
)
{
index
[
i
]
=
i
;
}
this
->
sortIndexByTimestamp
(
index
,
times
,
times
.
size
());
times
=
sortList
(
times
,
index
,
times
.
size
());
measurementsList
=
sortList
(
measurementsList
,
index
,
times
.
size
());
typesList
=
sortList
(
typesList
,
index
,
times
.
size
());
valuesList
=
sortList
(
valuesList
,
index
,
times
.
size
());
delete
[]
index
;
}
TSInsertRecordsOfOneDeviceReq
request
;
request
.
__set_sessionId
(
sessionId
);
request
.
__set_prefixPath
(
deviceId
);
request
.
__set_timestamps
(
times
);
request
.
__set_measurementsList
(
measurementsList
);
vector
<
string
>
bufferList
;
for
(
size_t
i
=
0
;
i
<
valuesList
.
size
();
i
++
)
{
string
buffer
;
putValuesIntoBuffer
(
typesList
[
i
],
valuesList
[
i
],
buffer
);
bufferList
.
push_back
(
buffer
);
}
request
.
__set_valuesList
(
bufferList
);
request
.
__set_isAligned
(
true
);
try
{
TSStatus
respStatus
;
...
...
@@ -746,6 +912,7 @@ void Session::insertTablet(Tablet &tablet, bool sorted) {
request
.
__set_timestamps
(
SessionUtils
::
getTime
(
tablet
));
request
.
__set_values
(
SessionUtils
::
getValue
(
tablet
));
request
.
__set_size
(
tablet
.
rowSize
);
request
.
__set_isAligned
(
tablet
.
isAligned
);
try
{
TSStatus
respStatus
;
...
...
@@ -757,6 +924,21 @@ void Session::insertTablet(Tablet &tablet, bool sorted) {
}
}
void
Session
::
insertAlignedTablet
(
Tablet
&
tablet
)
{
insertAlignedTablet
(
tablet
,
false
);
}
void
Session
::
insertAlignedTablet
(
Tablet
&
tablet
,
bool
sorted
)
{
tablet
.
setAligned
(
true
);
try
{
insertTablet
(
tablet
,
sorted
);
}
catch
(
const
exception
&
e
)
{
logic_error
error
(
e
.
what
());
throw
exception
(
error
);
}
}
void
Session
::
insertTablets
(
map
<
string
,
Tablet
*>
&
tablets
)
{
try
{
insertTablets
(
tablets
,
false
);
...
...
@@ -770,8 +952,15 @@ void Session::insertTablets(map<string, Tablet *> &tablets) {
void
Session
::
insertTablets
(
map
<
string
,
Tablet
*>
&
tablets
,
bool
sorted
)
{
TSInsertTabletsReq
request
;
request
.
__set_sessionId
(
sessionId
);
if
(
tablets
.
empty
())
{
throw
BatchExecutionException
(
"No tablet is inserting!"
);
}
auto
beginIter
=
tablets
.
begin
();
bool
isFirstTabletAligned
=
((
*
beginIter
).
second
)
->
isAligned
;
for
(
const
auto
&
item
:
tablets
)
{
if
(
isFirstTabletAligned
!=
item
.
second
->
isAligned
)
{
throw
BatchExecutionException
(
"The tablets should be all aligned or non-aligned!"
);
}
if
(
sorted
)
{
if
(
!
checkSorted
(
*
(
item
.
second
)))
{
throw
BatchExecutionException
(
"Times in Tablet are not in ascending order"
);
...
...
@@ -779,7 +968,6 @@ void Session::insertTablets(map<string, Tablet *> &tablets, bool sorted) {
}
else
{
sortTablet
(
*
(
item
.
second
));
}
request
.
prefixPaths
.
push_back
(
item
.
second
->
deviceId
);
vector
<
string
>
measurements
;
vector
<
int
>
dataTypes
;
...
...
@@ -792,15 +980,32 @@ void Session::insertTablets(map<string, Tablet *> &tablets, bool sorted) {
request
.
timestampsList
.
push_back
(
SessionUtils
::
getTime
(
*
(
item
.
second
)));
request
.
valuesList
.
push_back
(
SessionUtils
::
getValue
(
*
(
item
.
second
)));
request
.
sizeList
.
push_back
(
item
.
second
->
rowSize
);
}
request
.
__set_isAligned
(
isFirstTabletAligned
);
try
{
TSStatus
respStatus
;
client
->
insertTablets
(
respStatus
,
request
);
RpcUtils
::
verifySuccess
(
respStatus
);
}
catch
(
IoTDBConnectionException
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
}
try
{
TSStatus
respStatus
;
client
->
insertTablets
(
respStatus
,
request
);
RpcUtils
::
verifySuccess
(
respStatus
);
}
catch
(
const
exception
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
void
Session
::
insertAlignedTablets
(
map
<
string
,
Tablet
*>
&
tablets
)
{
insertAlignedTablets
(
tablets
,
false
);
}
void
Session
::
insertAlignedTablets
(
map
<
string
,
Tablet
*>
&
tablets
,
bool
sorted
)
{
for
(
map
<
string
,
Tablet
*>::
iterator
iter
=
tablets
.
begin
();
iter
!=
tablets
.
end
();
iter
++
)
{
iter
->
second
->
setAligned
(
true
);
}
try
{
insertTablets
(
tablets
,
sorted
);
}
catch
(
const
exception
&
e
)
{
logic_error
error
(
e
.
what
());
throw
exception
(
error
);
}
}
...
...
@@ -840,7 +1045,7 @@ void Session::testInsertTablet(const Tablet &tablet) {
RpcUtils
::
verifySuccess
(
*
resp
);
}
catch
(
IoTDBConnectionException
&
e
)
{
throw
new
IoTDBConnectionException
(
e
.
what
());
throw
IoTDBConnectionException
(
e
.
what
());
}
}
...
...
@@ -1041,6 +1246,44 @@ void Session::createMultiTimeseries(const vector <string> &paths,
}
}
void
Session
::
createAlignedTimeseries
(
const
std
::
string
&
deviceId
,
const
std
::
vector
<
std
::
string
>
&
measurements
,
const
std
::
vector
<
TSDataType
::
TSDataType
>
&
dataTypes
,
const
std
::
vector
<
TSEncoding
::
TSEncoding
>
&
encodings
,
const
std
::
vector
<
CompressionType
::
CompressionType
>
&
compressors
)
{
shared_ptr
<
TSCreateAlignedTimeseriesReq
>
request
(
new
TSCreateAlignedTimeseriesReq
());
request
->
__set_sessionId
(
sessionId
);
request
->
__set_prefixPath
(
deviceId
);
request
->
__set_measurements
(
measurements
);
vector
<
int
>
dataTypesOrdinal
;
for
(
TSDataType
::
TSDataType
dataType
:
dataTypes
)
{
dataTypesOrdinal
.
push_back
(
dataType
);
}
request
->
__set_dataTypes
(
dataTypesOrdinal
);
vector
<
int
>
encodingsOrdinal
;
for
(
TSEncoding
::
TSEncoding
encoding
:
encodings
)
{
encodingsOrdinal
.
push_back
(
encoding
);
}
request
->
__set_encodings
(
encodingsOrdinal
);
vector
<
int
>
compressorsOrdinal
;
for
(
CompressionType
::
CompressionType
compressor
:
compressors
)
{
compressorsOrdinal
.
push_back
(
compressor
);
}
request
->
__set_compressors
(
compressorsOrdinal
);
try
{
shared_ptr
<
TSStatus
>
resp
(
new
TSStatus
());
client
->
createAlignedTimeseries
(
*
resp
,
*
request
);
RpcUtils
::
verifySuccess
(
*
resp
);
}
catch
(
IoTDBConnectionException
&
e
)
{
throw
IoTDBConnectionException
(
e
.
what
());
}
}
bool
Session
::
checkTimeseriesExists
(
const
string
&
path
)
{
try
{
std
::
unique_ptr
<
SessionDataSet
>
dataset
=
executeQueryStatement
(
"SHOW TIMESERIES "
+
path
);
...
...
client-cpp/src/main/Session.h
浏览文件 @
f061b284
...
...
@@ -372,6 +372,7 @@ public:
std
::
vector
<
std
::
vector
<
std
::
string
>>
values
;
int
rowSize
;
//the number of rows to include in this tablet
int
maxRowNumber
;
// the maximum number of rows for this tablet
bool
isAligned
;
// whether this tablet store data of aligned timeseries or not
Tablet
()
{}
...
...
@@ -397,7 +398,7 @@ public:
* @param maxRowNumber the maximum number of rows for this tablet
*/
Tablet
(
const
std
::
string
&
deviceId
,
const
std
::
vector
<
std
::
pair
<
std
::
string
,
TSDataType
::
TSDataType
>>
&
schemas
,
int
maxRowNumber
)
:
deviceId
(
deviceId
),
schemas
(
schemas
),
maxRowNumber
(
maxRowNumber
){
int
maxRowNumber
,
bool
isAligned_
=
false
)
:
deviceId
(
deviceId
),
schemas
(
schemas
),
maxRowNumber
(
maxRowNumber
),
isAligned
(
isAligned_
){
// create timestamp column
timestamps
.
resize
(
maxRowNumber
);
// create value columns
...
...
@@ -415,6 +416,8 @@ public:
int
getTimeBytesSize
();
int
getValueByteSize
();
// total byte size that values occupies
void
setAligned
(
bool
isAligned
);
};
class
SessionUtils
{
...
...
@@ -539,9 +542,9 @@ public:
this
->
columnTypeDeduplicatedList
.
push_back
(
columnTypeList
[
i
]);
}
this
->
valueBuffers
.
push_back
(
std
::
unique_ptr
<
MyStringBuffer
>
(
new
MyStringBuffer
(
queryDataSet
->
valueList
[
i
])));
std
::
unique_ptr
<
MyStringBuffer
>
(
new
MyStringBuffer
(
queryDataSet
->
valueList
[
columnMap
[
name
]
])));
this
->
bitmapBuffers
.
push_back
(
std
::
unique_ptr
<
MyStringBuffer
>
(
new
MyStringBuffer
(
queryDataSet
->
bitmapList
[
i
])));
std
::
unique_ptr
<
MyStringBuffer
>
(
new
MyStringBuffer
(
queryDataSet
->
bitmapList
[
columnMap
[
name
]
])));
}
this
->
tsQueryDataSet
=
queryDataSet
;
}
...
...
@@ -671,6 +674,12 @@ public:
void
insertRecord
(
const
std
::
string
&
deviceId
,
int64_t
time
,
const
std
::
vector
<
std
::
string
>
&
measurements
,
const
std
::
vector
<
TSDataType
::
TSDataType
>
&
types
,
const
std
::
vector
<
char
*>
&
values
);
void
insertAlignedRecord
(
const
std
::
string
&
deviceId
,
int64_t
time
,
const
std
::
vector
<
std
::
string
>
&
measurements
,
const
std
::
vector
<
std
::
string
>
&
values
);
void
insertAlignedRecord
(
const
std
::
string
&
deviceId
,
int64_t
time
,
const
std
::
vector
<
std
::
string
>
&
measurements
,
const
std
::
vector
<
TSDataType
::
TSDataType
>
&
types
,
const
std
::
vector
<
char
*>
&
values
);
void
insertRecords
(
const
std
::
vector
<
std
::
string
>
&
deviceIds
,
const
std
::
vector
<
int64_t
>
&
times
,
const
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
measurementsList
,
...
...
@@ -682,6 +691,17 @@ public:
const
std
::
vector
<
std
::
vector
<
TSDataType
::
TSDataType
>>
&
typesList
,
const
std
::
vector
<
std
::
vector
<
char
*>>
&
valuesList
);
void
insertAlignedRecords
(
const
std
::
vector
<
std
::
string
>
&
deviceIds
,
const
std
::
vector
<
int64_t
>
&
times
,
const
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
measurementsList
,
const
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
valuesList
);
void
insertAlignedRecords
(
const
std
::
vector
<
std
::
string
>
&
deviceIds
,
const
std
::
vector
<
int64_t
>
&
times
,
const
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
measurementsList
,
const
std
::
vector
<
std
::
vector
<
TSDataType
::
TSDataType
>>
&
typesList
,
const
std
::
vector
<
std
::
vector
<
char
*>>
&
valuesList
);
void
insertRecordsOfOneDevice
(
const
std
::
string
&
deviceId
,
std
::
vector
<
int64_t
>
&
times
,
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
measurementsList
,
...
...
@@ -695,14 +715,35 @@ public:
std
::
vector
<
std
::
vector
<
char
*>>
&
valuesList
,
bool
sorted
);
void
insertAlignedRecordsOfOneDevice
(
const
std
::
string
&
deviceId
,
std
::
vector
<
int64_t
>
&
times
,
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
measurementsList
,
std
::
vector
<
std
::
vector
<
TSDataType
::
TSDataType
>>
&
typesList
,
std
::
vector
<
std
::
vector
<
char
*>>
&
valuesList
);
void
insertAlignedRecordsOfOneDevice
(
const
std
::
string
&
deviceId
,
std
::
vector
<
int64_t
>
&
times
,
std
::
vector
<
std
::
vector
<
std
::
string
>>
&
measurementsList
,
std
::
vector
<
std
::
vector
<
TSDataType
::
TSDataType
>>
&
typesList
,
std
::
vector
<
std
::
vector
<
char
*>>
&
valuesList
,
bool
sorted
);
void
insertTablet
(
Tablet
&
tablet
);
void
insertTablet
(
Tablet
&
tablet
,
bool
sorted
);
void
insertAlignedTablet
(
Tablet
&
tablet
);
void
insertAlignedTablet
(
Tablet
&
tablet
,
bool
sorted
);
void
insertTablets
(
std
::
map
<
std
::
string
,
Tablet
*>
&
tablets
);
void
insertTablets
(
std
::
map
<
std
::
string
,
Tablet
*>
&
tablets
,
bool
sorted
);
void
insertAlignedTablets
(
std
::
map
<
std
::
string
,
Tablet
*>
&
tablets
);
void
insertAlignedTablets
(
std
::
map
<
std
::
string
,
Tablet
*>
&
tablets
,
bool
sorted
);
void
testInsertRecord
(
const
std
::
string
&
deviceId
,
int64_t
time
,
const
std
::
vector
<
std
::
string
>
&
measurements
,
const
std
::
vector
<
std
::
string
>
&
values
);
...
...
@@ -746,6 +787,12 @@ public:
std
::
vector
<
std
::
map
<
std
::
string
,
std
::
string
>>
*
attributesList
,
std
::
vector
<
std
::
string
>
*
measurementAliasList
);
void
createAlignedTimeseries
(
const
std
::
string
&
deviceId
,
const
std
::
vector
<
std
::
string
>
&
measurements
,
const
std
::
vector
<
TSDataType
::
TSDataType
>
&
dataTypes
,
const
std
::
vector
<
TSEncoding
::
TSEncoding
>
&
encodings
,
const
std
::
vector
<
CompressionType
::
CompressionType
>
&
compressors
);
bool
checkTimeseriesExists
(
const
std
::
string
&
path
);
std
::
unique_ptr
<
SessionDataSet
>
executeQueryStatement
(
const
std
::
string
&
sql
);
...
...
client-cpp/src/test/CMakeLists.txt
浏览文件 @
f061b284
...
...
@@ -33,7 +33,7 @@ INCLUDE_DIRECTORIES(${TOOLS_DIR}/thrift/target/thrift-0.14.1/lib/cpp/src)
find_package
(
Boost REQUIRED
)
IF
(
DEFINED BOOST_INCLUDEDIR
)
include_directories
(
SYSTEM
"
${
BOOST_INCLUDE
DIR
}
"
)
include_directories
(
"
${
Boost_INCLUDE_
DIR
}
"
)
ENDIF
()
# Link directories are different for Windows and Linux/Mac
...
...
example/client-cpp-example/pom.xml
浏览文件 @
f061b284
...
...
@@ -107,6 +107,10 @@
<sourceFile>
${project.basedir}/src/SessionExample.cpp
</sourceFile>
<destinationFile>
${project.build.directory}/SessionExample.cpp
</destinationFile>
</fileSet>
<fileSet>
<sourceFile>
${project.basedir}/src/AlignedTimeseriesSessionExample.cpp
</sourceFile>
<destinationFile>
${project.build.directory}/AlignedTimeseriesSessionExample.cpp
</destinationFile>
</fileSet>
<fileSet>
<sourceFile>
${project.basedir}/src/CMakeLists.txt
</sourceFile>
<destinationFile>
${project.build.directory}/CMakeLists.txt
</destinationFile>
...
...
example/client-cpp-example/src/AlignedTimeseriesSessionExample.cpp
0 → 100644
浏览文件 @
f061b284
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Session.h"
using
namespace
std
;
Session
*
session
;
void
createAlignedTimeseries
()
{
string
alignedDeviceId
=
"root.sg1.d1"
;
vector
<
string
>
measurements
=
{
"s1"
,
"s2"
,
"s3"
};
vector
<
string
>
alignedTimeseries
=
{
"root.sg1.d1.s1"
,
"root.sg1.d1.s2"
,
"root.sg1.d1.s3"
};
vector
<
TSDataType
::
TSDataType
>
dataTypes
=
{
TSDataType
::
INT32
,
TSDataType
::
DOUBLE
,
TSDataType
::
BOOLEAN
};
vector
<
TSEncoding
::
TSEncoding
>
encodings
=
{
TSEncoding
::
PLAIN
,
TSEncoding
::
GORILLA
,
TSEncoding
::
RLE
};
vector
<
CompressionType
::
CompressionType
>
compressors
=
{
CompressionType
::
SNAPPY
,
CompressionType
::
UNCOMPRESSED
,
CompressionType
::
SNAPPY
};
for
(
string
timeseries
:
alignedTimeseries
)
{
if
(
session
->
checkTimeseriesExists
(
timeseries
))
{
session
->
deleteTimeseries
(
timeseries
);
}
}
session
->
createAlignedTimeseries
(
alignedDeviceId
,
measurements
,
dataTypes
,
encodings
,
compressors
);
}
void
insertAlignedRecord
()
{
string
deviceId
=
"root.sg1.d1"
;
vector
<
string
>
measurements
;
measurements
.
push_back
(
"s1"
);
measurements
.
push_back
(
"s2"
);
measurements
.
push_back
(
"s3"
);
for
(
int64_t
time
=
0
;
time
<
100
;
time
++
)
{
vector
<
string
>
values
;
values
.
push_back
(
"1"
);
values
.
push_back
(
"1.0"
);
values
.
push_back
(
"true"
);
session
->
insertAlignedRecord
(
deviceId
,
time
,
measurements
,
values
);
}
}
void
insertAlignedRecords
()
{
string
deviceId
=
"root.sg1.d1"
;
vector
<
string
>
measurements
;
measurements
.
push_back
(
"s1"
);
measurements
.
push_back
(
"s2"
);
measurements
.
push_back
(
"s3"
);
vector
<
string
>
deviceIds
;
vector
<
vector
<
string
>>
measurementsList
;
vector
<
vector
<
string
>>
valuesList
;
vector
<
int64_t
>
timestamps
;
for
(
int64_t
time
=
100
;
time
<
200
;
time
++
)
{
vector
<
string
>
values
;
values
.
push_back
(
"1"
);
values
.
push_back
(
"1.0"
);
values
.
push_back
(
"true"
);
deviceIds
.
push_back
(
deviceId
);
measurementsList
.
push_back
(
measurements
);
valuesList
.
push_back
(
values
);
timestamps
.
push_back
(
time
);
if
(
time
!=
100
&&
time
%
100
==
0
)
{
session
->
insertAlignedRecords
(
deviceIds
,
timestamps
,
measurementsList
,
valuesList
);
deviceIds
.
clear
();
measurementsList
.
clear
();
valuesList
.
clear
();
timestamps
.
clear
();
}
}
session
->
insertAlignedRecords
(
deviceIds
,
timestamps
,
measurementsList
,
valuesList
);
}
void
insertAlignedTablet
()
{
pair
<
string
,
TSDataType
::
TSDataType
>
pairA
(
"s1"
,
TSDataType
::
INT32
);
pair
<
string
,
TSDataType
::
TSDataType
>
pairB
(
"s2"
,
TSDataType
::
DOUBLE
);
pair
<
string
,
TSDataType
::
TSDataType
>
pairC
(
"s3"
,
TSDataType
::
BOOLEAN
);
vector
<
pair
<
string
,
TSDataType
::
TSDataType
>>
schemas
;
schemas
.
push_back
(
pairA
);
schemas
.
push_back
(
pairB
);
schemas
.
push_back
(
pairC
);
Tablet
tablet
(
"root.sg1.d1"
,
schemas
,
100
);
tablet
.
setAligned
(
true
);
for
(
int64_t
time
=
200
;
time
<
300
;
time
++
)
{
int
row
=
tablet
.
rowSize
++
;
tablet
.
timestamps
[
row
]
=
time
;
tablet
.
values
[
0
][
row
]
=
"1"
;
tablet
.
values
[
1
][
row
]
=
"1.0"
;
tablet
.
values
[
2
][
row
]
=
"true"
;
if
(
tablet
.
rowSize
==
tablet
.
maxRowNumber
)
{
session
->
insertTablet
(
tablet
,
true
);
tablet
.
reset
();
}
}
if
(
tablet
.
rowSize
!=
0
)
{
session
->
insertTablet
(
tablet
);
tablet
.
reset
();
}
}
void
insertAlignedTablets
()
{
pair
<
string
,
TSDataType
::
TSDataType
>
pairA
(
"s1"
,
TSDataType
::
INT32
);
pair
<
string
,
TSDataType
::
TSDataType
>
pairB
(
"s2"
,
TSDataType
::
DOUBLE
);
pair
<
string
,
TSDataType
::
TSDataType
>
pairC
(
"s3"
,
TSDataType
::
BOOLEAN
);
vector
<
pair
<
string
,
TSDataType
::
TSDataType
>>
schemas
;
schemas
.
push_back
(
pairA
);
schemas
.
push_back
(
pairB
);
schemas
.
push_back
(
pairC
);
Tablet
tablet1
(
"root.sg1.d1"
,
schemas
,
100
);
Tablet
tablet2
(
"root.sg1.d2"
,
schemas
,
100
);
Tablet
tablet3
(
"root.sg1.d3"
,
schemas
,
100
);
map
<
string
,
Tablet
*>
tabletMap
;
tabletMap
[
"root.sg1.d1"
]
=
&
tablet1
;
tabletMap
[
"root.sg1.d2"
]
=
&
tablet2
;
tabletMap
[
"root.sg1.d3"
]
=
&
tablet3
;
for
(
int64_t
time
=
300
;
time
<
400
;
time
++
)
{
int
row1
=
tablet1
.
rowSize
++
;
int
row2
=
tablet2
.
rowSize
++
;
int
row3
=
tablet3
.
rowSize
++
;
tablet1
.
timestamps
[
row1
]
=
time
;
tablet2
.
timestamps
[
row2
]
=
time
;
tablet3
.
timestamps
[
row3
]
=
time
;
tablet1
.
values
[
0
][
row1
]
=
"1"
;
tablet2
.
values
[
0
][
row2
]
=
"2"
;
tablet3
.
values
[
0
][
row3
]
=
"3"
;
tablet1
.
values
[
1
][
row1
]
=
"1.0"
;
tablet2
.
values
[
1
][
row2
]
=
"2.0"
;
tablet3
.
values
[
1
][
row3
]
=
"3.0"
;
tablet1
.
values
[
2
][
row1
]
=
"true"
;
tablet2
.
values
[
2
][
row2
]
=
"false"
;
tablet3
.
values
[
2
][
row3
]
=
"true"
;
if
(
tablet1
.
rowSize
==
tablet1
.
maxRowNumber
)
{
session
->
insertAlignedTablets
(
tabletMap
,
true
);
tablet1
.
reset
();
tablet2
.
reset
();
tablet3
.
reset
();
}
}
if
(
tablet1
.
rowSize
!=
0
)
{
session
->
insertAlignedTablets
(
tabletMap
,
true
);
tablet1
.
reset
();
tablet2
.
reset
();
tablet3
.
reset
();
}
}
int
main
()
{
session
=
new
Session
(
"127.0.0.1"
,
6667
,
"root"
,
"root"
);
cout
<<
"session open
\n
"
<<
endl
;
session
->
open
(
false
);
cout
<<
"setStorageGroup
\n
"
<<
endl
;
try
{
session
->
setStorageGroup
(
"root.sg1"
);
}
catch
(
IoTDBConnectionException
e
){
string
errorMessage
(
e
.
what
());
if
(
errorMessage
.
find
(
"StorageGroupAlreadySetException"
)
==
string
::
npos
)
{
cout
<<
errorMessage
<<
endl
;
throw
e
;
}
}
cout
<<
"createAlignedTimeseries
\n
"
<<
endl
;
createAlignedTimeseries
();
cout
<<
"insertAlignedRecord
\n
"
<<
endl
;
insertAlignedRecord
();
cout
<<
"insertAlignedRecords
\n
"
<<
endl
;
insertAlignedRecords
();
cout
<<
"insertAlignedTablet
\n
"
<<
endl
;
insertAlignedTablet
();
cout
<<
"insertAlignedTablets
\n
"
<<
endl
;
insertAlignedTablets
();
cout
<<
"session close
\n
"
<<
endl
;
session
->
close
();
delete
session
;
cout
<<
"finished
\n
"
<<
endl
;
return
0
;
}
example/client-cpp-example/src/CMakeLists.txt
浏览文件 @
f061b284
...
...
@@ -36,8 +36,12 @@ ENDIF()
LINK_DIRECTORIES
(
${
CMAKE_SOURCE_DIR
}
/../target/client/lib
)
ADD_EXECUTABLE
(
SessionExample SessionExample.cpp
)
ADD_EXECUTABLE
(
AlignedTimeseriesSessionExample AlignedTimeseriesSessionExample.cpp
)
IF
(
MSVC
)
TARGET_LINK_LIBRARIES
(
SessionExample iotdb_session thriftmd
)
TARGET_LINK_LIBRARIES
(
AlignedTimeseriesSessionExample iotdb_session thriftmd
)
ELSE
()
TARGET_LINK_LIBRARIES
(
SessionExample iotdb_session pthread
)
TARGET_LINK_LIBRARIES
(
AlignedTimeseriesSessionExample iotdb_session pthread
)
ENDIF
()
example/client-cpp-example/src/SessionExample.cpp
浏览文件 @
f061b284
...
...
@@ -110,7 +110,7 @@ void insertTablet() {
Tablet
tablet
(
"root.sg1.d1"
,
schemas
,
100
);
for
(
int64_t
time
=
0
;
time
<
1
00
;
time
++
)
{
for
(
int64_t
time
=
100
;
time
<
2
00
;
time
++
)
{
int
row
=
tablet
.
rowSize
++
;
tablet
.
timestamps
[
row
]
=
time
;
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
...
...
@@ -146,7 +146,7 @@ void insertTablets() {
tabletMap
[
"root.sg1.d2"
]
=
&
tablet2
;
tabletMap
[
"root.sg1.d3"
]
=
&
tablet3
;
for
(
int64_t
time
=
0
;
time
<
1
00
;
time
++
)
{
for
(
int64_t
time
=
200
;
time
<
3
00
;
time
++
)
{
int
row1
=
tablet1
.
rowSize
++
;
int
row2
=
tablet2
.
rowSize
++
;
int
row3
=
tablet3
.
rowSize
++
;
...
...
@@ -156,8 +156,8 @@ void insertTablets() {
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
tablet1
.
values
[
i
][
row1
]
=
to_string
(
i
);
tablet2
.
values
[
i
][
row
1
]
=
to_string
(
i
);
tablet3
.
values
[
i
][
row
1
]
=
to_string
(
i
);
tablet2
.
values
[
i
][
row
2
]
=
to_string
(
i
);
tablet3
.
values
[
i
][
row
3
]
=
to_string
(
i
);
}
if
(
tablet1
.
rowSize
==
tablet1
.
maxRowNumber
)
{
session
->
insertTablets
(
tabletMap
,
true
);
...
...
@@ -188,7 +188,7 @@ void insertRecords() {
vector
<
vector
<
string
>>
valuesList
;
vector
<
int64_t
>
timestamps
;
for
(
int64_t
time
=
0
;
time
<
5
00
;
time
++
)
{
for
(
int64_t
time
=
300
;
time
<
4
00
;
time
++
)
{
vector
<
string
>
values
;
values
.
push_back
(
"1"
);
values
.
push_back
(
"2"
);
...
...
@@ -198,7 +198,7 @@ void insertRecords() {
measurementsList
.
push_back
(
measurements
);
valuesList
.
push_back
(
values
);
timestamps
.
push_back
(
time
);
if
(
time
!=
0
&&
time
%
100
==
0
)
{
if
(
time
!=
30
0
&&
time
%
100
==
0
)
{
session
->
insertRecords
(
deviceIds
,
timestamps
,
measurementsList
,
valuesList
);
deviceIds
.
clear
();
measurementsList
.
clear
();
...
...
@@ -261,7 +261,7 @@ void queryLast() {
while
(
dataSet
->
hasNext
())
{
cout
<<
dataSet
->
next
()
->
toString
();
}
cout
<<
endl
;
dataSet
->
closeOperationHandle
();
}
...
...
@@ -269,42 +269,56 @@ int main() {
session
=
new
Session
(
"127.0.0.1"
,
6667
,
"root"
,
"root"
);
session
->
open
(
false
);
cout
<<
"setStorageGroup
\n
"
;
cout
<<
"setStorageGroup
\n
"
<<
endl
;
try
{
session
->
setStorageGroup
(
"root.sg1"
);
}
catch
(
IoTDBConnectionException
e
){
string
errorMessage
(
e
.
what
());
if
(
errorMessage
.
find
(
"StorageGroupAlreadySetException"
)
==
string
::
npos
)
{
cout
<<
errorMessage
<<
endl
;
throw
e
;
}
}
cout
<<
"createTimeseries
\n
"
<<
endl
;
createTimeseries
();
cout
<<
"createMultiTimeseries
\n
"
<<
endl
;
createMultiTimeseries
();
cout
<<
"insertRecord
\n
"
<<
endl
;
insertRecord
();
cout
<<
"queryLast
\n
"
<<
endl
;
queryLast
();
cout
<<
"insertTablet
\n
"
<<
endl
;
insertTablet
();
cout
<<
"insertRecords
\n
"
<<
endl
;
insertRecords
();
cout
<<
"insertTablets
\n
"
<<
endl
;
insertTablets
();
cout
<<
"nonQuery
\n
"
<<
endl
;
nonQuery
();
cout
<<
"query
\n
"
<<
endl
;
query
();
cout
<<
"deleteData
\n
"
<<
endl
;
deleteData
();
cout
<<
"deleteTimeseries
\n
"
<<
endl
;
deleteTimeseries
();
cout
<<
"session close
\n
"
<<
endl
;
session
->
close
();
delete
session
;
cout
<<
"finished!
\n
"
<<
endl
;
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录