diff --git a/apollo.sh b/apollo.sh index 1bf72d78ebf448e3a854251ed24617231ac54f9e..2f37cb22b6d3d81c3205eaf68ccdf5fa8d53b8af 100755 --- a/apollo.sh +++ b/apollo.sh @@ -128,8 +128,7 @@ function build() { build_py_proto # Update task info template on compiling. - bazel-bin/modules/data/recorder/update_task_info \ - --commit_id=$(git rev-parse HEAD) + bazel-bin/modules/data/util/update_task_info --commit_id=$(git rev-parse HEAD) } function cibuild() { diff --git a/modules/data/proto/recorder_info.proto b/modules/data/proto/recorder_info.proto index 687c9e67cdbff5eba9775508e45f0ada83212eac..92c80643dc0f0d9094d8b028e1453b70e4d3e62c 100644 --- a/modules/data/proto/recorder_info.proto +++ b/modules/data/proto/recorder_info.proto @@ -1,6 +1,6 @@ syntax = "proto2"; -package apollo.data.data_recorder; +package apollo.data; message RecorderInfo { optional uint32 status = 1; diff --git a/modules/data/recorder/tool/data_recorder_control.sh b/modules/data/recorder/tool/data_recorder_control.sh deleted file mode 100755 index b7d54c3335462d002680a36c1889dc8ff900eec3..0000000000000000000000000000000000000000 --- a/modules/data/recorder/tool/data_recorder_control.sh +++ /dev/null @@ -1,111 +0,0 @@ -#!/bin/bash - -############################################################################### -# Copyright 2017 The Apollo Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -############################################################################### - -APOLLO_ROOT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )/../../../.." && pwd )" -cd ${APOLLO_ROOT_DIR} - -SCRIPT_NAME="data-recorder" -PWD=$(cd "$(dirname "$0")"; pwd) - -color_message() { - local message=$1 - local color=$2 - case "${color}" in - "BLUE" ) - echo -e "\033[34m>> ${message}\033[0m" - ;; - "GREEN" ) - echo -e "\033[32m>> ${message}\033[0m" - ;; - "RED" ) - echo -e "\033[31m>> ${message}\033[0m" - ;; - "YELLOW" ) - echo -e "\033[33m>> ${message}\033[0m" - ;; - "*" ) - echo -e "\033[31m>> Only support 'GREEN | RED |YELLOW'" && exit 1 - esac -} - -usage_start() { - color_message "Data Recorder:" "BLUE" - color_message "Usage: bash ${SCRIPT_NAME} start [debug | ads | collection]" "BLUE" - color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE" -} - -usage() { - color_message "Data Recorder:" "BLUE" - color_message "Usage: bash ${SCRIPT_NAME} [ start | stop | check-env | config ]" "BLUE" - color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE" -} - -start() { - local task_purpose=$1 - ps -ef|grep 'data_recorder_manager'|grep -v 'data_recorder_control'|grep -v 'grep' &>/dev/null - [ $? -eq 0 ] && printf "\033[31mThere is another data-recorder process running, if you want to stop that process, you can run following command:\nbash data_recorder_control.sh stop\n\033[0m" && exit 1 - python modules/data/recorder/tool/data_recorder_manager.py \ - -c modules/data/conf/recorder.${task_purpose}.yaml & - sleep 1 - ps -ef|grep 'data_recorder_manager'|grep -v 'grep' &>/dev/null - [ $? -eq 0 ] && echo "data-recorder start successfully." || echo "data-recorder start failed." -} - -stop() { - ps -ef|grep 'data_recorder_manager'|grep -v 'grep'|awk '{print $2}'|xargs kill - [ $? -eq 0 ] && echo "Stop data-recorder successfully" || echo "Stop data-recorder failed." - return 0 -} - -restart() { - stop - sleep 10 - start -} - -main() { - if [[ $# -lt 1 ]]; then - usage && exit 0 - fi - case "$1" in - start ) - if [ $# -eq 1 ];then - start "debug" - elif [ $# -eq 2 ]; then - case "$2" in - (debug|ads|collection) - start $2 ;; - (*) - usage_start && exit 0 - esac - else - usage_start && exit 0 - fi - ;; - stop ) - stop - ;; - config) - config - ;; - *) - usage && exit 1 - esac -} - -main "$@" diff --git a/modules/data/recorder/tool/README.md b/modules/data/tools/recorder/README.md similarity index 94% rename from modules/data/recorder/tool/README.md rename to modules/data/tools/recorder/README.md index ada9309876609ea9ab6bc79f6327ee4dcc10459f..c76abc79b7e480715c09eaa400b57d39b917bdf9 100644 --- a/modules/data/recorder/tool/README.md +++ b/modules/data/tools/recorder/README.md @@ -18,7 +18,7 @@ Modify the config file according to your system environments and data recording #### Start Data-Recorder. * bash data_reocrder_control.sh start # Start data-recoder with default task_purpose(debug). - * python modules/data/recorder/tool/data_recorder_manager.py -c modules/data/conf/recorder.debug.yaml # This is another way to start. + * python modules/data/tools/recorder/data_recorder_manager.py -c modules/data/conf/recorder.debug.yaml # This is another way to start. #### Stop Data-Recorder. * bash data-recorder_control.sh stop # stop data-recorder. diff --git a/modules/data/recorder/tool/config_parser.py b/modules/data/tools/recorder/config_parser.py similarity index 80% rename from modules/data/recorder/tool/config_parser.py rename to modules/data/tools/recorder/config_parser.py index c28742b9ae41432de50b5424f83bd70da70796ea..266fe5444d4bb573f05b2760d70ec39dda558cd9 100755 --- a/modules/data/recorder/tool/config_parser.py +++ b/modules/data/tools/recorder/config_parser.py @@ -42,7 +42,8 @@ class ConfigParser(object): stream = file(config_file, 'r') yaml_cf = yaml.safe_load(stream) except Exception as e: - logging.error("Load config file %s encounters error, %s", config_file, str(e)) + logging.error("Load config file %s encounters error, %s", + config_file, str(e)) else: logging.info("Load config file %s successfully", config_file) return yaml_cf @@ -62,21 +63,30 @@ class ConfigParser(object): self.vehicle['vehicle_tag'] = vehicle_required['vehicle_tag'] self.vehicle['vehicle_type'] = vehicle_required['vehicle_type'] except KeyError as e: - logging.error("vehicle_id, vehicle_type and vehicle_tag are required, %s", str(e)) + logging.error( + "vehicle_id, vehicle_type and vehicle_tag are required, %s", + str(e)) return -1 try: self.vehicle['vehicle_id'] = os.environ['CARID'] - logging.info("Get CARID from env variable successfully, CARID=%s", self.vehicle['vehicle_id']) + logging.info("Get CARID from env variable successfully, CARID=%s", + self.vehicle['vehicle_id']) except KeyError as e: - logging.warn("Get CARID from env variable failed, read carid from conf.") + logging.warn( + "Get CARID from env variable failed, read carid from conf.") if "optional" in vehicle: vehicle_optional = vehicle.get('optional') if vehicle_optional is not None: try: - self.vehicle['description'] = vehicle_optional['description'] + self.vehicle['description'] = \ + vehicle_optional['description'] except KeyError as e: - logging.warn("get vehicle optional field encounters error, %s", str(e)) - logging.info("get vehicle from yaml config file successfully, vehicle=%s", self.vehicle) + logging.warn( + "get vehicle optional field encounters error, %s", + str(e)) + logging.info( + "get vehicle from yaml config file successfully, vehicle=%s", + self.vehicle) return 0 def get_organization_from_yaml(self, yaml_config): @@ -87,7 +97,8 @@ class ConfigParser(object): return -1 org_required = org.get('required') if org_required is None: - logging.error("Config file must contain organization required field.") + logging.error( + "Config file must contain organization required field.") return -1 try: self.organization['name'] = org_required['name'] @@ -99,11 +110,14 @@ class ConfigParser(object): org_optional = org.get('optional') if org_optional is not None: try: - self.organization['description'] = org_optional['description'] + self.organization['description'] = \ + org_optional['description'] except KeyError as e: - logging.warn("Get organization optional field encounters error, %s", str(e)) - logging.info("Get organization from yaml config file successfully, organization=%s", - self.organization) + logging.warn( + "Get organization optional field encounters error, %s", + str(e)) + logging.info("Get organization from yaml config file successfully, " + "organization=%s", self.organization) return 0 def get_datatype_from_yaml(self, yaml_config): @@ -119,7 +133,9 @@ class ConfigParser(object): data_type = data.get('data_type') if data_type is not None: self.data_type = data_type - logging.info("Get data_type from yaml config file successfully, data_type=%s", self.data_type) + logging.info( + "Get data_type from yaml config file successfully, data_type=%s", + self.data_type) return 0 def get_global_config(self, yaml_config): diff --git a/modules/data/tools/recorder/data_recorder_control.sh b/modules/data/tools/recorder/data_recorder_control.sh new file mode 100755 index 0000000000000000000000000000000000000000..2296370f5a633b02a7cf0e9cbb105890ac17c674 --- /dev/null +++ b/modules/data/tools/recorder/data_recorder_control.sh @@ -0,0 +1,119 @@ +#!/bin/bash + +############################################################################### +# Copyright 2017 The Apollo Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### + +APOLLO_ROOT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )/../../../.." && pwd )" +cd ${APOLLO_ROOT_DIR} + +SCRIPT_NAME="$(basename $0)" +PWD=$(cd "$(dirname "$0")"; pwd) + +color_message() { + local message=$1 + local color=$2 + case "${color}" in + "BLUE" ) + echo -e "\033[34m>> ${message}\033[0m" + ;; + "GREEN" ) + echo -e "\033[32m>> ${message}\033[0m" + ;; + "RED" ) + echo -e "\033[31m>> ${message}\033[0m" + ;; + "YELLOW" ) + echo -e "\033[33m>> ${message}\033[0m" + ;; + "*" ) + echo -e "\033[31m>> Only support 'GREEN | RED |YELLOW'" && exit 1 + esac +} + +usage_start() { + color_message "Data Recorder:" "BLUE" + color_message \ + "Usage: bash ${SCRIPT_NAME} start [debug | ads | collection]" "BLUE" + color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE" +} + +usage() { + color_message "Data Recorder:" "BLUE" + color_message \ + "Usage: bash ${SCRIPT_NAME} [ start | stop | check-env | config ]" "BLUE" + color_message "Version: ${SCRIPT_NAME} ${VERSION}." "BLUE" +} + +start() { + local task_purpose=$1 + ps -ef | grep 'data_recorder_manager' | grep -v 'data_recorder_control' | \ + grep -v 'grep' &>/dev/null + [ $? -eq 0 ] && printf "\033[31mThere is another data-recorder process \ +running, if you want to stop that process, you can run following command:\n\ +bash data_recorder_control.sh stop\n\033[0m" && exit 1 + python modules/data/tools/recorder/data_recorder_manager.py \ + -c modules/data/conf/recorder.${task_purpose}.yaml & + sleep 1 + ps -ef|grep 'data_recorder_manager'|grep -v 'grep' &>/dev/null + [ $? -eq 0 ] && echo "data-recorder start successfully." || \ + echo "data-recorder start failed." +} + +stop() { + ps -ef | grep 'data_recorder_manager' | grep -v 'grep' | awk '{print $2}' | \ + xargs kill + [ $? -eq 0 ] && echo "Stop data-recorder successfully" || \ + echo "Stop data-recorder failed." + return 0 +} + +restart() { + stop + sleep 10 + start +} + +main() { + if [[ $# -lt 1 ]]; then + usage && exit 0 + fi + case "$1" in + start ) + if [ $# -eq 1 ];then + start "debug" + elif [ $# -eq 2 ]; then + case "$2" in + (debug|ads|collection) + start $2 ;; + (*) + usage_start && exit 0 + esac + else + usage_start && exit 0 + fi + ;; + stop ) + stop + ;; + config) + config + ;; + *) + usage && exit 1 + esac +} + +main "$@" diff --git a/modules/data/recorder/tool/data_recorder_manager.py b/modules/data/tools/recorder/data_recorder_manager.py similarity index 78% rename from modules/data/recorder/tool/data_recorder_manager.py rename to modules/data/tools/recorder/data_recorder_manager.py index 61c4ff137ebb29e455f7a9d284fa7f002631600c..bb6a460c44d0b66347651891d27a4abc81d2b8c0 100755 --- a/modules/data/recorder/tool/data_recorder_manager.py +++ b/modules/data/tools/recorder/data_recorder_manager.py @@ -78,7 +78,8 @@ class DataRecorderManager(object): def init(self): self.recorder_status = recorder_info_pb2.DATA_RECORDER_INIT self.cmd_topic = self.conf_reader.data_args.get("recorder_cmd_topic") - self.status_topic = self.conf_reader.data_args.get("recorder_status_topic") + self.status_topic = self.conf_reader.data_args.get( + "recorder_status_topic") self.pub = rospy.Publisher(self.status_topic, String, queue_size=100) signal.signal(signal.SIGINT, self.shutdown_hook) signal.signal(signal.SIGTERM, self.shutdown_hook) @@ -106,7 +107,8 @@ class DataRecorderManager(object): def create_task_id(self): """Recorder init, create_task_id.""" - if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == -2: + if disk_handle.check_disk( + self.conf_reader.data_args.get('output_path')) == -2: return -1 self.create_backup_id() self.output_directory = self.conf_reader.data_args.get('output_path') \ @@ -127,13 +129,20 @@ class DataRecorderManager(object): { 'basic': { 'data_recorder_version': G_VERSION, - 'organization_name': self.conf_reader.organization.get('name'), - 'organization_website': self.conf_reader.organization.get('website'), - 'organization_description': self.conf_reader.organization.get('description'), - 'vehicle_id': self.conf_reader.vehicle.get('vehicle_id'), - 'vehicle_type': self.conf_reader.vehicle.get('vehicle_type'), - 'vehicle_tag': self.conf_reader.vehicle.get('vehicle_tag'), - 'vehicle_description': self.conf_reader.vehicle.get('description'), + 'organization_name': \ + self.conf_reader.organization.get('name'), + 'organization_website': \ + self.conf_reader.organization.get('website'), + 'organization_description': \ + self.conf_reader.organization.get('description'), + 'vehicle_id': \ + self.conf_reader.vehicle.get('vehicle_id'), + 'vehicle_type': \ + self.conf_reader.vehicle.get('vehicle_type'), + 'vehicle_tag': \ + self.conf_reader.vehicle.get('vehicle_tag'), + 'vehicle_description': \ + self.conf_reader.vehicle.get('description'), 'task_purpose': self.conf_reader.task_purpose, 'system_uptime': self.get_system_uptime() } @@ -143,9 +152,10 @@ class DataRecorderManager(object): meta_ext = self.conf_reader.data_args.get('meta_extension') meta_extension = (meta_ext if meta_ext == 'ini' or meta_ext == 'json' else "ini") - ret = meta_manager.create_meta(self.output_directory + '/meta/recorder.' + meta_extension, - meta_extension, - recorder_meta) + ret = meta_manager.create_meta( + self.output_directory + '/meta/recorder.' + meta_extension, + meta_extension, + recorder_meta) if ret != 0: return -1 self.update_link() @@ -164,9 +174,11 @@ class DataRecorderManager(object): def listener_callback(self, data): """Listener callback.""" - logging.info("Receive message from %s, data=%s", self.cmd_topic, data.data) + logging.info("Receive message from %s, data=%s", + self.cmd_topic, data.data) if self.latest_cmdtime is not None: - if (datetime.datetime.now() - self.latest_cmdtime) < datetime.timedelta(seconds=10): + if (datetime.datetime.now() - self.latest_cmdtime) < \ + datetime.timedelta(seconds=10): print_message = ( "The interval between two consecutive operation" " must not be less than 10 seconds! Thanks!" @@ -208,7 +220,8 @@ class DataRecorderManager(object): """Sync data.""" src = self.conf_reader.task_data_args[data]['data_property']['src'] dst = self.conf_reader.task_data_args[data]['data_property']['dst'] - limit = self.conf_reader.task_data_args[data]['action_args']['sync_bwlimit'] + limit = \ + self.conf_reader.task_data_args[data]['action_args']['sync_bwlimit'] src = src if src.endswith('/') else src + "/" dst = self.output_directory + "/" + dst + "/" cmd = "mkdir -p " + dst \ @@ -223,10 +236,12 @@ class DataRecorderManager(object): out, err = process.communicate() errcode = process.returncode if errcode == 0: - logging.info("Sync %s to %s successfully after creating task_id", src, dst) + logging.info("Sync %s to %s successfully after creating task_id", + src, dst) else: logging.error("Sync data failed after creating task_id, \ - cmd=%s, stdout=%s, stderr=%s, errcode=%s", cmd, out, err, errcode) + cmd=%s, stdout=%s, stderr=%s, errcode=%s", + cmd, out, err, errcode) def publish_recorder_info(self, event): """Publish data recorder status.""" @@ -235,8 +250,10 @@ class DataRecorderManager(object): task = recorder_info_pb2.Task() task_id = os.path.split(os.path.abspath(self.output_directory))[1] task.id = task_id - datetime_task_start = datetime.datetime.strptime(task_id.split('_')[1], '%Y%m%d%H%M%S') - task.duration = (datetime.datetime.now() - datetime_task_start).seconds + datetime_task_start = datetime.datetime.strptime( + task_id.split('_')[1], '%Y%m%d%H%M%S') + task.duration = (datetime.datetime.now() - datetime_task_start + ).seconds resources = recorder_info_pb2.Resources() disk_partitions = disk_handle.get_disk_partitions_info() for dp in disk_partitions: @@ -250,23 +267,33 @@ class DataRecorderManager(object): disk.avail = float(db_usage.free) disk.use_percent = float(db_usage.percent) disk.mount = dp.mountpoint - if disk.mount == disk_handle.get_mount_point(self.conf_reader.data_args.get('output_path')): + if disk.mount == disk_handle.get_mount_point( + self.conf_reader.data_args.get('output_path')): info.writing_disk.CopyFrom(disk) - if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == 0: - self.recorder_status &= (~recorder_info_pb2.DISK_SPACE_WARNNING) - self.recorder_status &= (~recorder_info_pb2.DISK_SPACE_ALERT) - if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == -1: - self.recorder_status |= recorder_info_pb2.DISK_SPACE_WARNNING - if disk_handle.check_disk(self.conf_reader.data_args.get('output_path')) == -2: - self.recorder_status |= recorder_info_pb2.DISK_SPACE_WARNNING - self.recorder_status |= recorder_info_pb2.DISK_SPACE_ALERT + if disk_handle.check_disk( + self.conf_reader.data_args.get('output_path')) == 0: + self.recorder_status &= ( + ~recorder_info_pb2.DISK_SPACE_WARNNING) + self.recorder_status &= ( + ~recorder_info_pb2.DISK_SPACE_ALERT) + if disk_handle.check_disk( + self.conf_reader.data_args.get('output_path')) == -1: + self.recorder_status |= \ + recorder_info_pb2.DISK_SPACE_WARNNING + if disk_handle.check_disk( + self.conf_reader.data_args.get('output_path')) == -2: + self.recorder_status |= \ + recorder_info_pb2.DISK_SPACE_WARNNING + self.recorder_status |= \ + recorder_info_pb2.DISK_SPACE_ALERT if self.rlock.acquire(): self.record_enable = False self.sync_enable = False self.rlock.release() if not self.sync_enable and self.rlock.acquire(): - self.recorder_status &= (~recorder_info_pb2.DATA_SYNC_ENABLE) + self.recorder_status &= ( + ~recorder_info_pb2.DATA_SYNC_ENABLE) self.rlock.release() data = recorder_info_pb2.Data() info.status = self.recorder_status @@ -308,8 +335,10 @@ class DataRecorderManager(object): rosbag_buffer_size = rosbag_args['action_args']['rosbag_buffer_size'] rosbag_chunk_size = rosbag_args['action_args']['rosbag_chunk_size'] rosbag_split = rosbag_args['action_args']['rosbag_split'] - rosbag_split_duration = rosbag_args['action_args']['rosbag_split_duration'] - rosbag_compress_type = rosbag_args['action_args']['rosbag_compress_type'] + rosbag_split_duration = \ + rosbag_args['action_args']['rosbag_split_duration'] + rosbag_compress_type = \ + rosbag_args['action_args']['rosbag_compress_type'] rosbag_groups = rosbag_args['action_args']['rosbag_topic_group'] rosbag_path = self.output_directory + "/" + rosbag_dst record_instance = () @@ -318,7 +347,8 @@ class DataRecorderManager(object): group_name = group['group_name'] group_topic_match_regex = group['group_topic_match_re'] group_topic_exclude_regex = group['group_topic_exclude_re'] - prefix = "rosbag_" + self.conf_reader.vehicle['vehicle_id'] + "_" + group_name + prefix = "rosbag_" + \ + self.conf_reader.vehicle['vehicle_id'] + "_" + group_name opts = recorder.RecorderOptions() opts.record_path = rosbag_path opts.record_prefix = prefix @@ -345,7 +375,8 @@ class DataRecorderManager(object): self.recorder_status |= 7 # running && record && sync self.listener() - timer_publish = rospy.Timer(rospy.Duration(2), self.publish_recorder_info) + timer_publish = rospy.Timer(rospy.Duration(2), + self.publish_recorder_info) while not rospy.is_shutdown(): rospy.sleep(1) is_alive = False @@ -360,21 +391,28 @@ class DataRecorderManager(object): if record_pid.exitcode == -1024: del self.recorder_list[index] break - if record_pid.exitcode is not None and not record_pid.is_alive(): - logging.warn("Record group %s is not alive, try to restart.", record_name) + if (record_pid.exitcode is not None and + not record_pid.is_alive()): + logging.warn( + "Record group %s is not alive, try to restart.", + record_name) try: record_pid.start() except Exception as e: - logging.warn("Record group %s is not alive, restart, %s", record_name, str(e)) + logging.warn( + "Record group %s is not alive, restart, %s", + record_name, str(e)) if record_pid.exitcode is not None and record_pid.exitcode != 0: logging.info("Record exit, name=%s, exitcode=%s", record_name, str(record_pid.exitcode)) - logging.warn("Record group %s is terminated with exception, try to restart.", record_name) + logging.warn("Record group %s is terminated with " + "exception, try to restart.", record_name) record = recorder.Recorder(self, record_opts) record.start() del self.recorder_list[index] - self.recorder_list.append((record_name, record, record_opts)) + self.recorder_list.append( + (record_name, record, record_opts)) for worker in self.worker_list: is_alive = is_alive or worker.isAlive() if not is_alive: @@ -416,7 +454,8 @@ def launch_data_recorder(cp): def main(): """main function""" - usage = "python modules/data/recorder/tool/data_recorder_manager.py -c modules/data/conf/recorder.conf" + usage = ("python modules/data/tools/recorder/data_recorder_manager.py -c " + "modules/data/conf/recorder.conf") parser = optparse.OptionParser(usage) parser.add_option("-c", "--conf", dest = "conf_file", @@ -431,17 +470,20 @@ def main(): if len(sys.argv) == 1: parser.error("Incorrect numbers of arguments") if len(args): - parser.error("Incorrect numbers of arguments, please type python data_recorder.py -h") + parser.error("Incorrect numbers of arguments, " + "please type python data_recorder.py -h") if options.my_version: return print_version(G_VERSION) if options.conf_file is not None: if not os.path.exists(options.conf_file): - parser.error("The config file you given does not exists, please check!") + parser.error( + "The config file you given does not exists, please check!") else: cp =config_parser.ConfigParser() - global_conf = cp.load_config("modules/data/conf/recorder.global.yaml") + global_conf = cp.load_config( + "modules/data/conf/recorder.global.yaml") task_conf = cp.load_config(options.conf_file) if global_conf is None: print("Load recorder.global.yaml error!") @@ -450,10 +492,13 @@ def main(): print("Load recorder.debug.yaml error!") sys.exit(-1) if not cp.get_global_config(global_conf) == 0: - print("Get global parameters from modules/data/conf/recorder.global.yaml encounters error!") + print("Get global parameters from " + "modules/data/conf/recorder.global.yaml " + "encounters error!") sys.exit(-1) if not cp.get_task_from_yaml(task_conf) == 0: - print("Get task parameters from %s encounters error!" % (options.conf_file)) + print("Get task parameters from %s encounters error!" % ( + options.conf_file)) rospy.init_node('data_recorder', anonymous=False) launch_data_recorder(cp) diff --git a/modules/data/recorder/tool/data_sync.py b/modules/data/tools/recorder/data_sync.py similarity index 86% rename from modules/data/recorder/tool/data_sync.py rename to modules/data/tools/recorder/data_sync.py index eec50c7f533cd5b3f6582ada319d6a0e03cab931..a6b09b327bd4dca1b70836ea1a5bace930e56d45 100755 --- a/modules/data/recorder/tool/data_sync.py +++ b/modules/data/tools/recorder/data_sync.py @@ -68,8 +68,8 @@ class DataSync(threading.Thread): sync_src = src if src.endswith('/') else src + "/" sync_dst = self.recorder_manager.output_directory + "/" + dst + "/" cmd = "mkdir -p " + sync_dst \ - + " && echo \"rootpass\n\" |sudo -S /usr/bin/rsync -auvrtzopgP" \ - + " --bwlimit=" \ + + " && echo \"rootpass\n\" |sudo -S /usr/bin/rsync " \ + + "-auvrtzopgP --bwlimit=" \ + str(limit) \ + " " \ + sync_src \ @@ -87,20 +87,24 @@ class DataSync(threading.Thread): args=(backup=%s, limit=%s, with_remove=%s)", sync_src, sync_dst, backup, str(limit), with_remove) else: - logging.error("Sync %s to %s failed, cmd=%s, stderr=%s, errorcode=%s", + logging.error( + "Sync %s to %s failed, cmd=%s, stderr=%s, errorcode=%s", sync_src, sync_dst, cmd, err, errcode) return if not with_remove: return # backup and remove. - cmd = "echo \"rootpass\n\"|sudo -S find " + sync_src + " -mmin +1 -type f" - process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmd = "echo \"rootpass\n\"|sudo -S find " + sync_src + \ + " -mmin +1 -type f" + process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) out, err = process.communicate() errcode = process.returncode logging.info("Find files in src=%s, stdout=%s, stderr=%s, errorcode=%s", sync_src, out, err, errcode) if errcode != 0: - logging.error("Find files failed, cmd=%s, stdout=%s, stderr=%s, errorcode=%s", + logging.error( + "Find files failed, cmd=%s, stdout=%s, stderr=%s, errorcode=%s", cmd, out, err, errcode) return for f in out.splitlines(): @@ -126,7 +130,8 @@ class DataSync(threading.Thread): sync_src_tmp = sync_src.rstrip('/') backup_dst = backup \ + "/" \ - + os.path.split(os.path.abspath(f))[0].replace(sync_src_tmp, '') + + os.path.split( + os.path.abspath(f))[0].replace(sync_src_tmp, '') cmd = "mkdir -p " + backup_dst \ + " && echo \"rootpass\n\" |sudo -S mv " \ + f \ @@ -140,11 +145,13 @@ class DataSync(threading.Thread): out, err = process.communicate() errcode = process.returncode if errcode == 0: - logging.info("Sync %s to %s with remove source files successfully", + logging.info( + "Sync %s to %s with remove source files successfully", f, backup_dst) else: logging.info("Sync @cmptnode %s to %s with remove failed, \ - cmd=%s, stderr=%s, errorcode=%s", f, backup_dst, cmd, err, errcode) + cmd=%s, stderr=%s, errorcode=%s", + f, backup_dst, cmd, err, errcode) def sync_data(self): """Sync data.""" @@ -160,23 +167,27 @@ class DataSync(threading.Thread): dst = data_args.get('data_property').get('dst') bwlimit = data_args.get('action_args').get('sync_bwlimit') if data_args['action_args']['with_remove']: - backup_dir = self.recorder_manager.backup_directory + "/" + dst - self.do_sync(src, dst, backup=backup_dir, limit=bwlimit, with_remove=True) + backup_dir = self.recorder_manager.backup_directory + "/" \ + + dst + self.do_sync(src, dst, backup=backup_dir, limit=bwlimit, + with_remove=True) else: - self.do_sync(src, dst, backup=None, limit=bwlimit, with_remove=False) + self.do_sync(src, dst, backup=None, limit=bwlimit, + with_remove=False) def clean_backup(self): """Clean backup dir.""" backup = self.conf_reader.data_args.get('backup_path') backup_days = self.conf_reader.data_args.get('backup_keep_days') backup_max_size = self.conf_reader.data_args.get('backup_max_size') - cmd = "cd " + backup + " && find . -maxdepth 1 -ctime +" + str(backup_days) \ - + " |xargs -i rm -rf {}" + cmd = "cd " + backup + " && find . -maxdepth 1 -ctime +" + str( + backup_days) + " |xargs -i rm -rf {}" ret, rst = commands.getstatusoutput(cmd) logging.info("cmd=%s, return=%s, result=%s", cmd, ret, rst) backup_size = disk_handle.get_folder_size(backup) while (backup_size > int(backup_max_size) * 1024 * 1024): - cmd = "cd " + backup + " && find -type f -printf \'%p\n\' | sort|head -n 5| xargs -i rm {}" + cmd = "cd " + backup + " && find -type f -printf \'%p\n\' | " + \ + "sort|head -n 5| xargs -i rm {}" try: ret, rst = commands.getstatusoutput(cmd) logging.info("cmd=%s, return=%s, result=%s", cmd, ret, rst) @@ -199,7 +210,8 @@ class DataSync(threading.Thread): next_time = iter_now_time + period continue logging.info("Catch stop signal and begin to sync log") - print("\nCatch stop signal and begin to sync log, it will take a few minuts, please wait!") + print("\nCatch stop signal and begin to sync log, " + "it will take a few minuts, please wait!") self.sync_data() logging.info("Catch stop signal and sync log finished.") print("Sync data finished!") diff --git a/modules/data/recorder/tool/disk_handle.py b/modules/data/tools/recorder/disk_handle.py similarity index 89% rename from modules/data/recorder/tool/disk_handle.py rename to modules/data/tools/recorder/disk_handle.py index a8f9ba5b645ee318290c8db1158f232d905946fb..6a6c03342cfcb47c197cb66b1c71eaf30f428ad4 100755 --- a/modules/data/recorder/tool/disk_handle.py +++ b/modules/data/tools/recorder/disk_handle.py @@ -59,12 +59,16 @@ def check_disk(output_path, min_space=1024): disk_mp = get_mount_point(output_path) free_space = get_disk_free_size(disk_mp) if free_space < min_space * 1024 * 1024: - print("Less than %s of space free on disk %s." % (min_space, disk_mp)) - logging.error("Less than %s of space free on disk %s.", min_space, disk_mp) + print("Less than %s of space free on disk %s." % + (min_space, disk_mp)) + logging.error("Less than %s of space free on disk %s.", + min_space, disk_mp) return -2 elif free_space < min_space * 1024 * 1024 * 5: - print("Less than 5 * %s of space free on disk %s." % (min_space, disk_mp)) - logging.warn("Less than 5 * %s of space free on disk %s.", min_space, disk_mp) + print("Less than 5 * %s of space free on disk %s." % + (min_space, disk_mp)) + logging.warn("Less than 5 * %s of space free on disk %s.", + min_space, disk_mp) return -1 return 0 @@ -99,7 +103,8 @@ def get_disk_usage_info(path): vfs = os.statvfs(path) free = (vfs[statvfs.F_BAVAIL] * vfs[statvfs.F_BSIZE]) total = (vfs[statvfs.F_BLOCKS] * vfs[statvfs.F_BSIZE]) - used = (vfs[statvfs.F_BLOCKS] - vfs[statvfs.F_BFREE]) * vfs[statvfs.F_FRSIZE] + used = (vfs[statvfs.F_BLOCKS] - vfs[statvfs.F_BFREE]) * \ + vfs[statvfs.F_FRSIZE] try: percent = (float(used) / total) * 100 except ZeroDivisionError: diff --git a/modules/data/recorder/tool/logger.py b/modules/data/tools/recorder/logger.py old mode 100644 new mode 100755 similarity index 97% rename from modules/data/recorder/tool/logger.py rename to modules/data/tools/recorder/logger.py index 1965a75aedb444ace48d4b79ddef06593afc6dc9..94a91b6f556f36421f2515c3f5f04b0f13cd5243 --- a/modules/data/recorder/tool/logger.py +++ b/modules/data/tools/recorder/logger.py @@ -25,9 +25,11 @@ import os class Logger(object): - """The logger factory class. It is a template to help quickly create a log utility. + """The logger factory class. It is a template to help quickly create a log + utility. Attributes: - set_conf(log_file, use_stdout, log_level): this is a static method that returns a configured logger. + set_conf(log_file, use_stdout, log_level): this is a static method that + returns a configured logger. get_logger(tag): this is a static method that returns a configured logger. """ __loggers = {} diff --git a/modules/data/recorder/tool/meta_manager.py b/modules/data/tools/recorder/meta_manager.py similarity index 100% rename from modules/data/recorder/tool/meta_manager.py rename to modules/data/tools/recorder/meta_manager.py diff --git a/modules/data/recorder/tool/recorder.py b/modules/data/tools/recorder/recorder.py similarity index 90% rename from modules/data/recorder/tool/recorder.py rename to modules/data/tools/recorder/recorder.py index c7c2850ccf2ab00340172e923d2f830d97ce55af..73b3b456dbed10bdfd666af7f68f89742e860cca 100755 --- a/modules/data/recorder/tool/recorder.py +++ b/modules/data/tools/recorder/recorder.py @@ -119,7 +119,8 @@ class Recorder(threading.Thread): split_arg = "--split --size " \ + str(self.recorder_opts.record_split_size) else: - logging.error("Invalid compression parameter, only support duration or size") + logging.error("Invalid compression parameter, " + "only support duration or size") return (-1, 'arguments error') recordpath = roslib.packages.find_node('rosbag', 'record') @@ -141,12 +142,14 @@ class Recorder(threading.Thread): cmd = cmd \ + "--all" \ + " " \ - + "--exclude " + "\'" + self.recorder_opts.record_topic_exclude_regex + "\'" \ + + "--exclude " + "\'" \ + + self.recorder_opts.record_topic_exclude_regex + "\'" \ + " " \ + if_quiet_arg else: cmd = cmd \ - + "--regex " + "\'" + self.recorder_opts.record_topic_match_regex + "\'" \ + + "--regex " + "\'" \ + + self.recorder_opts.record_topic_match_regex + "\'" \ + " " \ + if_quiet_arg return (0, cmd) @@ -182,12 +185,14 @@ class Recorder(threading.Thread): break if not self.recorder_manager.record_enable: os.killpg(self.record_process.pid, signal.SIGINT) - self.exitcode = -1024 # Stop record because of record has been disabled. + # Stop record because of record has been disabled. + self.exitcode = -1024 break output = self.ssr.readline(0.1) if output is not None: logging.info("Record subprocess stream: %s", output) - if output in ['Aborted\n', 'Terminated\n'] and self.exitcode != -1024: + if (output in ['Aborted\n', 'Terminated\n'] and + self.exitcode != -1024): os.killpg(self.record_process.pid, signal.SIGINT) self.exitcode = -2 break @@ -198,7 +203,8 @@ class Recorder(threading.Thread): self.exitcode = 0 break logging.info("Record exit: stdout=%s, stderr=%s, errorcode=%s", - self.record_process.stdout, self.record_process.stderr, self.record_process.returncode) + self.record_process.stdout, self.record_process.stderr, + self.record_process.returncode) def run(self): """Thread run.""" @@ -208,4 +214,5 @@ class Recorder(threading.Thread): logging.error("Record process exit with exception.") self.exitcode = -1 self.exception = e - self.exc_traceback = ''.join(traceback.format_exception(*sys.exc_info())) + self.exc_traceback = ''.join( + traceback.format_exception(*sys.exc_info())) diff --git a/modules/data/recorder/BUILD b/modules/data/util/BUILD similarity index 100% rename from modules/data/recorder/BUILD rename to modules/data/util/BUILD diff --git a/modules/data/recorder/info_collector.cc b/modules/data/util/info_collector.cc similarity index 98% rename from modules/data/recorder/info_collector.cc rename to modules/data/util/info_collector.cc index a3400817aeec04a6efa33a1c1c4c819a22b2d8dc..744b77e3c4a79cdd9f7e1eaff669dcb6d90a73b0 100644 --- a/modules/data/recorder/info_collector.cc +++ b/modules/data/util/info_collector.cc @@ -14,7 +14,7 @@ * limitations under the License. *****************************************************************************/ -#include "modules/data/recorder/info_collector.h" +#include "modules/data/util/info_collector.h" #include diff --git a/modules/data/recorder/info_collector.h b/modules/data/util/info_collector.h similarity index 91% rename from modules/data/recorder/info_collector.h rename to modules/data/util/info_collector.h index b34018bee5f03a741e39e7544e5820dff7e2638b..87319108dfa9d738db3804862efba44a1a30fb62 100644 --- a/modules/data/recorder/info_collector.h +++ b/modules/data/util/info_collector.h @@ -14,8 +14,8 @@ * limitations under the License. *****************************************************************************/ -#ifndef MODULES_DATA_RECORDER_INFO_COLLECTOR_H_ -#define MODULES_DATA_RECORDER_INFO_COLLECTOR_H_ +#ifndef MODULES_DATA_UTIL_INFO_COLLECTOR_H_ +#define MODULES_DATA_UTIL_INFO_COLLECTOR_H_ #include "modules/data/proto/recorder_conf.pb.h" #include "modules/data/proto/task.pb.h" @@ -54,4 +54,4 @@ class InfoCollector { } // namespace data } // namespace apollo -#endif // MODULES_DATA_RECORDER_INFO_COLLECTOR_H_ +#endif // MODULES_DATA_UTIL_INFO_COLLECTOR_H_ diff --git a/modules/data/recorder/info_collector_test.cc b/modules/data/util/info_collector_test.cc similarity index 95% rename from modules/data/recorder/info_collector_test.cc rename to modules/data/util/info_collector_test.cc index 544fe6f87271876d3f932841e150db4f27d0da91..88b9004a1d689657b0ba11db93527986e766fb21 100644 --- a/modules/data/recorder/info_collector_test.cc +++ b/modules/data/util/info_collector_test.cc @@ -14,7 +14,7 @@ * limitations under the License. *****************************************************************************/ -#include "modules/data/recorder/info_collector.h" +#include "modules/data/util/info_collector.h" #include "gtest/gtest.h" diff --git a/modules/data/recorder/update_task_info.cc b/modules/data/util/update_task_info.cc similarity index 96% rename from modules/data/recorder/update_task_info.cc rename to modules/data/util/update_task_info.cc index d92d06feb81e247fbef84888cc716485403f89a5..59e5cc201be40fa4c2e3ddc2eb28c2af5de21b69 100644 --- a/modules/data/recorder/update_task_info.cc +++ b/modules/data/util/update_task_info.cc @@ -16,7 +16,7 @@ #include "gflags/gflags.h" #include "modules/common/log.h" -#include "modules/data/recorder/info_collector.h" +#include "modules/data/util/info_collector.h" DEFINE_string(commit_id, "", "Current commit ID.");