提交 e7ee3b04 编写于 作者: J jiacy-jcy

update timetruncate test case

上级 0a4fce35
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import time
from datetime import datetime
class GetTime:
def get_ms_timestamp(self,ts_str):
_ts_str = ts_str
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15 :
_ts_str = ts_str[:-3]
if ':' in _ts_str and '.' in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S.%f")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
elif ':' in _ts_str and '.' not in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
else:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
return date_time
def get_us_timestamp(self,ts_str):
_ts = self.get_ms_timestamp(ts_str) * 1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 12:
us_ts = p[12:15]
_ts += int(us_ts)
return _ts
def get_ns_timestamp(self,ts_str):
_ts = self.get_us_timestamp(ts_str) *1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15:
us_ts = p[15:]
_ts += int(us_ts)
return _ts
\ No newline at end of file
......@@ -5,12 +5,12 @@ from util.sql import *
import numpy as np
import time
from datetime import datetime
from util.gettime import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.get_time = GetTime()
self.rowNum = 10
self.ts = 1537146000000 # 2018-9-17 09:00:00.000
......@@ -28,103 +28,71 @@ class TDTestCase:
self.ntbname = 'ntb'
self.stbname = 'stb'
self.ctbname = 'ctb'
def get_ms_timestamp(self,ts_str):
_ts_str = ts_str
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15 :
_ts_str = ts_str[:-3]
if ':' in _ts_str and '.' in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S.%f")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
elif ':' in _ts_str and '.' not in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
else:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
return date_time
def get_us_timestamp(self,ts_str):
_ts = self.get_ms_timestamp(ts_str) * 1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 12:
us_ts = p[12:15]
_ts += int(us_ts)
return _ts
def get_ns_timestamp(self,ts_str):
_ts = self.get_us_timestamp(ts_str) *1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15:
us_ts = p[15:]
_ts += int(us_ts)
return _ts
def time_transform(self,ts_str,precision):
date_time = []
if precision == 'ms':
for i in ts_str:
date_time.append(self.get_ms_timestamp(i))
date_time.append(self.get_time.get_ms_timestamp(i))
elif precision == 'us':
for i in ts_str:
date_time.append(self.get_us_timestamp(i))
date_time.append(self.get_time.get_us_timestamp(i))
elif precision == 'ns':
for i in ts_str:
date_time.append(self.get_us_timestamp(i))
date_time.append(self.get_time.get_us_timestamp(i))
return date_time
def check_ms_timestamp(self,unit,date_time):
if unit.lower() == '1a':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]))
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000)*1000)
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60)*60*1000)
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60)*60*60*1000 )
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000)
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24/7)*7*24*60*60*1000)
def check_us_timestamp(self,unit,date_time):
if unit.lower() == '1u':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]))
elif unit.lower() == '1a':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000)*1000)
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000)*1000*1000)
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60)*60*1000*1000)
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60)*60*60*1000*1000 )
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 )
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
ts_result = self.get_time.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24/7)*7*24*60*60*1000*1000)
def check_ns_timestamp(self,unit,date_time):
if unit.lower() == '1u':
......@@ -201,7 +169,6 @@ class TDTestCase:
tdSql.execute(f'insert into {self.ntbname} values("{ts}",1)')
date_time = self.time_transform(self.ts_str,precision)
self.data_check(date_time,precision,'ntb')
def function_check_stb(self):
for precision in self.db_param_precision:
tdSql.execute('drop database if exists db')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册