提交 3cdab991 编写于 作者: V Vivian Lin 提交者: GitHub

Dreamview: handle fragmented message from websocket (#2929)

* Dreamview: handle fragmented message from websocket

* fixup
上级 756835d6
......@@ -144,6 +144,9 @@ bool WebSocketHandler::SendData(Connection *conn, const std::string &data,
return true;
}
thread_local unsigned char WebSocketHandler::current_opcode_ = 0x00;
thread_local std::stringstream WebSocketHandler::data_;
bool WebSocketHandler::handleData(CivetServer *server, Connection *conn,
int bits, char *data, size_t data_len) {
// Ignore connection close request.
......@@ -151,22 +154,44 @@ bool WebSocketHandler::handleData(CivetServer *server, Connection *conn,
return false;
}
switch (bits &= 0x7f) {
case WEBSOCKET_OPCODE_TEXT:
return handleJsonData(conn, data, data_len);
case WEBSOCKET_OPCODE_BINARY:
return handleBinaryData(conn, data, data_len);
default:
AERROR << "Unknown WebSocket bits flag: " << bits;
return true;
data_.write(data, data_len);
if (current_opcode_ == 0x00) {
current_opcode_ = bits & 0x7f;
}
bool result = true;
// The FIN bit (the left most significant bit) is used to indicates
// the final fragment in a message. Note, the first fragment MAY
// also be the final fragment.
bool is_final_fragment = bits & 0x80;
if (is_final_fragment) {
switch (current_opcode_) {
case WEBSOCKET_OPCODE_TEXT:
result = handleJsonData(conn, data_.str());
break;
case WEBSOCKET_OPCODE_BINARY:
result = handleBinaryData(conn, data_.str());
break;
default:
AERROR << "Unknown WebSocket bits flag: " << bits;
break;
}
// reset opcode and data
current_opcode_ = 0x00;
data_.clear();
data_.str(std::string());
}
return result;
}
bool WebSocketHandler::handleJsonData(Connection *conn, char *data,
size_t data_len) {
bool WebSocketHandler::handleJsonData(Connection *conn,
const std::string &data) {
Json json;
try {
json = Json::parse(data, data + data_len);
json = Json::parse(data.begin(), data.end());
} catch (const std::exception &e) {
AERROR << "Failed to parse JSON data: " << e.what();
return false;
......@@ -187,11 +212,10 @@ bool WebSocketHandler::handleJsonData(Connection *conn, char *data,
return true;
}
bool WebSocketHandler::handleBinaryData(Connection *conn, char *data,
size_t data_len) {
bool WebSocketHandler::handleBinaryData(Connection *conn,
const std::string &data) {
auto type = "Binary";
std::string data_string(data, data_len);
message_handlers_[type](data_string, conn);
message_handlers_[type](data, conn);
return true;
}
......
......@@ -48,6 +48,11 @@ namespace dreamview {
* to all the connected clients.
*/
class WebSocketHandler : public CivetWebSocketHandler {
// In case of receiving fragmented message,
// websocket opcode and accumulated data are stored.
thread_local static unsigned char current_opcode_;
thread_local static std::stringstream data_;
public:
using Json = nlohmann::json;
using Connection = struct mg_connection;
......@@ -79,6 +84,14 @@ class WebSocketHandler : public CivetWebSocketHandler {
* @brief Callback method for when a data frame has been received from the
* client.
*
* @details In the websocket protocol, data is transmitted using a sequence of
* frames, and each frame received invokes this callback method. Since the the type
* of opcode (text, binary, etc) is given in the first frame, this method stores
* the opcode in a thread_local variable named current_opcode_. And data from each
* frame is accumulated to data_ until the final fragment is detected. See websocket
* RFC at http://tools.ietf.org/html/rfc6455, section 5.4 for more protocol and
* fragmentation details.
*
* @param server the calling server
* @param conn the connection information
* @param bits first byte of the websocket frame, see websocket RFC at
......@@ -90,8 +103,8 @@ class WebSocketHandler : public CivetWebSocketHandler {
bool handleData(CivetServer *server, Connection *conn, int bits, char *data,
size_t data_len) override;
bool handleJsonData(Connection *conn, char *data, size_t data_len);
bool handleBinaryData(Connection *conn, char *data, size_t data_len);
bool handleJsonData(Connection *conn, const std::string &data);
bool handleBinaryData(Connection *conn, const std::string &data);
/**
* @brief Callback method for when the connection is closed.
......
......@@ -80,7 +80,7 @@ void SimulationWorldUpdater::RegisterMessageHandlers() {
websocket_->RegisterMessageHandler(
"Binary",
[this](const std::string data, WebSocketHandler::Connection *conn) {
[this](const std::string &data, WebSocketHandler::Connection *conn) {
apollo::relative_map::NavigationInfo navigation_info;
if (navigation_info.ParseFromString(data)) {
AdapterManager::FillNavigationHeader(FLAGS_dreamview_module_name,
......
......@@ -16,7 +16,6 @@
# limitations under the License.
###############################################################################
set -x
mkdir -p proto_bundle
# proto dependencies
......
......@@ -900,106 +900,6 @@
}
}
},
"VehicleState": {
"fields": {
"x": {
"type": "double",
"id": 1,
"options": {
"default": 0
}
},
"y": {
"type": "double",
"id": 2,
"options": {
"default": 0
}
},
"z": {
"type": "double",
"id": 3,
"options": {
"default": 0
}
},
"timestamp": {
"type": "double",
"id": 4,
"options": {
"default": 0
}
},
"roll": {
"type": "double",
"id": 5,
"options": {
"default": 0
}
},
"pitch": {
"type": "double",
"id": 6,
"options": {
"default": 0
}
},
"yaw": {
"type": "double",
"id": 7,
"options": {
"default": 0
}
},
"heading": {
"type": "double",
"id": 8,
"options": {
"default": 0
}
},
"kappa": {
"type": "double",
"id": 9,
"options": {
"default": 0
}
},
"linearVelocity": {
"type": "double",
"id": 10,
"options": {
"default": 0
}
},
"angularVelocity": {
"type": "double",
"id": 11,
"options": {
"default": 0
}
},
"linearAcceleration": {
"type": "double",
"id": 12,
"options": {
"default": 0
}
},
"gear": {
"type": "apollo.canbus.Chassis.GearPosition",
"id": 13
},
"drivingMode": {
"type": "apollo.canbus.Chassis.DrivingMode",
"id": 14
},
"pose": {
"type": "apollo.localization.Pose",
"id": 15
}
}
},
"monitor": {
"nested": {
"MonitorMessageItem": {
......
......@@ -369,8 +369,8 @@ class GmapNavigator {
controlUI.addEventListener("click", () => {
const start = this.vehicleMarker.getPosition();
const endLat = 37.506039;
const endLng = -122.340299;
const endLat = 37.50582457077844;
const endLng = -122.34000922633726;
this.requestRouting(start.lat(), start.lng(), endLat, endLng);
});
}
......
import PARAMETERS from "store/config/parameters.yml";
import GMAP_NAVIGATOR from "utils/navigation/gmap_navigator";
import GMAP_NAVIGATOR from "components/Navigation/GmapNavigator";
import NavigationWebSocketEndpoint from "store/websocket/websocket_navigation";
function deduceWebsocketServerAddr() {
......
import Worker from 'worker-loader!utils/webworker.js';
import GMAP_NAVIGATOR from "utils/navigation/gmap_navigator";
import GMAP_NAVIGATOR from "components/Navigation/GmapNavigator";
export default class NavigationWebSocketEndpoint {
constructor(serverAddr) {
......
......@@ -13,7 +13,7 @@ module.exports = {
entry: {
app: "./app.js",
navigation: "./utils/navigation/index.js"
navigation: "./components/Navigation/entry.js"
},
output: {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册