提交 83616537 编写于 作者: Z Zekun Li 提交者: Jialin Qiao

Added python session client.

上级 0d4492c0
此差异已折叠。
import sys
sys.path.append("./utils")
from IoTDBConstants import *
from Tablet import Tablet
from Session import Session
# creating session connection.
ip = "127.0.0.1"
port_ = "6667"
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_)
session.open(False)
# set and delete storage groups
session.set_storage_group("root.sg_test_01")
session.set_storage_group("root.sg_test_02")
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
session.delete_storage_group("root.sg_test_02")
session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
# setting time series.
session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY)
session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY)
# setting multiple time series once.
ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]
data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_)
# delete time series
session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"])
# checking time series
print("s_07 expecting False, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_07"))
print("s_03 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_03"))
# insert one record into the database.
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
# insert multiple records into database
measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, "test_records02"]]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_)
# insert one tablet into the database.
values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"]] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_)
session.insert_tablet(tablet_)
# insert multiple tablets into database
tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11])
tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
session.insert_tablets([tablet_01, tablet_02])
# execute non-query sql statement
session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")
# execute sql query statement
session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
session_data_set.set_fetch_size(1024)
while session_data_set.has_next():
print(session_data_set.next())
session_data_set.close_operation_handle()
# close session connection.
session.close()
print("All executions done!!")
from IoTDBConstants import TSDataType
class Field(object):
def __init__(self, data_type):
"""
:param data_type: TSDataType
"""
self.__data_type = data_type
self.__bool_value = None
self.__int_value = None
self.__long_value = None
self.__float_value = None
self.__double_value = None
self.__binary_value = None
@staticmethod
def copy(field):
output = Field(field.get_data_type())
if output.get_data_type() is not None:
if output.get_data_type() == TSDataType.BOOLEAN:
output.set_bool_value(field.get_bool_value())
elif output.get_data_type() == TSDataType.INT32:
output.set_int_value(field.get_int_value())
elif output.get_data_type() == TSDataType.INT64:
output.set_long_value(field.get_long_value())
elif output.get_data_type() == TSDataType.FLOAT:
output.set_float_value(field.get_float_value())
elif output.get_data_type() == TSDataType.DOUBLE:
output.set_double_value(field.get_double_value())
elif output.get_data_type() == TSDataType.TEXT:
output.set_binary_value(field.get_binary_value())
else:
raise Exception("unsupported data type {}".format(output.get_data_type()))
return output
def get_data_type(self):
return self.__data_type
def is_null(self):
return self.__data_type is None
def set_bool_value(self, value):
self.__bool_value = value
def get_bool_value(self):
if self.__data_type is None:
raise Exception("Null Field Exception!")
return self.__bool_value
def set_int_value(self, value):
self.__int_value = value
def get_int_value(self):
if self.__data_type is None:
raise Exception("Null Field Exception!")
return self.__int_value
def set_long_value(self, value):
self.__long_value = value
def get_long_value(self):
if self.__data_type is None:
raise Exception("Null Field Exception!")
return self.__long_value
def set_float_value(self, value):
self.__float_value = value
def get_float_value(self):
if self.__data_type is None:
raise Exception("Null Field Exception!")
return self.__float_value
def set_double_value(self, value):
self.__double_value = value
def get_double_value(self):
if self.__data_type is None:
raise Exception("Null Field Exception!")
return self.__double_value
def set_binary_value(self, value):
self.__binary_value = value
def get_binary_value(self):
if self.__data_type is None:
raise Exception("Null Field Exception!")
return self.__binary_value
def get_string_value(self):
if self.__data_type is None:
return "None"
elif self.__data_type == TSDataType.BOOLEAN:
return str(self.__bool_value)
elif self.__data_type == TSDataType.INT64:
return str(self.__long_value)
elif self.__data_type == TSDataType.INT32:
return str(self.__int_value)
elif self.__data_type == TSDataType.FLOAT:
return str(self.__float_value)
elif self.__data_type == TSDataType.DOUBLE:
return str(self.__double_value)
elif self.__data_type == TSDataType.TEXT:
return self.__binary_value.decode('utf-8')
else:
raise Exception("unsupported data type {}".format(self.__data_type))
def __str__(self):
return self.get_string_value()
def get_object_value(self, data_type):
"""
:param data_type: TSDataType
"""
if self.__data_type is None:
return None
elif data_type == TSDataType.BOOLEAN:
return self.get_bool_value()
elif data_type == TSDataType.INT32:
return self.get_int_value()
elif data_type == TSDataType.INT64:
return self.get_long_value()
elif data_type == TSDataType.FLOAT:
return self.get_float_value()
elif data_type == TSDataType.DOUBLE:
return self.get_double_value()
elif data_type == TSDataType.TEXT:
return self.get_binary_value()
else:
raise Exception("unsupported data type {}".format(data_type))
@staticmethod
def get_field(value, data_type):
"""
:param value: field value corresponding to the data type
:param data_type: TSDataType
"""
if value is None:
return None
field = Field(data_type)
if data_type == TSDataType.BOOLEAN:
field.set_bool_value(value)
elif data_type == TSDataType.INT32:
field.set_int_value(value)
elif data_type == TSDataType.INT64:
field.set_long_value(value)
elif data_type == TSDataType.FLOAT:
field.set_float_value(value)
elif data_type == TSDataType.DOUBLE:
field.set_double_value(value)
elif data_type == TSDataType.TEXT:
field.set_binary_value(value)
else:
raise Exception("unsupported data type {}".format(data_type))
return field
from enum import Enum, unique
@unique
class TSDataType(Enum):
BOOLEAN = 0
INT32 = 1
INT64 = 2
FLOAT = 3
DOUBLE = 4
TEXT = 5
@unique
class TSEncoding(Enum):
PLAIN = 0
PLAIN_DICTIONARY = 1
RLE = 2
DIFF = 3
TS_2DIFF = 4
BITMAP = 5
GORILLA = 6
REGULAR = 7
@unique
class Compressor(Enum):
UNCOMPRESSED = 0
SNAPPY = 1
GZIP = 2
LZO = 3
SDT = 4
PAA = 5
PLA = 6
import sys
from IoTDBConstants import *
sys.path.append("../../target")
from thrift.transport import TSocket, TTransport
from iotdb.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \
TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq
from iotdb.rpc.ttypes import TSFetchMetadataReq, TSProtocolVersion
class IoTDBRpcDataSet(object):
TIMESTAMP_STR = "Time"
VALUE_IS_NULL = "The value got by %s (column name) is NULL."
START_INDEX = 2
FLAG = 0x80
def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
client, session_id, query_data_set, fetch_size):
self.__session_id = session_id
self.__ignore_timestamp = ignore_timestamp
self.__sql = sql
self.__query_id = query_id
self.__client = client
self.__fetch_size = fetch_size
self.__column_size = len(column_name_list)
self.__column_name_list = []
self.__column_type_list = []
self.__column_ordinal_dict = {}
if not ignore_timestamp:
self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR)
self.__column_type_list.append(TSDataType.INT64)
self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1
if column_name_index is not None:
self.__column_type_deduplicated_list = [None for _ in range(len(column_name_index))]
for i in range(len(column_name_list)):
name = column_name_list[i]
self.__column_name_list.append(name)
self.__column_type_list.append(TSDataType[column_type_list[i]])
if name not in self.__column_ordinal_dict:
index = column_name_index[name]
self.__column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX
self.__column_type_deduplicated_list[index] = TSDataType[column_type_list[i]]
else:
index = IoTDBRpcDataSet.START_INDEX
self.__column_type_deduplicated_list = []
for i in range(len(column_name_list)):
name = column_name_list[i]
self.__column_name_list.append(name)
self.__column_type_list.append(TSDataType[column_type_list[i]])
if name not in self.__column_ordinal_dict:
self.__column_ordinal_dict[name] = index
index += 1
self.__column_type_deduplicated_list.append(TSDataType[column_type_list[i]])
self.__time_bytes = bytes(0)
self.__current_bitmap = [bytes(0) for _ in range(len(self.__column_type_deduplicated_list))]
self.__value = [None for _ in range(len(self.__column_type_deduplicated_list))]
self.__query_data_set = query_data_set
self.__is_closed = False
self.__empty_resultSet = False
self.__has_cached_record = False
self.__rows_index = 0
def close(self):
if self.__is_closed:
return
if self.__client is not None:
try:
status = self.__client.closeOperation(TSCloseOperationReq(self.__session_id, self.__query_id))
print("close session {}, message: {}".format(self.__session_id, status.message))
except TTransport.TException as e:
print("close session {} failed because: ".format(self.__session_id), e)
raise Exception
self.__is_closed = True
self.__client = None
def next(self):
if self.has_cached_result():
self.construct_one_row()
return True
if self.__empty_resultSet:
return False
if self.fetch_results():
self.construct_one_row()
return True
return False
def has_cached_result(self):
return (self.__query_data_set is not None) and (len(self.__query_data_set.time) != 0)
def construct_one_row(self):
self.__time_bytes = self.__query_data_set.time[:8]
self.__query_data_set.time = self.__query_data_set.time[8:]
for i in range(len(self.__query_data_set.bitmapList)):
bitmap_buffer = self.__query_data_set.bitmapList[i]
if self.__rows_index % 8 == 0:
self.__current_bitmap[i] = bitmap_buffer[0]
self.__query_data_set.bitmapList[i] = bitmap_buffer[1:]
if not self.is_null(i, self.__rows_index):
value_buffer = self.__query_data_set.valueList[i]
data_type = self.__column_type_deduplicated_list[i]
if data_type == TSDataType.BOOLEAN:
self.__value[i] = value_buffer[:1]
self.__query_data_set.valueList[i] = value_buffer[1:]
elif data_type == TSDataType.INT32:
self.__value[i] = value_buffer[:4]
self.__query_data_set.valueList[i] = value_buffer[4:]
elif data_type == TSDataType.INT64:
self.__value[i] = value_buffer[:8]
self.__query_data_set.valueList[i] = value_buffer[8:]
elif data_type == TSDataType.FLOAT:
self.__value[i] = value_buffer[:4]
self.__query_data_set.valueList[i] = value_buffer[4:]
elif data_type == TSDataType.DOUBLE:
self.__value[i] = value_buffer[:8]
self.__query_data_set.valueList[i] = value_buffer[8:]
elif data_type == TSDataType.TEXT:
length = int.from_bytes(value_buffer[:4], byteorder="big", signed=False)
self.__value[i] = value_buffer[4: 4 + length]
self.__query_data_set.valueList[i] = value_buffer[4 + length:]
else:
print("unsupported data type {}.".format(data_type))
# could raise exception here
self.__rows_index += 1
self.__has_cached_record = True
def fetch_results(self):
self.__rows_index = 0
request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True)
try:
resp = self.__client.fetchResults(request)
if not resp.hasResultSet:
self.__empty_resultSet = True
else:
self.__query_data_set = resp.queryDataSet
return resp.hasResultSet
except TTransport.TException as e:
print("Cannot fetch result from server, because of network connection: ", e)
def is_null(self, index, row_num):
bitmap = self.__current_bitmap[index]
shift = row_num % 8
return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xff)) == 0
def is_null_by_index(self, column_index):
index = self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] - IoTDBRpcDataSet.START_INDEX
# time column will never be None
if index < 0:
return True
return self.is_null(index, self.__rows_index - 1)
def is_null_by_name(self, column_name):
index = self.__column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX
# time column will never be None
if index < 0:
return True
return self.is_null(index, self.__rows_index - 1)
def find_column_name_by_index(self, column_index):
if column_index <= 0:
raise Exception("Column index should start from 1")
if column_index > len(self.__column_name_list):
raise Exception("column index {} out of range {}".format(column_index, self.__column_size))
return self.__column_name_list[column_index - 1]
def get_fetch_size(self):
return self.__fetch_size
def set_fetch_size(self, fetch_size):
self.__fetch_size = fetch_size
def get_column_names(self):
return self.__column_name_list
def get_column_types(self):
return self.__column_type_list
def get_column_size(self):
return self.__column_size
def get_ignore_timestamp(self):
return self.__ignore_timestamp
def get_column_ordinal_dict(self):
return self.__column_ordinal_dict
def get_column_type_deduplicated_list(self):
return self.__column_type_deduplicated_list
def get_values(self):
return self.__value
def get_time_bytes(self):
return self.__time_bytes
def get_has_cached_record(self):
return self.__has_cached_record
from IoTDBConstants import TSDataType
from Field import Field
class RowRecord(object):
def __init__(self, timestamp, field_list=None):
self.__timestamp = timestamp
self.__field_list = field_list
def add_field(self, field):
self.__field_list.append(field)
def add_field(self, value, data_type):
self.__field_list.append(Field.get_field(value, data_type))
def __str__(self):
str_list = [str(self.__timestamp)]
for field in self.__field_list:
str_list.append("\t\t")
str_list.append(str(field))
return "".join(str_list)
def get_timestamp(self):
return self.__timestamp
def set_timestamp(self, timestamp):
self.__timestamp = timestamp
def get_fields(self):
return self.__field_list
def set_fields(self, field_list):
self.__field_list = field_list
def set_field(self, index, field):
self.__field_list[index] = field
from IoTDBConstants import TSDataType
from IoTDBRpcDataSet import IoTDBRpcDataSet
from Field import Field
from RowRecord import RowRecord
import struct
class SessionDataSet(object):
def __init__(self, sql, column_name_list, column_type_list, column_name_index, query_id, client, session_id,
query_data_set, ignore_timestamp):
self.iotdb_rpc_data_set = IoTDBRpcDataSet(sql, column_name_list, column_type_list, column_name_index,
ignore_timestamp, query_id, client, session_id, query_data_set, 1024)
def get_fetch_size(self):
return self.iotdb_rpc_data_set.get_fetch_size()
def set_fetch_size(self, fetch_size):
self.iotdb_rpc_data_set.set_fetch_size(fetch_size)
def get_column_names(self):
return self.iotdb_rpc_data_set.get_column_names()
def get_column_types(self):
return self.iotdb_rpc_data_set.get_column_types()
def has_next(self):
return self.iotdb_rpc_data_set.next()
def next(self):
if not self.iotdb_rpc_data_set.get_has_cached_record():
if not self.has_next():
return None
self.iotdb_rpc_data_set.has_cached_record = False
return self.construct_row_record_from_value_array()
def construct_row_record_from_value_array(self):
out_fields = []
for i in range(self.iotdb_rpc_data_set.get_column_size()):
index = i + 1
data_set_column_index = i + IoTDBRpcDataSet.START_INDEX
if self.iotdb_rpc_data_set.get_ignore_timestamp():
index -= 1
data_set_column_index -= 1
column_name = self.iotdb_rpc_data_set.get_column_names()[index]
location = self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] - IoTDBRpcDataSet.START_INDEX
if not self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index):
value_bytes = self.iotdb_rpc_data_set.get_values()[location]
data_type = self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[location]
field = Field(data_type)
if data_type == TSDataType.BOOLEAN:
value = struct.unpack(">?", value_bytes)[0]
field.set_bool_value(value)
elif data_type == TSDataType.INT32:
value = struct.unpack(">i", value_bytes)[0]
field.set_int_value(value)
elif data_type == TSDataType.INT64:
value = struct.unpack(">q", value_bytes)[0]
field.set_long_value(value)
elif data_type == TSDataType.FLOAT:
value = struct.unpack(">f", value_bytes)[0]
field.set_float_value(value)
elif data_type == TSDataType.DOUBLE:
value = struct.unpack(">d", value_bytes)[0]
field.set_double_value(value)
elif data_type == TSDataType.TEXT:
field.set_binary_value(value_bytes)
else:
print("unsupported data type {}.".format(data_type))
# could raise exception here
else:
field = Field(None)
out_fields.append(field)
return RowRecord(struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields)
def close_operation_handle(self):
self.iotdb_rpc_data_set.close()
from IoTDBConstants import *
import struct
class Tablet(object):
def __init__(self, device_id, measurements, data_types, values, timestamps):
"""
creating a tablet for insertion
for example, considering device: root.sg1.d1
timestamps, m1, m2, m3
1, 125.3, True, text1
2, 111.6, False, text2
3, 688.6, True, text3
Notice: The tablet should not have empty cell
The tablet will be sorted at the initialization by timestamps
:param device_id: String, IoTDB time series path to device layer (without sensor).
:param measurements: List, sensors.
:param data_types: TSDataType List, specify value types for sensors.
:param values: 2-D List, the values of each row should be the outer list element.
:param timestamps: List.
"""
if len(timestamps) != len(values):
print("Input error! len(timestamps) does not equal to len(values)!")
# could raise an error here.
if not Tablet.check_sorted(timestamps):
sorted_zipped = sorted(zip(timestamps, values))
result = zip(*sorted_zipped)
self.__timestamps, self.__values = [list(x) for x in result]
else:
self.__values = values
self.__timestamps = timestamps
self.__device_id = device_id
self.__measurements = measurements
self.__data_types = data_types
self.__row_number = len(timestamps)
self.__column_number = len(measurements)
@staticmethod
def check_sorted(timestamps):
for i in range(1, len(timestamps)):
if timestamps[i] < timestamps[i - 1]:
return False
return True
def get_measurements(self):
return self.__measurements
def get_data_types(self):
return self.__data_types
def get_row_number(self):
return self.__row_number
def get_device_id(self):
return self.__device_id
def get_binary_timestamps(self):
format_str_list = [">"]
values_tobe_packed = []
for timestamp in self.__timestamps:
format_str_list.append("q")
values_tobe_packed.append(timestamp)
format_str = ''.join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
def get_binary_values(self):
format_str_list = [">"]
values_tobe_packed = []
for i in range(self.__column_number):
if self.__data_types[i] == TSDataType.BOOLEAN:
format_str_list.append(str(self.__row_number))
format_str_list.append("?")
for j in range(self.__row_number):
values_tobe_packed.append(self.__values[j][i])
elif self.__data_types[i] == TSDataType.INT32:
format_str_list.append(str(self.__row_number))
format_str_list.append("i")
for j in range(self.__row_number):
values_tobe_packed.append(self.__values[j][i])
elif self.__data_types[i] == TSDataType.INT64:
format_str_list.append(str(self.__row_number))
format_str_list.append("q")
for j in range(self.__row_number):
values_tobe_packed.append(self.__values[j][i])
elif self.__data_types[i] == TSDataType.FLOAT:
format_str_list.append(str(self.__row_number))
format_str_list.append("f")
for j in range(self.__row_number):
values_tobe_packed.append(self.__values[j][i])
elif self.__data_types[i] == TSDataType.DOUBLE:
format_str_list.append(str(self.__row_number))
format_str_list.append("d")
for j in range(self.__row_number):
values_tobe_packed.append(self.__values[j][i])
elif self.__data_types[i] == TSDataType.TEXT:
for j in range(self.__row_number):
value_bytes = bytes(self.__values[j][i], 'utf-8')
format_str_list.append("i")
format_str_list.append(str(len(value_bytes)))
format_str_list.append("s")
values_tobe_packed.append(len(value_bytes))
values_tobe_packed.append(value_bytes)
else:
print("Unsupported data type:" + str(self.__data_types[i]))
# could raise an error here.
return
format_str = ''.join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册