提交 cacfbc9b 编写于 作者: A Aaron Xiao 提交者: Calvin Miao

Data: Refactor data recorder. (#1324)

上级 94f110ee
......@@ -128,8 +128,7 @@ function build() {
build_py_proto
# Update task info template on compiling.
bazel-bin/modules/data/recorder/update_task_info \
--commit_id=$(git rev-parse HEAD)
bazel-bin/modules/data/util/update_task_info --commit_id=$(git rev-parse HEAD)
}
function cibuild() {
......
syntax = "proto2";
package apollo.data.data_recorder;
package apollo.data;
message RecorderInfo {
optional uint32 status = 1;
......
#!/bin/bash
###############################################################################
# Copyright 2017 The Apollo Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
APOLLO_ROOT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )/../../../.." && pwd )"
cd ${APOLLO_ROOT_DIR}
SCRIPT_NAME="data-recorder"
PWD=$(cd "$(dirname "$0")"; pwd)
color_message() {
local message=$1
local color=$2
case "${color}" in
"BLUE" )
echo -e "\033[34m>> ${message}\033[0m"
;;
"GREEN" )
echo -e "\033[32m>> ${message}\033[0m"
;;
"RED" )
echo -e "\033[31m>> ${message}\033[0m"
;;
"YELLOW" )
echo -e "\033[33m>> ${message}\033[0m"
;;
"*" )
echo -e "\033[31m>> Only support 'GREEN | RED |YELLOW'" && exit 1
esac
}
usage_start() {
color_message "Data Recorder:" "BLUE"
color_message "Usage: bash ${SCRIPT_NAME} start [debug | ads | collection]" "BLUE"
color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE"
}
usage() {
color_message "Data Recorder:" "BLUE"
color_message "Usage: bash ${SCRIPT_NAME} [ start | stop | check-env | config ]" "BLUE"
color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE"
}
start() {
local task_purpose=$1
ps -ef|grep 'data_recorder_manager'|grep -v 'data_recorder_control'|grep -v 'grep' &>/dev/null
[ $? -eq 0 ] && printf "\033[31mThere is another data-recorder process running, if you want to stop that process, you can run following command:\nbash data_recorder_control.sh stop\n\033[0m" && exit 1
python modules/data/recorder/tool/data_recorder_manager.py \
-c modules/data/conf/recorder.${task_purpose}.yaml &
sleep 1
ps -ef|grep 'data_recorder_manager'|grep -v 'grep' &>/dev/null
[ $? -eq 0 ] && echo "data-recorder start successfully." || echo "data-recorder start failed."
}
stop() {
ps -ef|grep 'data_recorder_manager'|grep -v 'grep'|awk '{print $2}'|xargs kill
[ $? -eq 0 ] && echo "Stop data-recorder successfully" || echo "Stop data-recorder failed."
return 0
}
restart() {
stop
sleep 10
start
}
main() {
if [[ $# -lt 1 ]]; then
usage && exit 0
fi
case "$1" in
start )
if [ $# -eq 1 ];then
start "debug"
elif [ $# -eq 2 ]; then
case "$2" in
(debug|ads|collection)
start $2 ;;
(*)
usage_start && exit 0
esac
else
usage_start && exit 0
fi
;;
stop )
stop
;;
config)
config
;;
*)
usage && exit 1
esac
}
main "$@"
......@@ -18,7 +18,7 @@ Modify the config file according to your system environments and data recording
#### Start Data-Recorder.
* bash data_reocrder_control.sh start # Start data-recoder with default task_purpose(debug).
* python modules/data/recorder/tool/data_recorder_manager.py -c modules/data/conf/recorder.debug.yaml # This is another way to start.
* python modules/data/tools/recorder/data_recorder_manager.py -c modules/data/conf/recorder.debug.yaml # This is another way to start.
#### Stop Data-Recorder.
* bash data-recorder_control.sh stop # stop data-recorder.
......
......@@ -42,7 +42,8 @@ class ConfigParser(object):
stream = file(config_file, 'r')
yaml_cf = yaml.safe_load(stream)
except Exception as e:
logging.error("Load config file %s encounters error, %s", config_file, str(e))
logging.error("Load config file %s encounters error, %s",
config_file, str(e))
else:
logging.info("Load config file %s successfully", config_file)
return yaml_cf
......@@ -62,21 +63,30 @@ class ConfigParser(object):
self.vehicle['vehicle_tag'] = vehicle_required['vehicle_tag']
self.vehicle['vehicle_type'] = vehicle_required['vehicle_type']
except KeyError as e:
logging.error("vehicle_id, vehicle_type and vehicle_tag are required, %s", str(e))
logging.error(
"vehicle_id, vehicle_type and vehicle_tag are required, %s",
str(e))
return -1
try:
self.vehicle['vehicle_id'] = os.environ['CARID']
logging.info("Get CARID from env variable successfully, CARID=%s", self.vehicle['vehicle_id'])
logging.info("Get CARID from env variable successfully, CARID=%s",
self.vehicle['vehicle_id'])
except KeyError as e:
logging.warn("Get CARID from env variable failed, read carid from conf.")
logging.warn(
"Get CARID from env variable failed, read carid from conf.")
if "optional" in vehicle:
vehicle_optional = vehicle.get('optional')
if vehicle_optional is not None:
try:
self.vehicle['description'] = vehicle_optional['description']
self.vehicle['description'] = \
vehicle_optional['description']
except KeyError as e:
logging.warn("get vehicle optional field encounters error, %s", str(e))
logging.info("get vehicle from yaml config file successfully, vehicle=%s", self.vehicle)
logging.warn(
"get vehicle optional field encounters error, %s",
str(e))
logging.info(
"get vehicle from yaml config file successfully, vehicle=%s",
self.vehicle)
return 0
def get_organization_from_yaml(self, yaml_config):
......@@ -87,7 +97,8 @@ class ConfigParser(object):
return -1
org_required = org.get('required')
if org_required is None:
logging.error("Config file must contain organization required field.")
logging.error(
"Config file must contain organization required field.")
return -1
try:
self.organization['name'] = org_required['name']
......@@ -99,11 +110,14 @@ class ConfigParser(object):
org_optional = org.get('optional')
if org_optional is not None:
try:
self.organization['description'] = org_optional['description']
self.organization['description'] = \
org_optional['description']
except KeyError as e:
logging.warn("Get organization optional field encounters error, %s", str(e))
logging.info("Get organization from yaml config file successfully, organization=%s",
self.organization)
logging.warn(
"Get organization optional field encounters error, %s",
str(e))
logging.info("Get organization from yaml config file successfully, "
"organization=%s", self.organization)
return 0
def get_datatype_from_yaml(self, yaml_config):
......@@ -119,7 +133,9 @@ class ConfigParser(object):
data_type = data.get('data_type')
if data_type is not None:
self.data_type = data_type
logging.info("Get data_type from yaml config file successfully, data_type=%s", self.data_type)
logging.info(
"Get data_type from yaml config file successfully, data_type=%s",
self.data_type)
return 0
def get_global_config(self, yaml_config):
......
#!/bin/bash
###############################################################################
# Copyright 2017 The Apollo Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
APOLLO_ROOT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )/../../../.." && pwd )"
cd ${APOLLO_ROOT_DIR}
SCRIPT_NAME="$(basename $0)"
PWD=$(cd "$(dirname "$0")"; pwd)
color_message() {
local message=$1
local color=$2
case "${color}" in
"BLUE" )
echo -e "\033[34m>> ${message}\033[0m"
;;
"GREEN" )
echo -e "\033[32m>> ${message}\033[0m"
;;
"RED" )
echo -e "\033[31m>> ${message}\033[0m"
;;
"YELLOW" )
echo -e "\033[33m>> ${message}\033[0m"
;;
"*" )
echo -e "\033[31m>> Only support 'GREEN | RED |YELLOW'" && exit 1
esac
}
usage_start() {
color_message "Data Recorder:" "BLUE"
color_message \
"Usage: bash ${SCRIPT_NAME} start [debug | ads | collection]" "BLUE"
color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE"
}
usage() {
color_message "Data Recorder:" "BLUE"
color_message \
"Usage: bash ${SCRIPT_NAME} [ start | stop | check-env | config ]" "BLUE"
color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE"
}
start() {
local task_purpose=$1
ps -ef | grep 'data_recorder_manager' | grep -v 'data_recorder_control' | \
grep -v 'grep' &>/dev/null
[ $? -eq 0 ] && printf "\033[31mThere is another data-recorder process \
running, if you want to stop that process, you can run following command:\n\
bash data_recorder_control.sh stop\n\033[0m" && exit 1
python modules/data/tools/recorder/data_recorder_manager.py \
-c modules/data/conf/recorder.${task_purpose}.yaml &
sleep 1
ps -ef|grep 'data_recorder_manager'|grep -v 'grep' &>/dev/null
[ $? -eq 0 ] && echo "data-recorder start successfully." || \
echo "data-recorder start failed."
}
stop() {
ps -ef | grep 'data_recorder_manager' | grep -v 'grep' | awk '{print $2}' | \
xargs kill
[ $? -eq 0 ] && echo "Stop data-recorder successfully" || \
echo "Stop data-recorder failed."
return 0
}
restart() {
stop
sleep 10
start
}
main() {
if [[ $# -lt 1 ]]; then
usage && exit 0
fi
case "$1" in
start )
if [ $# -eq 1 ];then
start "debug"
elif [ $# -eq 2 ]; then
case "$2" in
(debug|ads|collection)
start $2 ;;
(*)
usage_start && exit 0
esac
else
usage_start && exit 0
fi
;;
stop )
stop
;;
config)
config
;;
*)
usage && exit 1
esac
}
main "$@"
......@@ -78,7 +78,8 @@ class DataRecorderManager(object):
def init(self):
self.recorder_status = recorder_info_pb2.DATA_RECORDER_INIT
self.cmd_topic = self.conf_reader.data_args.get("recorder_cmd_topic")
self.status_topic = self.conf_reader.data_args.get("recorder_status_topic")
self.status_topic = self.conf_reader.data_args.get(
"recorder_status_topic")
self.pub = rospy.Publisher(self.status_topic, String, queue_size=100)
signal.signal(signal.SIGINT, self.shutdown_hook)
signal.signal(signal.SIGTERM, self.shutdown_hook)
......@@ -106,7 +107,8 @@ class DataRecorderManager(object):
def create_task_id(self):
"""Recorder init, create_task_id."""
if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == -2:
if disk_handle.check_disk(
self.conf_reader.data_args.get('output_path')) == -2:
return -1
self.create_backup_id()
self.output_directory = self.conf_reader.data_args.get('output_path') \
......@@ -127,13 +129,20 @@ class DataRecorderManager(object):
{
'basic': {
'data_recorder_version': G_VERSION,
'organization_name': self.conf_reader.organization.get('name'),
'organization_website': self.conf_reader.organization.get('website'),
'organization_description': self.conf_reader.organization.get('description'),
'vehicle_id': self.conf_reader.vehicle.get('vehicle_id'),
'vehicle_type': self.conf_reader.vehicle.get('vehicle_type'),
'vehicle_tag': self.conf_reader.vehicle.get('vehicle_tag'),
'vehicle_description': self.conf_reader.vehicle.get('description'),
'organization_name': \
self.conf_reader.organization.get('name'),
'organization_website': \
self.conf_reader.organization.get('website'),
'organization_description': \
self.conf_reader.organization.get('description'),
'vehicle_id': \
self.conf_reader.vehicle.get('vehicle_id'),
'vehicle_type': \
self.conf_reader.vehicle.get('vehicle_type'),
'vehicle_tag': \
self.conf_reader.vehicle.get('vehicle_tag'),
'vehicle_description': \
self.conf_reader.vehicle.get('description'),
'task_purpose': self.conf_reader.task_purpose,
'system_uptime': self.get_system_uptime()
}
......@@ -143,9 +152,10 @@ class DataRecorderManager(object):
meta_ext = self.conf_reader.data_args.get('meta_extension')
meta_extension = (meta_ext
if meta_ext == 'ini' or meta_ext == 'json' else "ini")
ret = meta_manager.create_meta(self.output_directory + '/meta/recorder.' + meta_extension,
meta_extension,
recorder_meta)
ret = meta_manager.create_meta(
self.output_directory + '/meta/recorder.' + meta_extension,
meta_extension,
recorder_meta)
if ret != 0:
return -1
self.update_link()
......@@ -164,9 +174,11 @@ class DataRecorderManager(object):
def listener_callback(self, data):
"""Listener callback."""
logging.info("Receive message from %s, data=%s", self.cmd_topic, data.data)
logging.info("Receive message from %s, data=%s",
self.cmd_topic, data.data)
if self.latest_cmdtime is not None:
if (datetime.datetime.now() - self.latest_cmdtime) < datetime.timedelta(seconds=10):
if (datetime.datetime.now() - self.latest_cmdtime) < \
datetime.timedelta(seconds=10):
print_message = (
"The interval between two consecutive operation"
" must not be less than 10 seconds! Thanks!"
......@@ -208,7 +220,8 @@ class DataRecorderManager(object):
"""Sync data."""
src = self.conf_reader.task_data_args[data]['data_property']['src']
dst = self.conf_reader.task_data_args[data]['data_property']['dst']
limit = self.conf_reader.task_data_args[data]['action_args']['sync_bwlimit']
limit = \
self.conf_reader.task_data_args[data]['action_args']['sync_bwlimit']
src = src if src.endswith('/') else src + "/"
dst = self.output_directory + "/" + dst + "/"
cmd = "mkdir -p " + dst \
......@@ -223,10 +236,12 @@ class DataRecorderManager(object):
out, err = process.communicate()
errcode = process.returncode
if errcode == 0:
logging.info("Sync %s to %s successfully after creating task_id", src, dst)
logging.info("Sync %s to %s successfully after creating task_id",
src, dst)
else:
logging.error("Sync data failed after creating task_id, \
cmd=%s, stdout=%s, stderr=%s, errcode=%s", cmd, out, err, errcode)
cmd=%s, stdout=%s, stderr=%s, errcode=%s",
cmd, out, err, errcode)
def publish_recorder_info(self, event):
"""Publish data recorder status."""
......@@ -235,8 +250,10 @@ class DataRecorderManager(object):
task = recorder_info_pb2.Task()
task_id = os.path.split(os.path.abspath(self.output_directory))[1]
task.id = task_id
datetime_task_start = datetime.datetime.strptime(task_id.split('_')[1], '%Y%m%d%H%M%S')
task.duration = (datetime.datetime.now() - datetime_task_start).seconds
datetime_task_start = datetime.datetime.strptime(
task_id.split('_')[1], '%Y%m%d%H%M%S')
task.duration = (datetime.datetime.now() - datetime_task_start
).seconds
resources = recorder_info_pb2.Resources()
disk_partitions = disk_handle.get_disk_partitions_info()
for dp in disk_partitions:
......@@ -250,23 +267,33 @@ class DataRecorderManager(object):
disk.avail = float(db_usage.free)
disk.use_percent = float(db_usage.percent)
disk.mount = dp.mountpoint
if disk.mount == disk_handle.get_mount_point(self.conf_reader.data_args.get('output_path')):
if disk.mount == disk_handle.get_mount_point(
self.conf_reader.data_args.get('output_path')):
info.writing_disk.CopyFrom(disk)
if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == 0:
self.recorder_status &= (~recorder_info_pb2.DISK_SPACE_WARNNING)
self.recorder_status &= (~recorder_info_pb2.DISK_SPACE_ALERT)
if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == -1:
self.recorder_status |= recorder_info_pb2.DISK_SPACE_WARNNING
if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == -2:
self.recorder_status |= recorder_info_pb2.DISK_SPACE_WARNNING
self.recorder_status |= recorder_info_pb2.DISK_SPACE_ALERT
if disk_handle.check_disk(
self.conf_reader.data_args.get('output_path')) == 0:
self.recorder_status &= (
~recorder_info_pb2.DISK_SPACE_WARNNING)
self.recorder_status &= (
~recorder_info_pb2.DISK_SPACE_ALERT)
if disk_handle.check_disk(
self.conf_reader.data_args.get('output_path')) == -1:
self.recorder_status |= \
recorder_info_pb2.DISK_SPACE_WARNNING
if disk_handle.check_disk(
self.conf_reader.data_args.get('output_path')) == -2:
self.recorder_status |= \
recorder_info_pb2.DISK_SPACE_WARNNING
self.recorder_status |= \
recorder_info_pb2.DISK_SPACE_ALERT
if self.rlock.acquire():
self.record_enable = False
self.sync_enable = False
self.rlock.release()
if not self.sync_enable and self.rlock.acquire():
self.recorder_status &= (~recorder_info_pb2.DATA_SYNC_ENABLE)
self.recorder_status &= (
~recorder_info_pb2.DATA_SYNC_ENABLE)
self.rlock.release()
data = recorder_info_pb2.Data()
info.status = self.recorder_status
......@@ -308,8 +335,10 @@ class DataRecorderManager(object):
rosbag_buffer_size = rosbag_args['action_args']['rosbag_buffer_size']
rosbag_chunk_size = rosbag_args['action_args']['rosbag_chunk_size']
rosbag_split = rosbag_args['action_args']['rosbag_split']
rosbag_split_duration = rosbag_args['action_args']['rosbag_split_duration']
rosbag_compress_type = rosbag_args['action_args']['rosbag_compress_type']
rosbag_split_duration = \
rosbag_args['action_args']['rosbag_split_duration']
rosbag_compress_type = \
rosbag_args['action_args']['rosbag_compress_type']
rosbag_groups = rosbag_args['action_args']['rosbag_topic_group']
rosbag_path = self.output_directory + "/" + rosbag_dst
record_instance = ()
......@@ -318,7 +347,8 @@ class DataRecorderManager(object):
group_name = group['group_name']
group_topic_match_regex = group['group_topic_match_re']
group_topic_exclude_regex = group['group_topic_exclude_re']
prefix = "rosbag_" + self.conf_reader.vehicle['vehicle_id'] + "_" + group_name
prefix = "rosbag_" + \
self.conf_reader.vehicle['vehicle_id'] + "_" + group_name
opts = recorder.RecorderOptions()
opts.record_path = rosbag_path
opts.record_prefix = prefix
......@@ -345,7 +375,8 @@ class DataRecorderManager(object):
self.recorder_status |= 7 # running && record && sync
self.listener()
timer_publish = rospy.Timer(rospy.Duration(2), self.publish_recorder_info)
timer_publish = rospy.Timer(rospy.Duration(2),
self.publish_recorder_info)
while not rospy.is_shutdown():
rospy.sleep(1)
is_alive = False
......@@ -360,21 +391,28 @@ class DataRecorderManager(object):
if record_pid.exitcode == -1024:
del self.recorder_list[index]
break
if record_pid.exitcode is not None and not record_pid.is_alive():
logging.warn("Record group %s is not alive, try to restart.", record_name)
if (record_pid.exitcode is not None and
not record_pid.is_alive()):
logging.warn(
"Record group %s is not alive, try to restart.",
record_name)
try:
record_pid.start()
except Exception as e:
logging.warn("Record group %s is not alive, restart, %s", record_name, str(e))
logging.warn(
"Record group %s is not alive, restart, %s",
record_name, str(e))
if record_pid.exitcode is not None and record_pid.exitcode != 0:
logging.info("Record exit, name=%s, exitcode=%s",
record_name,
str(record_pid.exitcode))
logging.warn("Record group %s is terminated with exception, try to restart.", record_name)
logging.warn("Record group %s is terminated with "
"exception, try to restart.", record_name)
record = recorder.Recorder(self, record_opts)
record.start()
del self.recorder_list[index]
self.recorder_list.append((record_name, record, record_opts))
self.recorder_list.append(
(record_name, record, record_opts))
for worker in self.worker_list:
is_alive = is_alive or worker.isAlive()
if not is_alive:
......@@ -416,7 +454,8 @@ def launch_data_recorder(cp):
def main():
"""main function"""
usage = "python modules/data/recorder/tool/data_recorder_manager.py -c modules/data/conf/recorder.conf"
usage = ("python modules/data/tools/recorder/data_recorder_manager.py -c "
"modules/data/conf/recorder.conf")
parser = optparse.OptionParser(usage)
parser.add_option("-c", "--conf",
dest = "conf_file",
......@@ -431,17 +470,20 @@ def main():
if len(sys.argv) == 1:
parser.error("Incorrect numbers of arguments")
if len(args):
parser.error("Incorrect numbers of arguments, please type python data_recorder.py -h")
parser.error("Incorrect numbers of arguments, "
"please type python data_recorder.py -h")
if options.my_version:
return print_version(G_VERSION)
if options.conf_file is not None:
if not os.path.exists(options.conf_file):
parser.error("The config file you given does not exists, please check!")
parser.error(
"The config file you given does not exists, please check!")
else:
cp =config_parser.ConfigParser()
global_conf = cp.load_config("modules/data/conf/recorder.global.yaml")
global_conf = cp.load_config(
"modules/data/conf/recorder.global.yaml")
task_conf = cp.load_config(options.conf_file)
if global_conf is None:
print("Load recorder.global.yaml error!")
......@@ -450,10 +492,13 @@ def main():
print("Load recorder.debug.yaml error!")
sys.exit(-1)
if not cp.get_global_config(global_conf) == 0:
print("Get global parameters from modules/data/conf/recorder.global.yaml encounters error!")
print("Get global parameters from "
"modules/data/conf/recorder.global.yaml "
"encounters error!")
sys.exit(-1)
if not cp.get_task_from_yaml(task_conf) == 0:
print("Get task parameters from %s encounters error!" % (options.conf_file))
print("Get task parameters from %s encounters error!" % (
options.conf_file))
rospy.init_node('data_recorder', anonymous=False)
launch_data_recorder(cp)
......
......@@ -68,8 +68,8 @@ class DataSync(threading.Thread):
sync_src = src if src.endswith('/') else src + "/"
sync_dst = self.recorder_manager.output_directory + "/" + dst + "/"
cmd = "mkdir -p " + sync_dst \
+ " && echo \"rootpass\n\" |sudo -S /usr/bin/rsync -auvrtzopgP" \
+ " --bwlimit=" \
+ " && echo \"rootpass\n\" |sudo -S /usr/bin/rsync " \
+ "-auvrtzopgP --bwlimit=" \
+ str(limit) \
+ " " \
+ sync_src \
......@@ -87,20 +87,24 @@ class DataSync(threading.Thread):
args=(backup=%s, limit=%s, with_remove=%s)",
sync_src, sync_dst, backup, str(limit), with_remove)
else:
logging.error("Sync %s to %s failed, cmd=%s, stderr=%s, errorcode=%s",
logging.error(
"Sync %s to %s failed, cmd=%s, stderr=%s, errorcode=%s",
sync_src, sync_dst, cmd, err, errcode)
return
if not with_remove:
return
# backup and remove.
cmd = "echo \"rootpass\n\"|sudo -S find " + sync_src + " -mmin +1 -type f"
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd = "echo \"rootpass\n\"|sudo -S find " + sync_src + \
" -mmin +1 -type f"
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = process.communicate()
errcode = process.returncode
logging.info("Find files in src=%s, stdout=%s, stderr=%s, errorcode=%s",
sync_src, out, err, errcode)
if errcode != 0:
logging.error("Find files failed, cmd=%s, stdout=%s, stderr=%s, errorcode=%s",
logging.error(
"Find files failed, cmd=%s, stdout=%s, stderr=%s, errorcode=%s",
cmd, out, err, errcode)
return
for f in out.splitlines():
......@@ -126,7 +130,8 @@ class DataSync(threading.Thread):
sync_src_tmp = sync_src.rstrip('/')
backup_dst = backup \
+ "/" \
+ os.path.split(os.path.abspath(f))[0].replace(sync_src_tmp, '')
+ os.path.split(
os.path.abspath(f))[0].replace(sync_src_tmp, '')
cmd = "mkdir -p " + backup_dst \
+ " && echo \"rootpass\n\" |sudo -S mv " \
+ f \
......@@ -140,11 +145,13 @@ class DataSync(threading.Thread):
out, err = process.communicate()
errcode = process.returncode
if errcode == 0:
logging.info("Sync %s to %s with remove source files successfully",
logging.info(
"Sync %s to %s with remove source files successfully",
f, backup_dst)
else:
logging.info("Sync @cmptnode %s to %s with remove failed, \
cmd=%s, stderr=%s, errorcode=%s", f, backup_dst, cmd, err, errcode)
cmd=%s, stderr=%s, errorcode=%s",
f, backup_dst, cmd, err, errcode)
def sync_data(self):
"""Sync data."""
......@@ -160,23 +167,27 @@ class DataSync(threading.Thread):
dst = data_args.get('data_property').get('dst')
bwlimit = data_args.get('action_args').get('sync_bwlimit')
if data_args['action_args']['with_remove']:
backup_dir = self.recorder_manager.backup_directory + "/" + dst
self.do_sync(src, dst, backup=backup_dir, limit=bwlimit, with_remove=True)
backup_dir = self.recorder_manager.backup_directory + "/" \
+ dst
self.do_sync(src, dst, backup=backup_dir, limit=bwlimit,
with_remove=True)
else:
self.do_sync(src, dst, backup=None, limit=bwlimit, with_remove=False)
self.do_sync(src, dst, backup=None, limit=bwlimit,
with_remove=False)
def clean_backup(self):
"""Clean backup dir."""
backup = self.conf_reader.data_args.get('backup_path')
backup_days = self.conf_reader.data_args.get('backup_keep_days')
backup_max_size = self.conf_reader.data_args.get('backup_max_size')
cmd = "cd " + backup + " && find . -maxdepth 1 -ctime +" + str(backup_days) \
+ " |xargs -i rm -rf {}"
cmd = "cd " + backup + " && find . -maxdepth 1 -ctime +" + str(
backup_days) + " |xargs -i rm -rf {}"
ret, rst = commands.getstatusoutput(cmd)
logging.info("cmd=%s, return=%s, result=%s", cmd, ret, rst)
backup_size = disk_handle.get_folder_size(backup)
while (backup_size > int(backup_max_size) * 1024 * 1024):
cmd = "cd " + backup + " && find -type f -printf \'%p\n\' | sort|head -n 5| xargs -i rm {}"
cmd = "cd " + backup + " && find -type f -printf \'%p\n\' | " + \
"sort|head -n 5| xargs -i rm {}"
try:
ret, rst = commands.getstatusoutput(cmd)
logging.info("cmd=%s, return=%s, result=%s", cmd, ret, rst)
......@@ -199,7 +210,8 @@ class DataSync(threading.Thread):
next_time = iter_now_time + period
continue
logging.info("Catch stop signal and begin to sync log")
print("\nCatch stop signal and begin to sync log, it will take a few minuts, please wait!")
print("\nCatch stop signal and begin to sync log, "
"it will take a few minuts, please wait!")
self.sync_data()
logging.info("Catch stop signal and sync log finished.")
print("Sync data finished!")
......@@ -59,12 +59,16 @@ def check_disk(output_path, min_space=1024):
disk_mp = get_mount_point(output_path)
free_space = get_disk_free_size(disk_mp)
if free_space < min_space * 1024 * 1024:
print("Less than %s of space free on disk %s." % (min_space, disk_mp))
logging.error("Less than %s of space free on disk %s.", min_space, disk_mp)
print("Less than %s of space free on disk %s." %
(min_space, disk_mp))
logging.error("Less than %s of space free on disk %s.",
min_space, disk_mp)
return -2
elif free_space < min_space * 1024 * 1024 * 5:
print("Less than 5 * %s of space free on disk %s." % (min_space, disk_mp))
logging.warn("Less than 5 * %s of space free on disk %s.", min_space, disk_mp)
print("Less than 5 * %s of space free on disk %s." %
(min_space, disk_mp))
logging.warn("Less than 5 * %s of space free on disk %s.",
min_space, disk_mp)
return -1
return 0
......@@ -99,7 +103,8 @@ def get_disk_usage_info(path):
vfs = os.statvfs(path)
free = (vfs[statvfs.F_BAVAIL] * vfs[statvfs.F_BSIZE])
total = (vfs[statvfs.F_BLOCKS] * vfs[statvfs.F_BSIZE])
used = (vfs[statvfs.F_BLOCKS] - vfs[statvfs.F_BFREE]) * vfs[statvfs.F_FRSIZE]
used = (vfs[statvfs.F_BLOCKS] - vfs[statvfs.F_BFREE]) * \
vfs[statvfs.F_FRSIZE]
try:
percent = (float(used) / total) * 100
except ZeroDivisionError:
......
......@@ -25,9 +25,11 @@ import os
class Logger(object):
"""The logger factory class. It is a template to help quickly create a log utility.
"""The logger factory class. It is a template to help quickly create a log
utility.
Attributes:
set_conf(log_file, use_stdout, log_level): this is a static method that returns a configured logger.
set_conf(log_file, use_stdout, log_level): this is a static method that
returns a configured logger.
get_logger(tag): this is a static method that returns a configured logger.
"""
__loggers = {}
......
......@@ -119,7 +119,8 @@ class Recorder(threading.Thread):
split_arg = "--split --size " \
+ str(self.recorder_opts.record_split_size)
else:
logging.error("Invalid compression parameter, only support duration or size")
logging.error("Invalid compression parameter, "
"only support duration or size")
return (-1, 'arguments error')
recordpath = roslib.packages.find_node('rosbag', 'record')
......@@ -141,12 +142,14 @@ class Recorder(threading.Thread):
cmd = cmd \
+ "--all" \
+ " " \
+ "--exclude " + "\'" + self.recorder_opts.record_topic_exclude_regex + "\'" \
+ "--exclude " + "\'" \
+ self.recorder_opts.record_topic_exclude_regex + "\'" \
+ " " \
+ if_quiet_arg
else:
cmd = cmd \
+ "--regex " + "\'" + self.recorder_opts.record_topic_match_regex + "\'" \
+ "--regex " + "\'" \
+ self.recorder_opts.record_topic_match_regex + "\'" \
+ " " \
+ if_quiet_arg
return (0, cmd)
......@@ -182,12 +185,14 @@ class Recorder(threading.Thread):
break
if not self.recorder_manager.record_enable:
os.killpg(self.record_process.pid, signal.SIGINT)
self.exitcode = -1024 # Stop record because of record has been disabled.
# Stop record because of record has been disabled.
self.exitcode = -1024
break
output = self.ssr.readline(0.1)
if output is not None:
logging.info("Record subprocess stream: %s", output)
if output in ['Aborted\n', 'Terminated\n'] and self.exitcode != -1024:
if (output in ['Aborted\n', 'Terminated\n'] and
self.exitcode != -1024):
os.killpg(self.record_process.pid, signal.SIGINT)
self.exitcode = -2
break
......@@ -198,7 +203,8 @@ class Recorder(threading.Thread):
self.exitcode = 0
break
logging.info("Record exit: stdout=%s, stderr=%s, errorcode=%s",
self.record_process.stdout, self.record_process.stderr, self.record_process.returncode)
self.record_process.stdout, self.record_process.stderr,
self.record_process.returncode)
def run(self):
"""Thread run."""
......@@ -208,4 +214,5 @@ class Recorder(threading.Thread):
logging.error("Record process exit with exception.")
self.exitcode = -1
self.exception = e
self.exc_traceback = ''.join(traceback.format_exception(*sys.exc_info()))
self.exc_traceback = ''.join(
traceback.format_exception(*sys.exc_info()))
......@@ -14,7 +14,7 @@
* limitations under the License.
*****************************************************************************/
#include "modules/data/recorder/info_collector.h"
#include "modules/data/util/info_collector.h"
#include <string>
......
......@@ -14,8 +14,8 @@
* limitations under the License.
*****************************************************************************/
#ifndef MODULES_DATA_RECORDER_INFO_COLLECTOR_H_
#define MODULES_DATA_RECORDER_INFO_COLLECTOR_H_
#ifndef MODULES_DATA_UTIL_INFO_COLLECTOR_H_
#define MODULES_DATA_UTIL_INFO_COLLECTOR_H_
#include "modules/data/proto/recorder_conf.pb.h"
#include "modules/data/proto/task.pb.h"
......@@ -54,4 +54,4 @@ class InfoCollector {
} // namespace data
} // namespace apollo
#endif // MODULES_DATA_RECORDER_INFO_COLLECTOR_H_
#endif // MODULES_DATA_UTIL_INFO_COLLECTOR_H_
......@@ -14,7 +14,7 @@
* limitations under the License.
*****************************************************************************/
#include "modules/data/recorder/info_collector.h"
#include "modules/data/util/info_collector.h"
#include "gtest/gtest.h"
......
......@@ -16,7 +16,7 @@
#include "gflags/gflags.h"
#include "modules/common/log.h"
#include "modules/data/recorder/info_collector.h"
#include "modules/data/util/info_collector.h"
DEFINE_string(commit_id, "", "Current commit ID.");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册