未验证 提交 d1c0cf90 编写于 作者: H Haonan 提交者: GitHub

Optimize python client package and release process (#11007)

上级 d5450a1e
...@@ -48,6 +48,9 @@ jobs: ...@@ -48,6 +48,9 @@ jobs:
docker images docker images
- name: Install IoTDB python client requirements - name: Install IoTDB python client requirements
run: pip3 install -r iotdb-client/client-py/requirements_dev.txt run: pip3 install -r iotdb-client/client-py/requirements_dev.txt
- name: Check code style
shell: bash
run: black iotdb-client/client-py/ --check --diff
- name: Integration test - name: Integration test
shell: bash shell: bash
run: | run: |
......
...@@ -119,14 +119,9 @@ classes/ ...@@ -119,14 +119,9 @@ classes/
Makefile Makefile
**/CMakeFiles/ **/CMakeFiles/
### cluster test data
node1/
node2/
node3/
# Exclude copied license # Exclude copied license
/client-py/LICENSE iotdb-client/client-py/LICENSE
/mlnode/LICENSE iotdb-core/mlnode/LICENSE
# ANTLR # ANTLR
iotdb-core/antlr/gen/ iotdb-core/antlr/gen/
......
...@@ -3,3 +3,5 @@ ...@@ -3,3 +3,5 @@
/build/ /build/
/dist/ /dist/
/apache_iotdb.egg-info /apache_iotdb.egg-info
/setup.py
!/resources/setup/py
...@@ -45,13 +45,13 @@ def prepare_data(): ...@@ -45,13 +45,13 @@ def prepare_data():
"root.test.d1.s0", "root.test.d1.s0",
"root.test.d1.s1", "root.test.d1.s1",
"root.test.d1.s2", "root.test.d1.s2",
"root.test.d1.s3" "root.test.d1.s3",
] ]
data_type_lst = [ data_type_lst = [
TSDataType.BOOLEAN, TSDataType.BOOLEAN,
TSDataType.INT32, TSDataType.INT32,
TSDataType.INT64, TSDataType.INT64,
TSDataType.FLOAT TSDataType.FLOAT,
] ]
encoding_lst = [TSEncoding.PLAIN for _ in range(len(data_type_lst))] encoding_lst = [TSEncoding.PLAIN for _ in range(len(data_type_lst))]
compressor_lst = [Compressor.SNAPPY for _ in range(len(data_type_lst))] compressor_lst = [Compressor.SNAPPY for _ in range(len(data_type_lst))]
...@@ -76,11 +76,7 @@ def insert_data(num: int): ...@@ -76,11 +76,7 @@ def insert_data(num: int):
device = ["root.test.d" + str(num)] device = ["root.test.d" + str(num)]
measurements = ["s0", "s1", "s2"] measurements = ["s0", "s1", "s2"]
values = [False, 10, 11] values = [False, 10, 11]
data_types = [ data_types = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64]
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64
]
# Get a session from the pool # Get a session from the pool
session = session_pool.get_session() session = session_pool.get_session()
session.insert_records(device, [1], [measurements], [data_types], [values]) session.insert_records(device, [1], [measurements], [data_types], [values])
...@@ -119,9 +115,14 @@ ip = "127.0.0.1" ...@@ -119,9 +115,14 @@ ip = "127.0.0.1"
port = "6667" port = "6667"
username = "root" username = "root"
password = "root" password = "root"
pool_config = PoolConfig(node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"], user_name=username, pool_config = PoolConfig(
password=password, fetch_size=1024, node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
time_zone="UTC+8", max_retry=3) user_name=username,
password=password,
fetch_size=1024,
time_zone="UTC+8",
max_retry=3,
)
max_pool_size = 5 max_pool_size = 5
wait_timeout_in_ms = 3000 wait_timeout_in_ms = 3000
......
...@@ -42,11 +42,12 @@ class IoTDBContainer(DockerContainer): ...@@ -42,11 +42,12 @@ class IoTDBContainer(DockerContainer):
self.get_container_host_ip(), self.get_exposed_port(6667), "root", "root" self.get_container_host_ip(), self.get_exposed_port(6667), "root", "root"
) )
session.open(False) session.open(False)
with session.execute_statement( with session.execute_statement("SHOW CLUSTER") as session_data_set:
"SHOW CLUSTER"
) as session_data_set:
while session_data_set.has_next(): while session_data_set.has_next():
if session_data_set.next().get_fields()[2].get_string_value() != "Running": if (
session_data_set.next().get_fields()[2].get_string_value()
!= "Running"
):
raise ContainerStartException("IoTDB is not started") raise ContainerStartException("IoTDB is not started")
session.close() session.close()
......
...@@ -31,15 +31,25 @@ logger = logging.getLogger("IoTDB") ...@@ -31,15 +31,25 @@ logger = logging.getLogger("IoTDB")
class PoolConfig(object): class PoolConfig(object):
def __init__(self, host: str = None, port: str = None, user_name: str = None, password: str = None, def __init__(
node_urls: list = None, self,
fetch_size: int = DEFAULT_FETCH_SIZE, time_zone: str = DEFAULT_TIME_ZONE, host: str = None,
max_retry: int = DEFAULT_MAX_RETRY, enable_compression: bool = False): port: str = None,
user_name: str = None,
password: str = None,
node_urls: list = None,
fetch_size: int = DEFAULT_FETCH_SIZE,
time_zone: str = DEFAULT_TIME_ZONE,
max_retry: int = DEFAULT_MAX_RETRY,
enable_compression: bool = False,
):
self.host = host self.host = host
self.port = port self.port = port
if node_urls is None: if node_urls is None:
if host is None or port is None: if host is None or port is None:
raise ValueError("(host,port) and node_urls cannot be None at the same time.") raise ValueError(
"(host,port) and node_urls cannot be None at the same time."
)
node_urls = [] node_urls = []
self.node_urls = node_urls self.node_urls = node_urls
self.user_name = user_name self.user_name = user_name
...@@ -51,8 +61,9 @@ class PoolConfig(object): ...@@ -51,8 +61,9 @@ class PoolConfig(object):
class SessionPool(object): class SessionPool(object):
def __init__(
def __init__(self, pool_config: PoolConfig, max_pool_size: int, wait_timeout_in_ms: int): self, pool_config: PoolConfig, max_pool_size: int, wait_timeout_in_ms: int
):
self.__pool_config = pool_config self.__pool_config = pool_config
self.__max_pool_size = max_pool_size self.__max_pool_size = max_pool_size
self.__wait_timeout_in_ms = wait_timeout_in_ms / 1000 self.__wait_timeout_in_ms = wait_timeout_in_ms / 1000
...@@ -63,13 +74,23 @@ class SessionPool(object): ...@@ -63,13 +74,23 @@ class SessionPool(object):
def __construct_session(self) -> Session: def __construct_session(self) -> Session:
if len(self.__pool_config.node_urls) > 0: if len(self.__pool_config.node_urls) > 0:
session = Session.init_from_node_urls(self.__pool_config.node_urls, self.__pool_config.user_name, session = Session.init_from_node_urls(
self.__pool_config.password, self.__pool_config.fetch_size, self.__pool_config.node_urls,
self.__pool_config.time_zone) self.__pool_config.user_name,
self.__pool_config.password,
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
)
else: else:
session = Session(self.__pool_config.host, self.__pool_config.port, self.__pool_config.user_name, session = Session(
self.__pool_config.password, self.__pool_config.fetch_size, self.__pool_config.time_zone) self.__pool_config.host,
self.__pool_config.port,
self.__pool_config.user_name,
self.__pool_config.password,
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
)
session.open(self.__pool_config.enable_compression) session.open(self.__pool_config.enable_compression)
return session return session
...@@ -97,8 +118,11 @@ class SessionPool(object): ...@@ -97,8 +118,11 @@ class SessionPool(object):
break break
else: else:
if time.time() - start > self.__wait_timeout_in_ms: if time.time() - start > self.__wait_timeout_in_ms:
raise TimeoutError("Wait to get session timeout in SessionPool, current pool size: {0}" raise TimeoutError(
.format(self.__max_pool_size)) "Wait to get session timeout in SessionPool, current pool size: {0}".format(
self.__max_pool_size
)
)
time.sleep(1) time.sleep(1)
session = self.__poll_session() session = self.__poll_session()
...@@ -115,7 +139,9 @@ class SessionPool(object): ...@@ -115,7 +139,9 @@ class SessionPool(object):
def put_back(self, session: Session): def put_back(self, session: Session):
if self.__closed: if self.__closed:
raise ConnectionError("SessionPool has already been closed, please close the session manually.") raise ConnectionError(
"SessionPool has already been closed, please close the session manually."
)
if session.is_open(): if session.is_open():
self.__queue.put(session) self.__queue.put(session)
...@@ -134,7 +160,9 @@ class SessionPool(object): ...@@ -134,7 +160,9 @@ class SessionPool(object):
logger.info("SessionPool has been closed successfully.") logger.info("SessionPool has been closed successfully.")
def create_session_pool(pool_config: PoolConfig, max_pool_size: int, wait_timeout_in_ms: int) -> SessionPool: def create_session_pool(
pool_config: PoolConfig, max_pool_size: int, wait_timeout_in_ms: int
) -> SessionPool:
if max_pool_size <= 0: if max_pool_size <= 0:
max_pool_size = multiprocessing.cpu_count() * DEFAULT_MULTIPIE max_pool_size = multiprocessing.cpu_count() * DEFAULT_MULTIPIE
return SessionPool(pool_config, max_pool_size, wait_timeout_in_ms) return SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
...@@ -38,11 +38,10 @@ ...@@ -38,11 +38,10 @@
<skip>true</skip> <skip>true</skip>
</configuration> </configuration>
</plugin> </plugin>
<!-- clean thrift folder --> <!-- clean thrift and old build folder -->
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId> <artifactId>maven-clean-plugin</artifactId>
<version>2.4.1</version>
<configuration> <configuration>
<filesets> <filesets>
<fileset> <fileset>
...@@ -52,6 +51,15 @@ ...@@ -52,6 +51,15 @@
</includes> </includes>
<followSymlinks>false</followSymlinks> <followSymlinks>false</followSymlinks>
</fileset> </fileset>
<fileset>
<directory>apache_iotdb.egg-info</directory>
</fileset>
<fileset>
<directory>build</directory>
</fileset>
<fileset>
<directory>dist</directory>
</fileset>
<fileset> <fileset>
<directory>./</directory> <directory>./</directory>
<includes> <includes>
...@@ -114,9 +122,59 @@ ...@@ -114,9 +122,59 @@
</resources> </resources>
</configuration> </configuration>
</execution> </execution>
<execution>
<id>copy-setup-file-resources</id>
<!-- here the phase you need -->
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<encoding>utf-8</encoding>
<outputDirectory>${basedir}/</outputDirectory>
<resources>
<resource>
<directory>${basedir}/target/classes</directory>
<includes>
<include>setup.py</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.4.0</version>
<executions>
<execution>
<id>write-python-version</id>
<goals>
<goal>regex-property</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<name>python_version</name>
<regex>-SNAPSHOT</regex>
<value>${project.version}</value>
<replacement>\.dev0</replacement>
<failIfNoMatch>false</failIfNoMatch>
</configuration>
</execution>
</executions> </executions>
</plugin> </plugin>
</plugins> </plugins>
<resources>
<resource>
<directory>${basedir}/resources</directory>
<includes>
<include>setup.py</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
</build> </build>
<dependencies> <dependencies>
<dependency> <dependency>
......
#!/usr/bin/env bash #!/bin/bash
# #
# #
# Licensed to the Apache Software Foundation (ASF) under one # Licensed to the Apache Software Foundation (ASF) under one
...@@ -20,21 +20,22 @@ ...@@ -20,21 +20,22 @@
# #
# the python version must be python3. # the python version must be python3.
python --version python3 --version
rm -Rf build rm -Rf build
rm -Rf dist rm -Rf dist
rm -Rf iotdb_session.egg_info rm -Rf iotdb_session.egg_info
# (Re-)build generated code # (Re-)build generated code
(cd ..; mvn clean generate-sources -pl client-py -am) (cd ../..; mvn clean package -pl iotdb-client/client-py -am)
# Run Linting
flake8
# Run unit tests # Run unit tests
pytest . if [ "$1" == "test" ]; then
pytest .
fi
# See https://packaging.python.org/tutorials/packaging-projects/ # See https://packaging.python.org/tutorials/packaging-projects/
python setup.py sdist bdist_wheel python3 setup.py sdist bdist_wheel
twine upload --repository pypi dist/* if [ "$1" == "release" ]; then
\ No newline at end of file python3 -m twine upload dist/*
fi
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
# Pytest to run tests # Pytest to run tests
pytest==7.2.0 pytest==7.2.0
flake8==3.9.0 flake8==3.9.0
black==20.8b1 black==22.3.0
# Testcontainer # Testcontainer
testcontainers==3.4.2 testcontainers==3.4.2
# For releases # For releases
......
...@@ -31,7 +31,7 @@ print(long_description) ...@@ -31,7 +31,7 @@ print(long_description)
setuptools.setup( setuptools.setup(
name="apache-iotdb", # Replace with your own username name="apache-iotdb", # Replace with your own username
version="1.0.0", version="${python_version}",
author=" Apache Software Foundation", author=" Apache Software Foundation",
author_email="dev@iotdb.apache.org", author_email="dev@iotdb.apache.org",
description="Apache IoTDB client API", description="Apache IoTDB client API",
......
...@@ -59,8 +59,16 @@ def session_test(use_session_pool=False): ...@@ -59,8 +59,16 @@ def session_test(use_session_pool=False):
db: IoTDBContainer db: IoTDBContainer
if use_session_pool: if use_session_pool:
pool_config = PoolConfig(db.get_container_host_ip(), db.get_exposed_port(6667), "root", "root", None, 1024, pool_config = PoolConfig(
"Asia/Shanghai", 3) db.get_container_host_ip(),
db.get_exposed_port(6667),
"root",
"root",
None,
1024,
"Asia/Shanghai",
3,
)
session_pool = create_session_pool(pool_config, 1, 3000) session_pool = create_session_pool(pool_config, 1, 3000)
session = session_pool.get_session() session = session_pool.get_session()
else: else:
...@@ -170,14 +178,14 @@ def session_test(use_session_pool=False): ...@@ -170,14 +178,14 @@ def session_test(use_session_pool=False):
# delete time series # delete time series
if ( if (
session.delete_time_series( session.delete_time_series(
[ [
"root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09", "root.sg_test_01.d_01.s_09",
] ]
) )
< 0 < 0
): ):
test_fail() test_fail()
print_message("delete time series failed") print_message("delete time series failed")
...@@ -213,10 +221,10 @@ def session_test(use_session_pool=False): ...@@ -213,10 +221,10 @@ def session_test(use_session_pool=False):
TSDataType.TEXT, TSDataType.TEXT,
] ]
if ( if (
session.insert_record( session.insert_record(
"root.sg_test_01.d_01", 1, measurements_, data_types_, values_ "root.sg_test_01.d_01", 1, measurements_, data_types_, values_
) )
< 0 < 0
): ):
test_fail() test_fail()
print_message("insert record failed") print_message("insert record failed")
...@@ -233,10 +241,10 @@ def session_test(use_session_pool=False): ...@@ -233,10 +241,10 @@ def session_test(use_session_pool=False):
data_type_list_ = [data_types_, data_types_] data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_02"] device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_02"]
if ( if (
session.insert_records( session.insert_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
) )
< 0 < 0
): ):
test_fail() test_fail()
print_message("insert records failed") print_message("insert records failed")
...@@ -353,14 +361,14 @@ def session_test(use_session_pool=False): ...@@ -353,14 +361,14 @@ def session_test(use_session_pool=False):
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
if ( if (
session.insert_records_of_one_device( session.insert_records_of_one_device(
"root.sg_test_01.d_01", "root.sg_test_01.d_01",
time_list, time_list,
measurements_list, measurements_list,
data_types_list, data_types_list,
values_list, values_list,
) )
< 0 < 0
): ):
test_fail() test_fail()
print_message("insert records of one device failed") print_message("insert records of one device failed")
......
...@@ -27,8 +27,16 @@ def test_session_pool(): ...@@ -27,8 +27,16 @@ def test_session_pool():
with IoTDBContainer(CONTAINER_NAME) as db: with IoTDBContainer(CONTAINER_NAME) as db:
db: IoTDBContainer db: IoTDBContainer
max_pool_size = 2 max_pool_size = 2
pool_config = PoolConfig(db.get_container_host_ip(), db.get_exposed_port(6667), "root", "root", pool_config = PoolConfig(
[], 1024, "Asia/Shanghai", 3) db.get_container_host_ip(),
db.get_exposed_port(6667),
"root",
"root",
[],
1024,
"Asia/Shanghai",
3,
)
session_pool = create_session_pool(pool_config, max_pool_size, 3000) session_pool = create_session_pool(pool_config, max_pool_size, 3000)
session = session_pool.get_session() session = session_pool.get_session()
assert session.is_open() is True assert session.is_open() is True
...@@ -39,7 +47,11 @@ def test_session_pool(): ...@@ -39,7 +47,11 @@ def test_session_pool():
session_pool.get_session() session_pool.get_session()
except TimeoutError as e: except TimeoutError as e:
timeout = True timeout = True
assert str(e) == "Wait to get session timeout in SessionPool, current pool size: " + str(max_pool_size) assert str(
e
) == "Wait to get session timeout in SessionPool, current pool size: " + str(
max_pool_size
)
assert timeout is True assert timeout is True
Thread(target=lambda: session_pool.put_back(session2)).start() Thread(target=lambda: session_pool.put_back(session2)).start()
...@@ -60,7 +72,10 @@ def test_session_pool(): ...@@ -60,7 +72,10 @@ def test_session_pool():
session_pool.put_back(session3) session_pool.put_back(session3)
except ConnectionError as e: except ConnectionError as e:
is_closed = True is_closed = True
assert str(e) == "SessionPool has already been closed, please close the session manually." assert (
str(e)
== "SessionPool has already been closed, please close the session manually."
)
assert is_closed is True assert is_closed is True
...@@ -69,7 +84,9 @@ def test_session_pool_by_node_urls(): ...@@ -69,7 +84,9 @@ def test_session_pool_by_node_urls():
db: IoTDBContainer db: IoTDBContainer
node_url = db.get_container_host_ip() + ":" + str(db.get_exposed_port(6667)) node_url = db.get_container_host_ip() + ":" + str(db.get_exposed_port(6667))
max_pool_size = 1 max_pool_size = 1
pool_config = PoolConfig(node_urls=[node_url], user_name="root", password="root") pool_config = PoolConfig(
node_urls=[node_url], user_name="root", password="root"
)
session_pool = create_session_pool(pool_config, max_pool_size, 3000) session_pool = create_session_pool(pool_config, max_pool_size, 3000)
session = session_pool.get_session() session = session_pool.get_session()
...@@ -79,7 +96,11 @@ def test_session_pool_by_node_urls(): ...@@ -79,7 +96,11 @@ def test_session_pool_by_node_urls():
session_pool.get_session() session_pool.get_session()
except TimeoutError as e: except TimeoutError as e:
timeout = True timeout = True
assert str(e) == "Wait to get session timeout in SessionPool, current pool size: " + str(max_pool_size) assert str(
e
) == "Wait to get session timeout in SessionPool, current pool size: " + str(
max_pool_size
)
assert timeout is True assert timeout is True
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册