提交 d8dab161 编写于 作者: J jiangyifei 提交者: Calvin Miao

navi: updated obstacle decider and path decider.

上级 f1608821
#!/usr/bin/env python
###############################################################################
# Copyright 2017 The Apollo Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
class ADVehicle:
def __init__(self):
self._chassis_pb = None
self._localization_pb = None
self.front_edge_to_center = 3.89
self.back_edge_to_center = 1.043
self.left_edge_to_center = 1.055
self.right_edge_to_center = 1.055
self.speed_mps = None
self.x = None
self.y = None
self.heading = None
def update_chassis(self, chassis_pb):
self._chassis_pb = chassis_pb
self.speed_mps = self._chassis_pb.speed_mps
def update_localization(self, localization_pb):
self._localization_pb = localization_pb
self.x = self._localization_pb.pose.position.x
self.y = self._localization_pb.pose.position.y
self.heading = self._localization_pb.pose.heading
def is_ready(self):
if self._chassis_pb is None or self._localization_pb is None:
return False
return True
#!/usr/bin/env python
###############################################################################
# Copyright 2017 The Apollo Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
class LocalPath:
def __init__(self, points):
self.points = points
def init_y(self):
if len(self.points) > 0:
return self.points[0][1]
return None
def get_xy(self):
x = []
y = []
for p in self.points:
x.append(p[0])
y.append(p[1])
return x, y
def range(self):
return len(self.points) - 1
def shift(self, dist):
for i in range(len(self.points)):
self.points[i][1] += dist
def cut(self, dist):
pass
def resample(self):
pass
def merge(self, local_path, weight):
for i in range(len(self.points)):
y = self.points[i][1]
if i < len(local_path.points):
y2 = local_path.points[i][1] * weight
self.points[i][1] = (y + y2) / (1 + weight)
......@@ -36,6 +36,7 @@ from provider_mobileye import MobileyeProvider
from provider_chassis import ChassisProvider
from provider_localization import LocalizationProvider
from provider_routing import RoutingProvider
from ad_vehicle import ADVehicle
gflags.DEFINE_integer('max_cruise_speed', 20,
'max speed for cruising in meter per second')
......@@ -58,9 +59,8 @@ path_decider = None
speed_decider = None
traj_generator = None
mobileye_provider = None
chassis_provider = None
localization_provider = None
routing_provider = None
adv = None
def routing_callback(routing_str):
......@@ -68,30 +68,27 @@ def routing_callback(routing_str):
def localization_callback(localization_pb):
localization_provider.update(localization_pb)
adv.update_localization(localization_pb)
def chassis_callback(chassis_pb):
chassis_provider.update(chassis_pb)
adv.update_chassis(chassis_pb)
def mobileye_callback(mobileye_pb):
start_timestamp = time.time()
if localization_provider.localization_pb is None:
return
if chassis_provider.chassis_pb is None:
def mobileye_callback2(mobileye_pb):
if not adv.is_ready():
return
start_timestamp = time.time()
mobileye_provider.update(mobileye_pb)
mobileye_provider.process_obstacles()
path_x, path_y, path_length = path_decider.get(mobileye_provider,
routing_provider,
localization_provider,
chassis_provider)
adv)
speed, final_path_length = speed_decider.get(mobileye_provider,
chassis_provider,
adv,
path_length)
adc_trajectory = traj_generator.generate(path_x, path_y, final_path_length,
......@@ -101,11 +98,33 @@ def mobileye_callback(mobileye_pb):
log_file.write("duration: " + str(time.time() - start_timestamp) + "\n")
def mobileye_callback(mobileye_pb):
if not adv.is_ready():
return
start_timestamp = time.time()
mobileye_provider.update(mobileye_pb)
mobileye_provider.process_obstacles()
path = path_decider.get_path(mobileye_provider, routing_provider, adv)
speed, final_path_length = speed_decider.get(mobileye_provider,
adv,
path.range())
adc_trajectory = traj_generator.generate(path, final_path_length,
speed,
start_timestamp=start_timestamp)
planning_pub.publish(adc_trajectory)
log_file.write("duration: " + str(time.time() - start_timestamp) + "\n")
def init():
global planning_pub, log_file
global path_decider, speed_decider, traj_generator
global mobileye_provider, chassis_provider
global localization_provider, routing_provider
global adv
path_decider = PathDecider(FLAGS.enable_routing_aid,
FLAGS.enable_nudge,
......@@ -114,9 +133,8 @@ def init():
FLAGS.enable_follow)
traj_generator = TrajectoryGenerator()
mobileye_provider = MobileyeProvider()
chassis_provider = ChassisProvider()
localization_provider = LocalizationProvider()
routing_provider = RoutingProvider()
adv = ADVehicle()
pgm_path = os.path.dirname(os.path.realpath(__file__))
log_path = pgm_path + "/logs/"
......
......@@ -26,15 +26,22 @@ class ObstacleDecider:
self.obstacle_lon_ttc = {}
self.obstacle_lat_dist = {}
self.obstacle_lon_dist = {}
self.MASTER_WIDTH = 3.4
self.MASTER_LENGTH = 5.5
self.front_edge_to_center = 3.89
self.back_edge_to_center = 1.043
self.left_edge_to_center = 1.055
self.right_edge_to_center = 1.055
self.LAT_DIST = 1.0
self.mobileye = None
self.path_obstacle_processed = False
def update(self, mobileye):
self.mobileye = mobileye
self.path_obstacle_processed = False
def process_path_obstacle(self, path_x, path_y):
if self.path_obstacle_processed:
return
self.obstacle_lat_dist = {}
path = []
......@@ -46,7 +53,7 @@ class ObstacleDecider:
for obstacle in self.mobileye.obstacles:
point = Point(obstacle.x, obstacle.y)
dist = line.distance(point)
if dist < self.MASTER_WIDTH / 2.0 + self.LAT_DIST:
if dist < self.left_edge_to_center + self.LAT_DIST:
proj_len = line.project(point)
if proj_len == 0 or proj_len >= line.length:
continue
......@@ -60,3 +67,5 @@ class ObstacleDecider:
if d < 0:
dist *= -1
self.obstacle_lat_dist[obstacle.obstacle_id] = dist
self.path_obstacle_processed = True
......@@ -16,7 +16,10 @@
# limitations under the License.
###############################################################################
import math
from reference_path import ReferencePath
from local_path import LocalPath
from numpy.polynomial.polynomial import polyval
class PathDecider:
......@@ -28,27 +31,183 @@ class PathDecider:
self.enable_routing_aid = enable_routing_aid
self.enable_nudge = enable_nudge
self.enable_change_lane = enable_change_lane
self.path_range = 10
def get_path_by_lm(self, mobileye, chassis):
return self.ref.get_ref_path_by_lm(mobileye, chassis)
def get_path_by_lm(self, mobileye, adv):
return self.ref.get_ref_path_by_lm(mobileye, adv)
def get_path_by_lmr(self, perception, routing,
localization, chassis):
def get_path_by_lmr(self, perception, routing, adv):
path_x, path_y, path_len = self.ref.get_ref_path_by_lmr(perception,
routing,
localization,
chassis)
adv)
if self.enable_nudge:
path_x, path_y, path_len = self.nudge_process(path_x, path_y,
path_len)
return path_x, path_y, path_len
def nudge_process(self, path_x, path_y, path_len):
def nudge_process(self, path_x, path_y, path_len, obstacle_decider):
obstacle_decider.process_path_obstacle(path_x, path_y)
left_dist = 999
right_dist = 999
for obs_id, lat_dist in obstacle_decider.obstacle_lat_dist.items():
if lat_dist < 0:
left_dist = lat_dist
else:
right_dist = lat_dist
return path_x, path_y, path_len
def get(self, perception, routing, localization, chassis):
def get(self, perception, routing, adv):
if self.enable_routing_aid:
return self.get_path_by_lmr(perception, routing, localization,
chassis)
return self.get_path_by_lmr(perception, routing, adv)
else:
return self.get_path_by_lm(perception, chassis)
return self.get_path_by_lm(perception, adv)
def get_path(self, perception, routing, adv):
self.path_range = self._get_path_range(adv.speed_mps)
if self.enable_routing_aid and adv.is_ready():
return self.get_routing_path(perception, routing, adv)
else:
return self.get_lane_marker_path(perception)
def get_routing_path(self, perception, routing, adv):
routing_path = routing.get_local_path(adv, self.path_range + 1)
perception_path = perception.get_lane_marker_middle_path(
self.path_range + 1)
quality = perception.right_lm_quality + perception.left_lm_quality
quality = quality / 2.0
if routing_path.range() >= self.path_range \
and routing.human \
and routing_path.init_y() <= 3:
# "routing only"
init_y_routing = routing_path.init_y()
init_y = self._smooth_init_y(init_y_routing)
routing_path.shift(init_y - routing_path.init_y())
return routing_path
init_y = self._smooth_init_y(perception_path.init_y())
if routing_path.range() < self.path_range:
# "perception only"
perception_path.shift(init_y - perception_path.init_y())
return perception_path
# "hybrid"
init_y = perception_path.init_y()
routing_path.shift(init_y - routing_path.init_y())
perception_path.shift(init_y - routing_path.init_y())
routing_path.merge(perception_path, quality)
return routing_path
def get_lane_marker_path(self, perception):
path = perception.get_lane_marker_middle_path(perception,
self.path_range)
init_y = path.init_y()
smoothed_init_y = self._smooth_init_y(init_y)
path.shift(smoothed_init_y - init_y)
return path
def _get_path_range(self, speed_mps):
path_length = self.MINIMUM_PATH_LENGTH
current_speed = speed_mps
if current_speed is not None:
if path_length < current_speed * 2:
path_length = math.ceil(current_speed * 2)
return int(path_length)
def _smooth_init_y(self, init_y):
if init_y > 0.2:
init_y = 0.2
if init_y < -0.2:
init_y = -0.2
return init_y
if __name__ == "__main__":
import rospy
from std_msgs.msg import String
import matplotlib.pyplot as plt
from modules.localization.proto import localization_pb2
from modules.canbus.proto import chassis_pb2
from ad_vehicle import ADVehicle
import matplotlib.animation as animation
from modules.drivers.proto import mobileye_pb2
from provider_routing import RoutingProvider
from provider_mobileye import MobileyeProvider
def localization_callback(localization_pb):
ad_vehicle.update_localization(localization_pb)
def routing_callback(routing_str):
routing.update(routing_str)
def chassis_callback(chassis_pb):
ad_vehicle.update_chassis(chassis_pb)
def mobileye_callback(mobileye_pb):
mobileye.update(mobileye_pb)
mobileye.process_lane_markers()
def update(frame):
if not ad_vehicle.is_ready():
return
left_path = mobileye.get_left_lane_marker_path()
left_x, left_y = left_path.get_xy()
left_lm.set_xdata(left_x)
left_lm.set_ydata(left_y)
right_path = mobileye.get_right_lane_marker_path()
right_x, right_y = right_path.get_xy()
right_lm.set_xdata(right_x)
right_lm.set_ydata(right_y)
middle_path = mobileye.get_lane_marker_middle_path(128)
middle_x, middle_y = middle_path.get_xy()
middle_lm.set_xdata(middle_x)
middle_lm.set_ydata(middle_y)
fpath = path_decider.get_path(mobileye, routing, ad_vehicle)
fpath_x, fpath_y = fpath.get_xy()
final_path.set_xdata(fpath_x)
final_path.set_ydata(fpath_y)
# ax.autoscale_view()
# ax.relim()
ad_vehicle = ADVehicle()
routing = RoutingProvider()
mobileye = MobileyeProvider()
path_decider = PathDecider(True, False, False)
rospy.init_node("path_decider_debug", anonymous=True)
rospy.Subscriber('/apollo/localization/pose',
localization_pb2.LocalizationEstimate,
localization_callback)
rospy.Subscriber('/apollo/navigation/routing',
String, routing_callback)
rospy.Subscriber('/apollo/canbus/chassis',
chassis_pb2.Chassis,
chassis_callback)
rospy.Subscriber('/apollo/sensor/mobileye',
mobileye_pb2.Mobileye,
mobileye_callback)
fig = plt.figure()
ax = plt.subplot2grid((1, 1), (0, 0), rowspan=1, colspan=1)
left_lm, = ax.plot([], [], 'b-')
right_lm, = ax.plot([], [], 'b-')
middle_lm, = ax.plot([], [], 'k-')
final_path, = ax.plot([], [], 'r-')
ani = animation.FuncAnimation(fig, update, interval=100)
ax.set_xlim([-2, 128])
ax.set_ylim([-5, 5])
# ax2.axis('equal')
plt.show()
......@@ -20,6 +20,7 @@ from shapely.geometry import Point
from shapely.geometry import LineString
from lanemarker_corrector import LaneMarkerCorrector
from numpy.polynomial.polynomial import polyval
from local_path import LocalPath
class Obstacle:
......@@ -190,3 +191,117 @@ class MobileyeProvider:
self.right_lm_coef)
self.left_lm_coef, self.right_lm_coef = \
corrector.correct(position, heading, routing_segment)
def get_right_lane_marker_path(self):
points = []
for x in range(int(self.right_lane_marker_range)):
y = -1 * polyval(x, self.right_lm_coef)
points.append([x, y])
path = LocalPath(points)
return path
def get_left_lane_marker_path(self):
points = []
for x in range(int(self.left_lane_marker_range)):
y = -1 * polyval(x, self.left_lm_coef)
points.append([x, y])
path = LocalPath(points)
return path
def get_lane_marker_middle_path(self, path_range):
init_y_perception = (self.right_lm_coef[0] +
self.left_lm_coef[0]) / -2.0
path_coef = [0, 0, 0, 0]
path_coef[0] = -1 * init_y_perception
quality = self.right_lm_quality + self.left_lm_quality
if quality > 0:
for i in range(1, 4):
path_coef[i] = (self.right_lm_coef[i] *
self.right_lm_quality +
self.left_lm_coef[i] *
self.left_lm_quality) / quality
points = []
for x in range(path_range):
y = -1 * polyval(x, path_coef)
points.append([x, y])
return LocalPath(points)
if __name__ == "__main__":
import rospy
from std_msgs.msg import String
import matplotlib.pyplot as plt
from modules.localization.proto import localization_pb2
from modules.canbus.proto import chassis_pb2
from ad_vehicle import ADVehicle
import matplotlib.animation as animation
from modules.drivers.proto import mobileye_pb2
from provider_routing import RoutingProvider
def localization_callback(localization_pb):
ad_vehicle.update_localization(localization_pb)
def routing_callback(routing_str):
routing.update(routing_str)
def chassis_callback(chassis_pb):
ad_vehicle.update_chassis(chassis_pb)
def mobileye_callback(mobileye_pb):
mobileye.update(mobileye_pb)
mobileye.process_lane_markers();
def update(frame):
left_path = mobileye.get_left_lane_marker_path()
left_x, left_y = left_path.get_xy()
left_lm.set_xdata(left_x)
left_lm.set_ydata(left_y)
right_path = mobileye.get_right_lane_marker_path()
right_x, right_y = right_path.get_xy()
right_lm.set_xdata(right_x)
right_lm.set_ydata(right_y)
middle_path = mobileye.get_lane_marker_middle_path(128)
middle_x, middle_y = middle_path.get_xy()
middle_lm.set_xdata(middle_x)
middle_lm.set_ydata(middle_y)
# ax.autoscale_view()
# ax.relim()
ad_vehicle = ADVehicle()
routing = RoutingProvider()
mobileye = MobileyeProvider()
rospy.init_node("routing_debug", anonymous=True)
rospy.Subscriber('/apollo/localization/pose',
localization_pb2.LocalizationEstimate,
localization_callback)
rospy.Subscriber('/apollo/navigation/routing',
String, routing_callback)
rospy.Subscriber('/apollo/canbus/chassis',
chassis_pb2.Chassis,
chassis_callback)
rospy.Subscriber('/apollo/sensor/mobileye',
mobileye_pb2.Mobileye,
mobileye_callback)
fig = plt.figure()
ax = plt.subplot2grid((1, 1), (0, 0), rowspan=1, colspan=1)
left_lm, = ax.plot([], [], 'b-')
right_lm, = ax.plot([], [], 'b-')
middle_lm, = ax.plot([], [], 'r-')
ani = animation.FuncAnimation(fig, update, interval=100)
ax.set_xlim([-2, 128])
ax.set_ylim([-5, 5])
# ax2.axis('equal')
plt.show()
......@@ -21,6 +21,7 @@ import threading
import math
from shapely.geometry import LineString, Point
from numpy.polynomial import polynomial as P
from local_path import LocalPath
# sudo apt-get install libgeos-dev
# sudo pip install shapely
......@@ -198,3 +199,109 @@ class RoutingProvider:
X = np.linspace(int(mono_seg_x[0]), int(mono_seg_x[-1]),
int(mono_seg_x[-1]))
return X, sp(X)
def get_local_path(self, adv, path_range):
utm_x = adv.x
utm_y = adv.y
heading = adv.heading
local_seg_x, local_seg_y = self.get_local_segment(utm_x, utm_y, heading)
if len(local_seg_x) <= 10:
return LocalPath([])
if self.human:
x, y = self.get_local_ref(local_seg_x, local_seg_y)
points = []
for i in range(path_range):
if i < len(x):
points.append([x[i], y[i]])
return LocalPath(points)
mono_seg_x, mono_seg_y = self.to_monotonic_segment(
local_seg_x, local_seg_y)
if len(mono_seg_x) <= 10:
return LocalPath([])
k = 3
n = len(mono_seg_x)
std = 0.5
sp = optimized_spline(mono_seg_x, mono_seg_y, k, s=n * std)
X = np.linspace(0, int(local_seg_x[-1]), int(local_seg_x[-1]))
y = sp(X)
points = []
for i in range(path_range):
if i < len(X):
points.append([X[i], y[i]])
return LocalPath(points)
if __name__ == "__main__":
import rospy
from std_msgs.msg import String
import matplotlib.pyplot as plt
from modules.localization.proto import localization_pb2
from modules.canbus.proto import chassis_pb2
from ad_vehicle import ADVehicle
import matplotlib.animation as animation
def localization_callback(localization_pb):
ad_vehicle.update_localization(localization_pb)
def routing_callback(routing_str):
routing.update(routing_str)
def chassis_callback(chassis_pb):
ad_vehicle.update_chassis(chassis_pb)
def update(frame):
routing_line_x = []
routing_line_y = []
for point in routing.routing_points:
routing_line_x.append(point[0])
routing_line_y.append(point[1])
routing_line.set_xdata(routing_line_x)
routing_line.set_ydata(routing_line_y)
vehicle_point.set_xdata([ad_vehicle.x])
vehicle_point.set_ydata([ad_vehicle.y])
if ad_vehicle.is_ready():
path = routing.get_local_path(ad_vehicle.x, ad_vehicle.y,
ad_vehicle.heading)
path_x, path_y = path.get_xy()
local_line.set_xdata(path_x)
local_line.set_ydata(path_y)
ax.autoscale_view()
ax.relim()
# ax2.autoscale_view()
# ax2.relim()
ad_vehicle = ADVehicle()
routing = RoutingProvider()
rospy.init_node("routing_debug", anonymous=True)
rospy.Subscriber('/apollo/localization/pose',
localization_pb2.LocalizationEstimate,
localization_callback)
rospy.Subscriber('/apollo/navigation/routing',
String, routing_callback)
rospy.Subscriber('/apollo/canbus/chassis',
chassis_pb2.Chassis,
chassis_callback)
fig = plt.figure()
ax = plt.subplot2grid((3, 1), (0, 0), rowspan=2, colspan=1)
ax2 = plt.subplot2grid((3, 1), (2, 0), rowspan=1, colspan=1)
routing_line, = ax.plot([], [], 'r-')
vehicle_point, = ax.plot([], [], 'ko')
local_line, = ax2.plot([], [], 'b-')
ani = animation.FuncAnimation(fig, update, interval=100)
ax2.set_xlim([-2, 200])
ax2.set_ylim([-50, 50])
# ax2.axis('equal')
plt.show()
......@@ -73,13 +73,13 @@ class ReferencePath:
path_y.append(y)
return path_x, path_y
def get_ref_path_by_lmr(self, perception, routing, localization, chassis):
def get_ref_path_by_lmr(self, perception, routing, adv):
path_length = self.get_path_length(chassis.get_speed_mps())
path_length = self.get_path_length(adv.speed_mps)
rpath_x, rpath_y = routing.get_local_segment_spline(localization.x,
localization.y,
localization.heading)
rpath_x, rpath_y = routing.get_local_segment_spline(adv.x,
adv.y,
adv.heading)
init_y_perception = (perception.right_lm_coef[0] +
perception.left_lm_coef[0]) / -2.0
quality = perception.right_lm_quality + perception.left_lm_quality
......@@ -105,7 +105,7 @@ class ReferencePath:
path_x = []
path_y = []
for i in range(int(path_length)):
#TODO(yifei): more accurate shift is needed.
# TODO(yifei): more accurate shift is needed.
y = (lmpath_y[i] * quality + rpath_y[i] - routing_shift) / (
1 + quality)
path_x.append(i)
......
......@@ -39,8 +39,9 @@ class TrajectoryGenerator:
def __init__(self):
self.mobileye_pb = None
def generate(self, path_x, path_y, final_path_length, speed,
def generate(self, path, final_path_length, speed,
start_timestamp):
path_x, path_y = path.get_xy()
adc_trajectory = planning_pb2.ADCTrajectory()
adc_trajectory.header.timestamp_sec = rospy.Time.now().to_sec()
adc_trajectory.header.module_name = "planning"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册