Unverified · Commit 1d7386d7 authored by Julian, committed by GitHub

[IOTDB-1273]Feature/restrucutre python module as well as supporting pandas dataframe (#2922)

Parent 70e451eb
......@@ -99,4 +99,7 @@ classes/
### Cmake files ###
*.cmake
Makefile
**/CMakeFiles/
\ No newline at end of file
**/CMakeFiles/
# Exclude copied license
/client-py/LICENSE
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
[flake8]
ignore =
E203,
W503
max-line-length=200
exclude =
.git,
test/*,
iotdb/thrift/**/*
extend-exclude =
dist,
build,
venv
show-source = True
statistics = True
format = %(path)s:%(row)d,%(col)d:%(code)s:%(text)s:https://lintlyci.github.io/Flake8Rules/rules/%(code)s.html
/iotdb/thrift/
# generated by Pypi
/build/
/dist/
/apache_iotdb.egg-info
......@@ -89,4 +89,105 @@ class MyTestCase(unittest.TestCase):
session.close()
```
By default it will load the image `apache/iotdb:latest`; if you want a specific version, just pass it, e.g. `IoTDBContainer("apache/iotdb:0.10.0")`, to get version `0.10.0` running.
\ No newline at end of file
By default it will load the image `apache/iotdb:latest`; if you want a specific version, just pass it, e.g. `IoTDBContainer("apache/iotdb:0.10.0")`, to get version `0.10.0` running.
## Pandas Support
To easily transform a query result into a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html),
the `SessionDataSet` provides a method `.todf()` which consumes the data set and transforms it into a DataFrame.
Example:
```python
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_)
session.open(False)
result = session.execute_query_statement("SELECT * FROM root.*")
# Transform to Pandas Dataset
df = result.todf()
session.close()
# Now you can work with the dataframe
df = ...
```
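Continuing with the `df` returned by `.todf()` above, ordinary pandas operations apply. A small follow-up sketch (assuming the `Time` column holds epoch timestamps in milliseconds, IoTDB's default time precision) could turn it into a time-indexed frame:
```python
import pandas as pd

# Hypothetical post-processing, assuming the "Time" column contains
# epoch timestamps in milliseconds (IoTDB's default time precision):
df["Time"] = pd.to_datetime(df["Time"], unit="ms")
df = df.set_index("Time")

# df is now a time-indexed frame, ready for resampling, plotting, etc.
print(df.describe())
```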
## Developers
### Introduction
This is an example of how to connect to IoTDB with Python, using the Thrift RPC interfaces. Things
are almost the same on Windows and Linux, but pay attention to differences such as the path separator.
### Prerequisites
Python 3.7 or later is preferred.
You have to install Thrift (0.11.0 or later) to compile our Thrift file into Python code. Below is the official
installation tutorial; eventually, you should have a thrift executable.
```
http://thrift.apache.org/docs/install/
```
Before starting, you need to install the packages from `requirements_dev.txt` in your Python environment, e.g. by calling
```
pip install -r requirements_dev.txt
```
### Compile the thrift library and Debug
In the root of IoTDB's source code folder, run `mvn clean generate-sources -pl client-py -am`.
This will automatically delete and repopulate the folder `iotdb/thrift` with the generated thrift files.
This folder is ignored by git and should **never be pushed to git!**
**Notice** Do not upload `iotdb/thrift` to the git repo.
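As a quick sanity check (just a sketch, not part of the build), the generated stubs under `iotdb/thrift` should be importable once the Maven step has run; the module path below is the same one used by `iotdb/utils/IoTDBRpcDataSet.py`:
```python
# Sanity-check sketch: if "mvn clean generate-sources -pl client-py -am" succeeded,
# the generated thrift stubs can be imported from iotdb.thrift.
from iotdb.thrift.rpc.TSIService import TSFetchResultsReq  # noqa: F401

print("thrift stubs are available")
```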
### Session Client & Example
We packed up the Thrift interface in `client-py/src/iotdb/Session.py` (similar to its Java counterpart) and also provide
an example file `client-py/src/SessionExample.py` showing how to use the session module. Please read it carefully.
Or, another simple example:
```python
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()
```
### Tests
Please add your custom tests in the `tests` folder.
To run all defined tests, just type `pytest .` in the root folder.
**Notice** Some tests need Docker to be running on your system, as a test instance is started in a Docker container using [testcontainers](https://testcontainers-python.readthedocs.io/en/latest/index.html).
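A minimal custom test could look like the following sketch (the image tag is only an example; `IoTDBContainer` and `Session` are the helpers used throughout this module):
```python
# Sketch of a custom test under tests/, assuming pytest and a running Docker daemon.
from iotdb.IoTDBContainer import IoTDBContainer
from iotdb.Session import Session


def test_get_time_zone():
    # spins up a throwaway IoTDB instance in a container
    with IoTDBContainer("apache/iotdb:0.11.2") as db:
        session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
        session.open(False)
        # the server always reports some time zone
        assert session.get_time_zone() is not None
        session.close()
```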
### Further Tools
[black](https://pypi.org/project/black/) and [flake8](https://pypi.org/project/flake8/) are installed for auto-formatting and linting.
They can be run via `black .` and `flake8 .`, respectively.
## Releasing
To do a release, first ensure that you have the right set of generated thrift files.
Then run linting and auto-formatting.
Then ensure that all tests pass (via `pytest .`).
After that, you are good to go!
\ No newline at end of file
......@@ -19,15 +19,15 @@
# Uncomment the following line to use apache-iotdb module installed by pip3
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.Tablet import Tablet
from iotdb.utils.IoTDBConstants import *
# creating session connection.
ip = "127.0.0.1"
port_ = "6667"
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id='UTC+8')
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
session.open(False)
# set and delete storage groups
......@@ -39,74 +39,136 @@ session.delete_storage_group("root.sg_test_02")
session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
# setting time series.
session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series(
"root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
)
# setting multiple time series once.
ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]
data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
ts_path_lst_ = [
"root.sg_test_01.d_01.s_04",
"root.sg_test_01.d_01.s_05",
"root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_)
session.create_multi_time_series(
ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
# delete time series
session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"])
session.delete_time_series(
[
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
)
# checking time series
print("s_07 expecting False, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_07"))
print("s_03 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_03"))
print(
"s_07 expecting False, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
)
print(
"s_03 expecting True, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
)
# insert one record into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
# insert multiple records into database
measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"]]
measurements_list_ = [
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
]
values_list_ = [
[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"],
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_)
session.insert_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
)
# insert one tablet into the database.
values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"]] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_)
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)
# insert multiple tablets into database
tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11])
tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
tablet_01 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
)
tablet_02 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
)
session.insert_tablets([tablet_01, tablet_02])
# insert records of one device
time_list = [1, 2, 3]
measurements_list = [["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"]]
data_types_list = [[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64]]
measurements_list = [
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
]
data_types_list = [
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
]
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
session.insert_records_of_one_device("root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list)
session.insert_records_of_one_device(
"root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list
)
# execute non-query sql statement
session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)")
session.execute_non_query_statement(
"insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
)
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
session_data_set.set_fetch_size(1024)
while session_data_set.has_next():
print(session_data_set.next())
print(session_data_set.next())
session_data_set.close_operation_handle()
# close session connection.
......
......@@ -18,33 +18,35 @@
# Uncomment the following line to use apache-iotdb module installed by pip3
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.Tablet import Tablet
from iotdb.utils.IoTDBConstants import *
# whether the test has passed
final_flag = True
failed_count = 0
def test_fail(message):
global failed_count
global final_flag
print("*********")
print(message)
print("*********")
final_flag = False
failed_count += 1
global failed_count
global final_flag
print("*********")
print(message)
print("*********")
final_flag = False
failed_count += 1
# creating session connection.
ip = "127.0.0.1"
port_ = "6667"
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id='UTC+8')
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
session.open(False)
if not session.is_open():
print("can't open session")
exit(1)
print("can't open session")
exit(1)
# set and delete storage groups
session.set_storage_group("root.sg_test_01")
......@@ -53,86 +55,165 @@ session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
if session.delete_storage_group("root.sg_test_02") < 0:
test_fail("delete storage group failed")
test_fail("delete storage group failed")
if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
test_fail("delete storage groups failed")
test_fail("delete storage groups failed")
# setting time series.
session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series(
"root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
)
# setting multiple time series once.
ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]
data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
ts_path_lst_ = [
"root.sg_test_01.d_01.s_04",
"root.sg_test_01.d_01.s_05",
"root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_)
session.create_multi_time_series(
ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
# delete time series
if session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]) < 0:
test_fail("delete time series failed")
if (
session.delete_time_series(
[
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
)
< 0
):
test_fail("delete time series failed")
# checking time series
# s_07 expecting False
if session.check_time_series_exists("root.sg_test_01.d_01.s_07"):
test_fail("root.sg_test_01.d_01.s_07 shouldn't exist")
test_fail("root.sg_test_01.d_01.s_07 shouldn't exist")
# s_03 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_01.s_03"):
test_fail("root.sg_test_01.d_01.s_03 should exist")
test_fail("root.sg_test_01.d_01.s_03 should exist")
# insert one record into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
if session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) < 0:
test_fail("insert record failed")
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
if (
session.insert_record(
"root.sg_test_01.d_01", 1, measurements_, data_types_, values_
)
< 0
):
test_fail("insert record failed")
# insert multiple records into database
measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"]]
measurements_list_ = [
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
]
values_list_ = [
[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"],
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
if session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_) < 0:
test_fail("insert records failed")
if (
session.insert_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
)
< 0
):
test_fail("insert records failed")
# insert one tablet into the database.
values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"]] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_)
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
if session.insert_tablet(tablet_) < 0:
test_fail("insert tablet failed")
test_fail("insert tablet failed")
# insert multiple tablets into database
tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11])
tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
tablet_01 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
)
tablet_02 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
)
if session.insert_tablets([tablet_01, tablet_02]) < 0:
test_fail("insert tablets failed")
test_fail("insert tablets failed")
# insert records of one device
time_list = [1, 2, 3]
measurements_list = [["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"]]
data_types_list = [[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64]]
measurements_list = [
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
]
data_types_list = [
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
]
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
if session.insert_records_of_one_device("root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list) < 0:
test_fail("insert records of one device failed")
if (
session.insert_records_of_one_device(
"root.sg_test_01.d_01",
time_list,
measurements_list,
data_types_list,
values_list,
)
< 0
):
test_fail("insert records of one device failed")
# execute non-query sql statement
if session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)") < 0:
test_fail("execute 'insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)' failed")
if (
session.execute_non_query_statement(
"insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
)
< 0
):
test_fail(
"execute 'insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)' failed"
)
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
......@@ -140,19 +221,23 @@ session_data_set.set_fetch_size(1024)
expect_count = 16
actual_count = 0
while session_data_set.has_next():
actual_count += 1
actual_count += 1
session_data_set.close_operation_handle()
if actual_count != expect_count:
test_fail("query count mismatch: expect count: "
+ str(expect_count) + " actual count: " + str(actual_count))
test_fail(
"query count mismatch: expect count: "
+ str(expect_count)
+ " actual count: "
+ str(actual_count)
)
# close session connection.
session.close()
if final_flag:
print("All executions done!!")
print("All executions done!!")
else:
print("Some test failed, please have a check")
print("failed count: ", failed_count)
exit(1)
print("Some test failed, please have a check")
print("failed count: ", failed_count)
exit(1)
......@@ -33,7 +33,9 @@ class IoTDBContainer(DockerContainer):
@wait_container_is_ready()
def _connect(self):
session = Session(self.get_container_host_ip(), self.get_exposed_port(6667), 'root', 'root')
session = Session(
self.get_container_host_ip(), self.get_exposed_port(6667), "root", "root"
)
session.open(False)
session.close()
......
......@@ -15,4 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
......@@ -19,11 +19,8 @@
# for package
from .IoTDBConstants import TSDataType
# for debug
# from IoTDBConstants import TSDataType
class Field(object):
def __init__(self, data_type):
"""
:param data_type: TSDataType
......@@ -53,7 +50,9 @@ class Field(object):
elif output.get_data_type() == TSDataType.TEXT:
output.set_binary_value(field.get_binary_value())
else:
raise Exception("unsupported data type {}".format(output.get_data_type()))
raise Exception(
"unsupported data type {}".format(output.get_data_type())
)
return output
def get_data_type(self):
......@@ -124,7 +123,7 @@ class Field(object):
elif self.__data_type == TSDataType.DOUBLE:
return str(self.__double_value)
elif self.__data_type == TSDataType.TEXT:
return self.__binary_value.decode('utf-8')
return self.__binary_value.decode("utf-8")
else:
raise Exception("unsupported data type {}".format(self.__data_type))
......@@ -176,4 +175,3 @@ class Field(object):
else:
raise Exception("unsupported data type {}".format(data_type))
return field
......@@ -17,18 +17,9 @@
#
# for package
from .IoTDBConstants import *
# for debug
# from IoTDBConstants import *
import sys
from os.path import dirname, abspath
path = dirname(dirname(abspath(__file__)))
sys.path.append(path)
from thrift.transport import TTransport
from iotdb.thrift.rpc.TSIService import TSFetchResultsReq, TSCloseOperationReq
from iotdb.utils.IoTDBConstants import TSDataType
class IoTDBRpcDataSet(object):
......@@ -37,8 +28,19 @@ class IoTDBRpcDataSet(object):
START_INDEX = 2
FLAG = 0x80
def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
client, session_id, query_data_set, fetch_size):
def __init__(
self,
sql,
column_name_list,
column_type_list,
column_name_index,
ignore_timestamp,
query_id,
client,
session_id,
query_data_set,
fetch_size,
):
self.__session_id = session_id
self.__ignore_timestamp = ignore_timestamp
self.__sql = sql
......@@ -57,15 +59,21 @@ class IoTDBRpcDataSet(object):
self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1
if column_name_index is not None:
self.__column_type_deduplicated_list = [None for _ in range(len(column_name_index))]
self.__column_type_deduplicated_list = [
None for _ in range(len(column_name_index))
]
for i in range(len(column_name_list)):
name = column_name_list[i]
self.__column_name_list.append(name)
self.__column_type_list.append(TSDataType[column_type_list[i]])
if name not in self.__column_ordinal_dict:
index = column_name_index[name]
self.__column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX
self.__column_type_deduplicated_list[index] = TSDataType[column_type_list[i]]
self.__column_ordinal_dict[name] = (
index + IoTDBRpcDataSet.START_INDEX
)
self.__column_type_deduplicated_list[index] = TSDataType[
column_type_list[i]
]
else:
index = IoTDBRpcDataSet.START_INDEX
self.__column_type_deduplicated_list = []
......@@ -76,10 +84,14 @@ class IoTDBRpcDataSet(object):
if name not in self.__column_ordinal_dict:
self.__column_ordinal_dict[name] = index
index += 1
self.__column_type_deduplicated_list.append(TSDataType[column_type_list[i]])
self.__column_type_deduplicated_list.append(
TSDataType[column_type_list[i]]
)
self.__time_bytes = bytes(0)
self.__current_bitmap = [bytes(0) for _ in range(len(self.__column_type_deduplicated_list))]
self.__current_bitmap = [
bytes(0) for _ in range(len(self.__column_type_deduplicated_list))
]
self.__value = [None for _ in range(len(self.__column_type_deduplicated_list))]
self.__query_data_set = query_data_set
self.__is_closed = False
......@@ -92,8 +104,14 @@ class IoTDBRpcDataSet(object):
return
if self.__client is not None:
try:
status = self.__client.closeOperation(TSCloseOperationReq(self.__session_id, self.__query_id))
print("close session {}, message: {}".format(self.__session_id, status.message))
status = self.__client.closeOperation(
TSCloseOperationReq(self.__session_id, self.__query_id)
)
print(
"close session {}, message: {}".format(
self.__session_id, status.message
)
)
except TTransport.TException as e:
print("close session {} failed because: ".format(self.__session_id), e)
raise Exception
......@@ -113,7 +131,9 @@ class IoTDBRpcDataSet(object):
return False
def has_cached_result(self):
return (self.__query_data_set is not None) and (len(self.__query_data_set.time) != 0)
return (self.__query_data_set is not None) and (
len(self.__query_data_set.time) != 0
)
def construct_one_row(self):
# simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
......@@ -147,9 +167,11 @@ class IoTDBRpcDataSet(object):
self.__value[i] = value_buffer[:8]
self.__query_data_set.valueList[i] = value_buffer[8:]
elif data_type == TSDataType.TEXT:
length = int.from_bytes(value_buffer[:4], byteorder="big", signed=False)
self.__value[i] = value_buffer[4: 4 + length]
self.__query_data_set.valueList[i] = value_buffer[4 + length:]
length = int.from_bytes(
value_buffer[:4], byteorder="big", signed=False
)
self.__value[i] = value_buffer[4 : 4 + length]
self.__query_data_set.valueList[i] = value_buffer[4 + length :]
else:
print("unsupported data type {}.".format(data_type))
# could raise exception here
......@@ -158,7 +180,14 @@ class IoTDBRpcDataSet(object):
def fetch_results(self):
self.__rows_index = 0
request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True, self.__default_time_out)
request = TSFetchResultsReq(
self.__session_id,
self.__sql,
self.__fetch_size,
self.__query_id,
True,
self.__default_time_out,
)
try:
resp = self.__client.fetchResults(request)
if not resp.hasResultSet:
......@@ -172,10 +201,13 @@ class IoTDBRpcDataSet(object):
def is_null(self, index, row_num):
bitmap = self.__current_bitmap[index]
shift = row_num % 8
return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xff)) == 0
return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xFF)) == 0
def is_null_by_index(self, column_index):
index = self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] - IoTDBRpcDataSet.START_INDEX
index = (
self.__column_ordinal_dict[self.find_column_name_by_index(column_index)]
- IoTDBRpcDataSet.START_INDEX
)
# time column will never be None
if index < 0:
return True
......@@ -192,7 +224,11 @@ class IoTDBRpcDataSet(object):
if column_index <= 0:
raise Exception("Column index should start from 1")
if column_index > len(self.__column_name_list):
raise Exception("column index {} out of range {}".format(column_index, self.__column_size))
raise Exception(
"column index {} out of range {}".format(
column_index, self.__column_size
)
)
return self.__column_name_list[column_index - 1]
def get_fetch_size(self):
......
......@@ -23,15 +23,12 @@ from .Field import Field
# from IoTDBConstants import TSDataType
# from Field import Field
class RowRecord(object):
class RowRecord(object):
def __init__(self, timestamp, field_list=None):
self.__timestamp = timestamp
self.__field_list = field_list
def add_field(self, field):
self.__field_list.append(field)
def add_field(self, value, data_type):
self.__field_list.append(Field.get_field(value, data_type))
......
......@@ -16,27 +16,43 @@
# under the License.
#
# for package
from .IoTDBConstants import TSDataType
from .IoTDBRpcDataSet import IoTDBRpcDataSet
from .Field import Field
from .RowRecord import RowRecord
import struct
# for debug
# from IoTDBConstants import TSDataType
# from IoTDBRpcDataSet import IoTDBRpcDataSet
# from Field import Field
# from RowRecord import RowRecord
from iotdb.utils.Field import Field
import struct
# for package
from iotdb.utils.IoTDBConstants import TSDataType
from iotdb.utils.IoTDBRpcDataSet import IoTDBRpcDataSet
from iotdb.utils.RowRecord import RowRecord
import pandas as pd
class SessionDataSet(object):
def __init__(self, sql, column_name_list, column_type_list, column_name_index, query_id, client, session_id,
query_data_set, ignore_timestamp):
self.iotdb_rpc_data_set = IoTDBRpcDataSet(sql, column_name_list, column_type_list, column_name_index,
ignore_timestamp, query_id, client, session_id, query_data_set, 1024)
class SessionDataSet(object):
def __init__(
self,
sql,
column_name_list,
column_type_list,
column_name_index,
query_id,
client,
session_id,
query_data_set,
ignore_timestamp,
):
self.iotdb_rpc_data_set = IoTDBRpcDataSet(
sql,
column_name_list,
column_type_list,
column_name_index,
ignore_timestamp,
query_id,
client,
session_id,
query_data_set,
1024,
)
def get_fetch_size(self):
return self.iotdb_rpc_data_set.get_fetch_size()
......@@ -69,11 +85,16 @@ class SessionDataSet(object):
index -= 1
data_set_column_index -= 1
column_name = self.iotdb_rpc_data_set.get_column_names()[index]
location = self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] - IoTDBRpcDataSet.START_INDEX
location = (
self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
- IoTDBRpcDataSet.START_INDEX
)
if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index):
value_bytes = self.iotdb_rpc_data_set.get_values()[location]
data_type = self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[location]
data_type = self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[
location
]
field = Field(data_type)
if data_type == TSDataType.BOOLEAN:
value = struct.unpack(">?", value_bytes)[0]
......@@ -99,11 +120,61 @@ class SessionDataSet(object):
field = Field(None)
out_fields.append(field)
return RowRecord(struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields)
return RowRecord(
struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields
)
def close_operation_handle(self):
self.iotdb_rpc_data_set.close()
def todf(self):
return resultset_to_pandas(self)
def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
"""
Transforms a SessionDataSet from IoTDB into a Pandas DataFrame.
Each Field from IoTDB becomes a column in the DataFrame.
:param result_set:
:return:
"""
# get column names and fields
column_names = result_set.get_column_names()
value_dict = {}
for i in range(len(column_names)):
value_dict[column_names[i]] = []
while result_set.has_next():
record = result_set.next()
value_dict["Time"].append(record.get_timestamp())
for col in range(len(record.get_fields())):
field: Field = record.get_fields()[col]
value_dict[column_names[col + 1]].append(get_typed_point(field))
return pd.DataFrame(value_dict)
def get_typed_point(field: Field, none_value=None):
choices = {
# In Case of Boolean, cast to 0 / 1
TSDataType.BOOLEAN: lambda field: 1 if field.get_bool_value() else 0,
TSDataType.TEXT: lambda field: field.get_string_value(),
TSDataType.FLOAT: lambda field: field.get_float_value(),
TSDataType.INT32: lambda field: field.get_int_value(),
TSDataType.DOUBLE: lambda field: field.get_double_value(),
TSDataType.INT64: lambda field: field.get_long_value(),
}
result_next_type: TSDataType = field.get_data_type()
if result_next_type in choices.keys():
return choices.get(result_next_type)(field)
elif result_next_type is None:
return none_value
else:
raise Exception(f"Unknown DataType {result_next_type}!")
......@@ -16,17 +16,12 @@
# under the License.
#
# for package
from .IoTDBConstants import *
# for debug
# from IoTDBConstants import *
import struct
from iotdb.utils.IoTDBConstants import TSDataType
class Tablet(object):
class Tablet(object):
def __init__(self, device_id, measurements, data_types, values, timestamps):
"""
creating a tablet for insertion
......@@ -88,7 +83,7 @@ class Tablet(object):
format_str_list.append("q")
values_tobe_packed.append(timestamp)
format_str = ''.join(format_str_list)
format_str = "".join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
def get_binary_values(self):
......@@ -122,7 +117,7 @@ class Tablet(object):
values_tobe_packed.append(self.__values[j][i])
elif self.__data_types[i] == TSDataType.TEXT:
for j in range(self.__row_number):
value_bytes = bytes(self.__values[j][i], 'utf-8')
value_bytes = bytes(self.__values[j][i], "utf-8")
format_str_list.append("i")
format_str_list.append(str(len(value_bytes)))
format_str_list.append("s")
......@@ -133,6 +128,5 @@ class Tablet(object):
# could raise an error here.
return
format_str = ''.join(format_str_list)
format_str = "".join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
......@@ -15,4 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
......@@ -39,7 +39,37 @@
</dependencies>
<build>
<plugins>
<!-- for pypi distribution -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- clean thrift folder -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<filesets>
<fileset>
<directory>iotdb</directory>
<includes>
<include>thrift/</include>
</includes>
<followSymlinks>false</followSymlinks>
</fileset>
<fileset>
<directory>./</directory>
<includes>
<include>LICENSE</include>
</includes>
</fileset>
</filesets>
</configuration>
</plugin>
<!-- fill thrift folder -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
......@@ -57,16 +87,17 @@
</goals>
<configuration>
<encoding>utf-8</encoding>
<outputDirectory>${project.build.directory}/pypi/</outputDirectory>
<outputDirectory>${basedir}/iotdb/thrift/</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../thrift/target/generated-sources-python</directory>
<directory>${basedir}/../thrift/target/generated-sources-python/iotdb/thrift/</directory>
</resource>
</resources>
</configuration>
</execution>
<!-- Copy License -->
<execution>
<id>copy-license-resources</id>
<id>copy-pypi-file-resources</id>
<!-- here the phase you need -->
<phase>generate-sources</phase>
<goals>
......@@ -74,10 +105,10 @@
</goals>
<configuration>
<encoding>utf-8</encoding>
<outputDirectory>${project.build.directory}/pypi</outputDirectory>
<outputDirectory>${basedir}/</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../</directory>
<directory>${basedir}/..</directory>
<includes>
<include>LICENSE</include>
</includes>
......@@ -85,40 +116,6 @@
</resources>
</configuration>
</execution>
<execution>
<id>copy-python-file-resources</id>
<!-- here the phase you need -->
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<encoding>utf-8</encoding>
<outputDirectory>${project.build.directory}/pypi/</outputDirectory>
<resources>
<resource>
<directory>${basedir}/src/</directory>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-pypi-file-resources</id>
<!-- here the phase you need -->
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<encoding>utf-8</encoding>
<outputDirectory>${project.build.directory}/pypi</outputDirectory>
<resources>
<resource>
<directory>${basedir}/pypi</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
......
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
[tool.black]
line-length = 88
target-version = ['py37']
include = '\.pyi?$'
exclude = '''
(
/(
\.eggs # exclude a few common directories in the
| \.git # root of the project
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| venv
| _build
| buck-out
| build
| dist
| migrations
| test
| iotdb/thrift
)/
| foo.py # also separately exclude a file named foo.py in
# the root of the project
)
'''
\ No newline at end of file
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Python Client
## Introduction
This is an example of how to connect to IoTDB with Python, using the Thrift RPC interfaces. Things
are almost the same on Windows and Linux, but pay attention to differences such as the path separator.
## Prerequisites
Python 3.7 or later is preferred.
You have to install Thrift (0.11.0 or later) to compile our Thrift file into Python code. Below is the official
installation tutorial; eventually, you should have a thrift executable.
```
http://thrift.apache.org/docs/install/
```
## Compile the thrift library and Debug
In the root of IoTDB's source code folder, run `mvn generate-sources -pl client-py -am`.
Then a complete project will be generated in the `client-py/target/pypi` folder.
But !BE CAUTIOUS!
All your modifications in `client-py/target/pypi` must be copied manually to the `client-py/src/` folder.
Otherwise, once you run `mvn clean`, you will lose all your work.
Or, you can copy the `client-py/target/pypi/iotdb/thrift` folder to `client-py/src/thrift`; then the
`src` folder will also become a complete Python project.
But !BE CAUTIOUS!
Do not upload `client-py/src/thrift` to the git repo.
## Session Client & Example
We packed up the Thrift interface in `client-py/src/iotdb/Session.py` (similar to its Java counterpart) and also provide
an example file `client-py/src/SessionExample.py` showing how to use the session module. Please read it carefully.
Or, another simple example:
```python
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()
```
## Test file
You can use `client-py/src/SessionTest.py` to test the Python session. If the test passes, it returns 0; otherwise it returns 1. You can use the printed messages to locate failed operations and their causes.
Notice: you should start the IoTDB server first and then run the test.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Pandas Export
pandas==1.2.3
# Testcontainer
testcontainers==3.3.0
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
-r requirements.txt
# Pytest to run tests
pytest==6.2.2
thrift==0.13.0
flake8==3.9.0
black==20.8b1
\ No newline at end of file
......@@ -21,16 +21,16 @@ import io
try:
with io.open('README.md', encoding='utf-8') as f:
with io.open("README.md", encoding="utf-8") as f:
long_description = f.read()
except FileNotFoundError:
long_description = ''
long_description = ""
print(long_description)
setuptools.setup(
name="apache-iotdb", # Replace with your own username
name="apache-iotdb", # Replace with your own username
version="0.12.0",
author=" Apache Software Foundation",
author_email="dev@iotdb.apache.org",
......@@ -40,8 +40,10 @@ setuptools.setup(
url="https://github.com/apache/iotdb",
packages=setuptools.find_packages(),
install_requires=[
'thrift>=0.13.0',
],
"thrift>=0.13.0",
"pandas>=1.0.0,<1.99.99",
"testcontainers>=2.0.0",
],
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
......@@ -49,7 +51,7 @@ setuptools.setup(
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
python_requires='>=3.7',
license='Apache License, Version 2.0',
website='https://iotdb.apache.org',
python_requires=">=3.7",
license="Apache License, Version 2.0",
website="https://iotdb.apache.org",
)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from iotdb.Session import Session
from iotdb.IoTDBContainer import IoTDBContainer
from numpy.testing import assert_array_equal
def test_simple_query():
with IoTDBContainer("apache/iotdb:0.11.2") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
# Write data
session.insert_str_record("root.device", 123, "pressure", "15.0")
# Read
session_data_set = session.execute_query_statement("SELECT * FROM root.*")
df = session_data_set.todf()
session.close()
assert list(df.columns) == ["Time", "root.device.pressure"]
assert_array_equal(df.values, [[123.0, 15.0]])