未验证 提交 2c85f9cd 编写于 作者: H Haonan 提交者: GitHub

[IOTDB-2839] Add Python client CI (#5407)

上级 90e381d7
......@@ -3,7 +3,7 @@
# CPP compiling is too slow, so let's do it in parallel with testing other modules.
# As there is no Java client, we just use one JDK.
name: Clients_except_Java CI with Maven
name: C++ Client
on:
push:
......
# This workflow is just for checking whether modifications works for the Python client.
name: Python Client
on:
push:
branches:
- master
- 'rel/*'
- "new_*"
paths-ignore:
- 'docs/**'
pull_request:
branches:
- master
- 'rel/*'
- "new_*"
paths-ignore:
- 'docs/**'
# allow manually run the action:
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
jobs:
unix:
strategy:
fail-fast: false
max-parallel: 20
matrix:
java: [ 11 ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os}}
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Cache Maven packages
uses: actions/cache@v2
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2-
- name: Build IoTDB server distribution zip and python client
run: mvn -B clean install -pl distribution,client-py -am -DskipTests
- name: Build IoTDB server docker image
run: |
docker build . -f docker/src/main/Dockerfile-single -t "iotdb:dev"
docker images
- name: Install IoTDB python client requirements
run: pip3 install -r client-py/requirements_dev.txt
- name: Integration test
shell: bash
run: |
cd client-py && pytest .
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Uncomment the following line to use apache-iotdb module installed by pip3
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.Tablet import Tablet
# whether the test has passed
final_flag = True
failed_count = 0
def test_fail():
global failed_count
global final_flag
final_flag = False
failed_count += 1
def print_message(message):
print("*********")
print(message)
print("*********")
# creating session connection.
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
session.open(False)
if not session.is_open():
print("can't open session")
exit(1)
# set and delete storage groups
session.set_storage_group("root.sg_test_01")
session.set_storage_group("root.sg_test_02")
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
if session.delete_storage_group("root.sg_test_02") < 0:
test_fail()
print_message("delete storage group failed")
if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
test_fail()
print_message("delete storage groups failed")
# setting aligned time series.
measurements_lst_ = [
"s_01",
"s_02",
"s_03",
]
data_type_lst_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_aligned_time_series(
"root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
# setting more aligned time series once.
measurements_lst_ = [
"s_04",
"s_05",
"s_06",
"s_07",
"s_08",
"s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_aligned_time_series(
"root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
# delete time series
if (
session.delete_time_series(
[
"root.sg_test_01.d_02.s_07",
"root.sg_test_01.d_02.s_08",
"root.sg_test_01.d_02.s_09",
]
)
< 0
):
test_fail()
print_message("delete time series failed")
# checking time series
# s_07 expecting False
if session.check_time_series_exists("root.sg_test_01.d_02.s_07"):
test_fail()
print_message("root.sg_test_01.d_02.s_07 shouldn't exist")
# s_03 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_02.s_03"):
test_fail()
print_message("root.sg_test_01.d_02.s_03 should exist")
# insert one record into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
if (
session.insert_aligned_record(
"root.sg_test_01.d_02", 1, measurements_, data_types_, values_
)
< 0
):
test_fail()
print_message("insert record failed")
# insert multiple records into database
measurements_list_ = [
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
]
values_list_ = [
[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"],
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
if (
session.insert_aligned_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
)
< 0
):
test_fail()
print_message("insert records failed")
# insert one tablet into the database.
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
)
if session.insert_aligned_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet failed")
# insert multiple tablets into database
tablet_01 = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
)
tablet_02 = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
)
if session.insert_aligned_tablets([tablet_01, tablet_02]) < 0:
test_fail()
print_message("insert tablets failed")
# insert one tablet with empty cells into the database.
values_ = [
[None, 10, 11, 1.1, 10011.1, "test01"],
[True, None, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, None, 688.25, "test03"],
[True, 0, 0, 0, None, None],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [20, 21, 22, 23]
tablet_ = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
)
if session.insert_aligned_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet with empty cells failed")
# insert records of one device
time_list = [1, 2, 3]
measurements_list = [
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
]
data_types_list = [
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
]
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
if (
session.insert_aligned_records_of_one_device(
"root.sg_test_01.d_02",
time_list,
measurements_list,
data_types_list,
values_list,
)
< 0
):
test_fail()
print_message("insert records of one device failed")
# execute non-query sql statement
if (
session.execute_non_query_statement(
"insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
)
< 0
):
test_fail()
print_message(
"execute 'insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)' failed"
)
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_02")
session_data_set.set_fetch_size(1024)
expect_count = 20
actual_count = 0
while session_data_set.has_next():
print(session_data_set.next())
actual_count += 1
session_data_set.close_operation_handle()
if actual_count != expect_count:
test_fail()
print_message(
"query count mismatch: expect count: "
+ str(expect_count)
+ " actual count: "
+ str(actual_count)
)
# close session connection.
session.close()
if final_flag:
print("All executions done!!")
else:
print("Some test failed, please have a check")
print("failed count: ", failed_count)
exit(1)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Uncomment the following line to use apache-iotdb module installed by pip3
import numpy as np
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.NumpyTablet import NumpyTablet
from iotdb.utils.Tablet import Tablet
# whether the test has passed
final_flag = True
failed_count = 0
def test_fail():
global failed_count
global final_flag
final_flag = False
failed_count += 1
def print_message(message):
print("*********")
print(message)
print("*********")
# creating session connection.
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
session.open(False)
if not session.is_open():
print("can't open session")
exit(1)
# set and delete storage groups
session.set_storage_group("root.sg_test_01")
session.set_storage_group("root.sg_test_02")
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
if session.delete_storage_group("root.sg_test_02") < 0:
test_fail()
print_message("delete storage group failed")
if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
test_fail()
print_message("delete storage groups failed")
# setting time series.
session.create_time_series(
"root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_02.s_01",
TSDataType.BOOLEAN,
TSEncoding.PLAIN,
Compressor.SNAPPY,
None,
{"tag1": "v1"},
{"description": "v1"},
"temperature"
)
# setting multiple time series once.
ts_path_lst_ = [
"root.sg_test_01.d_01.s_04",
"root.sg_test_01.d_01.s_05",
"root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_multi_time_series(
ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
ts_path_lst_ = [
"root.sg_test_01.d_02.s_04",
"root.sg_test_01.d_02.s_05",
"root.sg_test_01.d_02.s_06",
"root.sg_test_01.d_02.s_07",
"root.sg_test_01.d_02.s_08",
"root.sg_test_01.d_02.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
session.create_multi_time_series(
ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_, None, tags_lst_, attributes_lst_, None
)
# delete time series
if (
session.delete_time_series(
[
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
)
< 0
):
test_fail()
print_message("delete time series failed")
# checking time series
# s_07 expecting False
if session.check_time_series_exists("root.sg_test_01.d_01.s_07"):
test_fail()
print_message("root.sg_test_01.d_01.s_07 shouldn't exist")
# s_03 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_01.s_03"):
test_fail()
print_message("root.sg_test_01.d_01.s_03 should exist")
# d_02.s_01 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_02.s_01"):
test_fail()
print_message("root.sg_test_01.d_02.s_01 should exist")
# d_02.s_06 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_02.s_06"):
test_fail()
print_message("root.sg_test_01.d_02.s_06 should exist")
# insert one record into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
if (
session.insert_record(
"root.sg_test_01.d_01", 1, measurements_, data_types_, values_
)
< 0
):
test_fail()
print_message("insert record failed")
# insert multiple records into database
measurements_list_ = [
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
]
values_list_ = [
[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"],
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
if (
session.insert_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
)
< 0
):
test_fail()
print_message("insert records failed")
# insert one tablet into the database.
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
if session.insert_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet failed")
# insert one numpy tablet into the database.
np_values_ = [
np.array([False, True, False, True], np.dtype('>?')),
np.array([10, 100, 100, 0], np.dtype('>i4')),
np.array([11, 11111, 1, 0], np.dtype('>i8')),
np.array([1.1, 1.25, 188.1, 0], np.dtype('>f4')),
np.array([10011.1, 101.0, 688.25, 6.25], np.dtype('>f8')),
np.array(["test01", "test02", "test03", "test04"]),
]
np_timestamps_ = np.array([1, 2, 3, 4], np.dtype('>i8'))
np_tablet_ = NumpyTablet(
"root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
)
if session.insert_tablet(np_tablet_) < 0:
test_fail()
print_message("insert numpy tablet failed")
# insert multiple tablets into database
tablet_01 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
)
tablet_02 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
)
if session.insert_tablets([tablet_01, tablet_02]) < 0:
test_fail()
print_message("insert tablets failed")
# insert one tablet with empty cells into the database.
values_ = [
[None, 10, 11, 1.1, 10011.1, "test01"],
[True, None, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, None, 688.25, "test03"],
[True, 0, 0, 0, None, None],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [20, 21, 22, 23]
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
if session.insert_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet with empty cells failed")
# insert records of one device
time_list = [1, 2, 3]
measurements_list = [
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
]
data_types_list = [
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
]
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
if (
session.insert_records_of_one_device(
"root.sg_test_01.d_01",
time_list,
measurements_list,
data_types_list,
values_list,
)
< 0
):
test_fail()
print_message("insert records of one device failed")
# execute non-query sql statement
if (
session.execute_non_query_statement(
"insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
)
< 0
):
test_fail()
print_message(
"execute 'insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)' failed"
)
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
session_data_set.set_fetch_size(1024)
expect_count = 20
actual_count = 0
while session_data_set.has_next():
print(session_data_set.next())
actual_count += 1
session_data_set.close_operation_handle()
if actual_count != expect_count:
test_fail()
print_message(
"query count mismatch: expect count: "
+ str(expect_count)
+ " actual count: "
+ str(actual_count)
)
# close session connection.
session.close()
if final_flag:
print("All executions done!!")
else:
print("Some test failed, please have a check")
print("failed count: ", failed_count)
exit(1)
......@@ -19,7 +19,7 @@
-r requirements.txt
# Pytest to run tests
pytest==6.2.2
thrift==0.13.0
thrift==0.14.1
flake8==3.9.0
black==20.8b1
# For releases
......
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Uncomment the following line to use apache-iotdb module installed by pip3
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.Tablet import Tablet
from iotdb.IoTDBContainer import IoTDBContainer
# whether the test has passed
final_flag = True
failed_count = 0
def test_fail():
global failed_count
global final_flag
final_flag = False
failed_count += 1
def print_message(message):
print("*********")
print(message)
print("*********")
def test_aligned_timeseries():
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
if not session.is_open():
print("can't open session")
exit(1)
# set and delete storage groups
session.set_storage_group("root.sg_test_01")
session.set_storage_group("root.sg_test_02")
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
if session.delete_storage_group("root.sg_test_02") < 0:
test_fail()
print_message("delete storage group failed")
if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
test_fail()
print_message("delete storage groups failed")
# setting aligned time series.
measurements_lst_ = [
"s_01",
"s_02",
"s_03",
]
data_type_lst_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_aligned_time_series(
"root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
# setting more aligned time series once.
measurements_lst_ = [
"s_04",
"s_05",
"s_06",
"s_07",
"s_08",
"s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_aligned_time_series(
"root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
# delete time series
if (
session.delete_time_series(
[
"root.sg_test_01.d_02.s_07",
"root.sg_test_01.d_02.s_08",
"root.sg_test_01.d_02.s_09",
]
)
< 0
):
test_fail()
print_message("delete time series failed")
# checking time series
# s_07 expecting False
if session.check_time_series_exists("root.sg_test_01.d_02.s_07"):
test_fail()
print_message("root.sg_test_01.d_02.s_07 shouldn't exist")
# s_03 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_02.s_03"):
test_fail()
print_message("root.sg_test_01.d_02.s_03 should exist")
# insert one record into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
if (
session.insert_aligned_record(
"root.sg_test_01.d_02", 1, measurements_, data_types_, values_
)
< 0
):
test_fail()
print_message("insert record failed")
# insert multiple records into database
measurements_list_ = [
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
]
values_list_ = [
[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"],
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"]
if (
session.insert_aligned_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
)
< 0
):
test_fail()
print_message("insert records failed")
# insert one tablet into the database.
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
)
if session.insert_aligned_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet failed")
# insert multiple tablets into database
tablet_01 = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11]
)
tablet_02 = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15]
)
if session.insert_aligned_tablets([tablet_01, tablet_02]) < 0:
test_fail()
print_message("insert tablets failed")
# insert one tablet with empty cells into the database.
values_ = [
[None, 10, 11, 1.1, 10011.1, "test01"],
[True, None, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, None, 688.25, "test03"],
[True, 0, 0, 0, None, None],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [20, 21, 22, 23]
tablet_ = Tablet(
"root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_
)
if session.insert_aligned_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet with empty cells failed")
# insert records of one device
time_list = [1, 2, 3]
measurements_list = [
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
]
data_types_list = [
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
]
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
if (
session.insert_aligned_records_of_one_device(
"root.sg_test_01.d_02",
time_list,
measurements_list,
data_types_list,
values_list,
)
< 0
):
test_fail()
print_message("insert records of one device failed")
# execute non-query sql statement
if (
session.execute_non_query_statement(
"insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)"
)
< 0
):
test_fail()
print_message(
"execute 'insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)' failed"
)
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_02")
session_data_set.set_fetch_size(1024)
expect_count = 20
actual_count = 0
while session_data_set.has_next():
print(session_data_set.next())
actual_count += 1
session_data_set.close_operation_handle()
if actual_count != expect_count:
test_fail()
print_message(
"query count mismatch: expect count: "
+ str(expect_count)
+ " actual count: "
+ str(actual_count)
)
# close session connection.
session.close()
if final_flag:
print("All executions done!!")
else:
print("Some test failed, please have a check")
print("failed count: ", failed_count)
exit(1)
......@@ -23,7 +23,7 @@ from numpy.testing import assert_array_equal
def test_simple_query():
with IoTDBContainer("apache/iotdb:0.11.2") as db:
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
......@@ -32,7 +32,7 @@ def test_simple_query():
session.insert_str_record("root.device", 123, "pressure", "15.0")
# Read
session_data_set = session.execute_query_statement("SELECT * FROM root.*")
session_data_set = session.execute_query_statement("SELECT ** FROM root")
df = session_data_set.todf()
session.close()
......@@ -42,7 +42,7 @@ def test_simple_query():
def test_non_time_query():
with IoTDBContainer("apache/iotdb:0.11.2") as db:
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
......
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Uncomment the following line to use apache-iotdb module installed by pip3
import numpy as np
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.NumpyTablet import NumpyTablet
from iotdb.utils.Tablet import Tablet
from iotdb.IoTDBContainer import IoTDBContainer
# whether the test has passed
final_flag = True
failed_count = 0
def test_fail():
global failed_count
global final_flag
final_flag = False
failed_count += 1
def print_message(message):
print("*********")
print(message)
print("*********")
def test_session():
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
if not session.is_open():
print("can't open session")
exit(1)
# set and delete storage groups
session.set_storage_group("root.sg_test_01")
session.set_storage_group("root.sg_test_02")
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
if session.delete_storage_group("root.sg_test_02") < 0:
test_fail()
print_message("delete storage group failed")
if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
test_fail()
print_message("delete storage groups failed")
# setting time series.
session.create_time_series(
"root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_02.s_01",
TSDataType.BOOLEAN,
TSEncoding.PLAIN,
Compressor.SNAPPY,
None,
{"tag1": "v1"},
{"description": "v1"},
"temperature"
)
# setting multiple time series once.
ts_path_lst_ = [
"root.sg_test_01.d_01.s_04",
"root.sg_test_01.d_01.s_05",
"root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_multi_time_series(
ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
ts_path_lst_ = [
"root.sg_test_01.d_02.s_04",
"root.sg_test_01.d_02.s_05",
"root.sg_test_01.d_02.s_06",
"root.sg_test_01.d_02.s_07",
"root.sg_test_01.d_02.s_08",
"root.sg_test_01.d_02.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
session.create_multi_time_series(
ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_, None, tags_lst_, attributes_lst_, None
)
# delete time series
if (
session.delete_time_series(
[
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
)
< 0
):
test_fail()
print_message("delete time series failed")
# checking time series
# s_07 expecting False
if session.check_time_series_exists("root.sg_test_01.d_01.s_07"):
test_fail()
print_message("root.sg_test_01.d_01.s_07 shouldn't exist")
# s_03 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_01.s_03"):
test_fail()
print_message("root.sg_test_01.d_01.s_03 should exist")
# d_02.s_01 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_02.s_01"):
test_fail()
print_message("root.sg_test_01.d_02.s_01 should exist")
# d_02.s_06 expecting True
if not session.check_time_series_exists("root.sg_test_01.d_02.s_06"):
test_fail()
print_message("root.sg_test_01.d_02.s_06 should exist")
# insert one record into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
if (
session.insert_record(
"root.sg_test_01.d_01", 1, measurements_, data_types_, values_
)
< 0
):
test_fail()
print_message("insert record failed")
# insert multiple records into database
measurements_list_ = [
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
]
values_list_ = [
[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"],
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
if (
session.insert_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
)
< 0
):
test_fail()
print_message("insert records failed")
# insert one tablet into the database.
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
if session.insert_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet failed")
# insert one numpy tablet into the database.
np_values_ = [
np.array([False, True, False, True], np.dtype('>?')),
np.array([10, 100, 100, 0], np.dtype('>i4')),
np.array([11, 11111, 1, 0], np.dtype('>i8')),
np.array([1.1, 1.25, 188.1, 0], np.dtype('>f4')),
np.array([10011.1, 101.0, 688.25, 6.25], np.dtype('>f8')),
np.array(["test01", "test02", "test03", "test04"]),
]
np_timestamps_ = np.array([1, 2, 3, 4], np.dtype('>i8'))
np_tablet_ = NumpyTablet(
"root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
)
if session.insert_tablet(np_tablet_) < 0:
test_fail()
print_message("insert numpy tablet failed")
# insert multiple tablets into database
tablet_01 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
)
tablet_02 = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15]
)
if session.insert_tablets([tablet_01, tablet_02]) < 0:
test_fail()
print_message("insert tablets failed")
# insert one tablet with empty cells into the database.
values_ = [
[None, 10, 11, 1.1, 10011.1, "test01"],
[True, None, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, None, 688.25, "test03"],
[True, 0, 0, 0, None, None],
] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [20, 21, 22, 23]
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
if session.insert_tablet(tablet_) < 0:
test_fail()
print_message("insert tablet with empty cells failed")
# insert records of one device
time_list = [1, 2, 3]
measurements_list = [
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
["s_01", "s_02", "s_03"],
]
data_types_list = [
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
[TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
]
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
if (
session.insert_records_of_one_device(
"root.sg_test_01.d_01",
time_list,
measurements_list,
data_types_list,
values_list,
)
< 0
):
test_fail()
print_message("insert records of one device failed")
# execute non-query sql statement
if (
session.execute_non_query_statement(
"insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
)
< 0
):
test_fail()
print_message(
"execute 'insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)' failed"
)
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
session_data_set.set_fetch_size(1024)
expect_count = 20
actual_count = 0
while session_data_set.has_next():
print(session_data_set.next())
actual_count += 1
session_data_set.close_operation_handle()
if actual_count != expect_count:
test_fail()
print_message(
"query count mismatch: expect count: "
+ str(expect_count)
+ " actual count: "
+ str(actual_count)
)
# close session connection.
session.close()
if final_flag:
print("All executions done!!")
else:
print("Some test failed, please have a check")
print("failed count: ", failed_count)
exit(1)
......@@ -65,7 +65,7 @@ def create_ts(session):
def test_simple_query():
with IoTDBContainer() as db:
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
......@@ -92,7 +92,7 @@ def test_simple_query():
df_input.insert(0, "Time", timestamps)
session_data_set = session.execute_query_statement("SELECT * FROM root.*")
session_data_set = session.execute_query_statement("SELECT ** FROM root")
df_output = session_data_set.todf()
df_output = df_output[df_input.columns.tolist()]
......@@ -101,7 +101,7 @@ def test_simple_query():
def test_with_null_query():
with IoTDBContainer() as db:
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
......@@ -171,7 +171,7 @@ def test_with_null_query():
df_input.insert(0, "Time", timestamps)
session_data_set = session.execute_query_statement("SELECT * FROM root.*")
session_data_set = session.execute_query_statement("SELECT ** FROM root")
df_output = session_data_set.todf()
df_output = df_output[df_input.columns.tolist()]
......@@ -180,7 +180,7 @@ def test_with_null_query():
def test_multi_fetch():
with IoTDBContainer() as db:
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer
session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
session.open(False)
......@@ -207,7 +207,7 @@ def test_multi_fetch():
df_input.insert(0, "Time", timestamps)
session_data_set = session.execute_query_statement("SELECT * FROM root.*")
session_data_set = session.execute_query_statement("SELECT ** FROM root")
session_data_set.set_fetch_size(100)
df_output = session_data_set.todf()
df_output = df_output[df_input.columns.tolist()]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册