提交 34594d7c 编写于 作者: F Fankux

fix compile version

上级 c7f46a60
...@@ -194,6 +194,18 @@ execute_process( ...@@ -194,6 +194,18 @@ execute_process(
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
) )
if (NOT GIT_VERSION)
message(WARNING "oblogproxy fetch git version empty, use current time as program version")
STRING(TIMESTAMP GIT_VERSION "%Y-%m-%d %H:%M:%S")
endif ()
if (NOT GIT_VERSION)
message(WARNING "oblogproxy fetch current time failed")
SET(GIT_VERSION "2.0.0")
endif ()
message("oblogproxy version: ${GIT_VERSION}")
if (WITH_DEBUG) if (WITH_DEBUG)
SET(DEBUG_SYMBOL "-ggdb") SET(DEBUG_SYMBOL "-ggdb")
else () else ()
......
...@@ -52,7 +52,7 @@ EventResult Arranger::on_msg(const PeerInfo& peer, const Message& msg) ...@@ -52,7 +52,7 @@ EventResult Arranger::on_msg(const PeerInfo& peer, const Message& msg)
{ {
OMS_INFO << "Arranger on_msg fired: " << peer.to_string(); OMS_INFO << "Arranger on_msg fired: " << peer.to_string();
if (msg.type() == MessageType::HANDSHAKE_REQUEST_CLIENT) { if (msg.type() == MessageType::HANDSHAKE_REQUEST_CLIENT) {
ClientHandshakeRequestMessage& handshake = (ClientHandshakeRequestMessage&)msg; auto& handshake = (ClientHandshakeRequestMessage&)msg;
OMS_INFO << "Handshake request from peer: " << peer.to_string() << ", msg: " << handshake.to_string(); OMS_INFO << "Handshake request from peer: " << peer.to_string() << ", msg: " << handshake.to_string();
ClientMeta client = ClientMeta::from_handshake(peer, handshake); ClientMeta client = ClientMeta::from_handshake(peer, handshake);
...@@ -128,7 +128,7 @@ int Arranger::create(const ClientMeta& client) ...@@ -128,7 +128,7 @@ int Arranger::create(const ClientMeta& client)
int ret = start_source(client, client.configuration); int ret = start_source(client, client.configuration);
if (ret != OMS_OK) { if (ret != OMS_OK) {
close_client_locked(client, ""); close_client_locked(client, "failed to invoke");
return ret; return ret;
} }
...@@ -182,7 +182,6 @@ int Arranger::close_client_locked(const ClientMeta& client, const std::string& m ...@@ -182,7 +182,6 @@ int Arranger::close_client_locked(const ClientMeta& client, const std::string& m
if (ret != OMS_OK) { if (ret != OMS_OK) {
OMS_WARN << "Failed to send error response message. client=" << client.peer.id(); OMS_WARN << "Failed to send error response message. client=" << client.peer.id();
} }
_accepter.remove_channel(channel_entry->second); _accepter.remove_channel(channel_entry->second);
_client_peers.erase(channel_entry); _client_peers.erase(channel_entry);
} }
......
...@@ -37,7 +37,7 @@ public: ...@@ -37,7 +37,7 @@ public:
private: private:
EventResult on_msg(const PeerInfo&, const Message&); EventResult on_msg(const PeerInfo&, const Message&);
int auth(ClientMeta& client, std::string& errmsg); static int auth(ClientMeta& client, std::string& errmsg);
int start_source(const ClientMeta& client, const std::string& configuration); int start_source(const ClientMeta& client, const std::string& configuration);
......
...@@ -99,7 +99,6 @@ public: ...@@ -99,7 +99,6 @@ public:
::exit(-1); ::exit(-1);
} else { // parent; } else { // parent;
OMS_INFO << "+++ create oblogreader with pid: " << pid; OMS_INFO << "+++ create oblogreader with pid: " << pid;
SourceWaiter::instance().add(pid, _client); SourceWaiter::instance().add(pid, _client);
} }
...@@ -116,7 +115,6 @@ private: ...@@ -116,7 +115,6 @@ private:
static int start_oblogreader(Communicator& communicator, const ClientMeta& client, const std::string& configuration) static int start_oblogreader(Communicator& communicator, const ClientMeta& client, const std::string& configuration)
{ {
communicator.fork_prepare(); communicator.fork_prepare();
// we create new thread for fork() acting as children process's main thread // we create new thread for fork() acting as children process's main thread
ForkThread fork_thd(communicator, client, configuration); ForkThread fork_thd(communicator, client, configuration);
fork_thd.start(); fork_thd.start();
...@@ -168,15 +166,16 @@ void SourceWaiter::WaitThread::run() ...@@ -168,15 +166,16 @@ void SourceWaiter::WaitThread::run()
{ {
int retval = OMS_OK; int retval = OMS_OK;
waitpid(_pid, &retval, 0); waitpid(_pid, &retval, 0);
OMS_WARN << "--- oblogreader exit with ret: " << retval << ", try to close fd: " << _client.peer.file_desc; if (WIFEXITED(retval)) {
if (retval != OMS_OK) { OMS_INFO << "--- oblogreader exit succeed, try to close fd: " << _client.peer.file_desc;
} else {
OMS_ERROR << "oblogreader exit failed:" << WEXITSTATUS(retval);
// TODO... response to client with _client.channel // TODO... response to client with _client.channel
} }
shutdown(_client.peer.file_desc, SHUT_RDWR); shutdown(_client.peer.file_desc, SHUT_RDWR);
// use a thread to remove avoid join dead lock // use a thread to remove avoid join dead lock
Arranger::instance().close_client(_client); Arranger::instance().close_client(_client, "oblogreader exit");
SourceWaiter::instance().remove(_pid); SourceWaiter::instance().remove(_pid);
OMS_WARN << "--- oblogreader WaiterThread(" << tid() << ") exit for pid: " << _pid; OMS_WARN << "--- oblogreader WaiterThread(" << tid() << ") exit for pid: " << _pid;
......
...@@ -212,7 +212,9 @@ LegacyEncoder::LegacyEncoder() ...@@ -212,7 +212,9 @@ LegacyEncoder::LegacyEncoder()
memcpy(buf, &code_be, 4); memcpy(buf, &code_be, 4);
uint32_t varlen_be = cpu_to_be<uint32_t>(msg.message.size()); uint32_t varlen_be = cpu_to_be<uint32_t>(msg.message.size());
memcpy(buf + 4, &varlen_be, 4); memcpy(buf + 4, &varlen_be, 4);
memcpy(buf + 8, msg.message.c_str(), msg.message.size()); if (msg.message.size() != 0) {
memcpy(buf + 8, msg.message.c_str(), msg.message.size());
}
// buf's ownership transfered to buffer // buf's ownership transfered to buffer
buffer.push_back(buf, len); buffer.push_back(buf, len);
...@@ -238,7 +240,6 @@ int LegacyEncoder::encode(const Message& msg, MsgBuf& buffer) ...@@ -238,7 +240,6 @@ int LegacyEncoder::encode(const Message& msg, MsgBuf& buffer)
uint32_t msg_type_be = cpu_to_be<uint32_t>((uint32_t)msg.type()); uint32_t msg_type_be = cpu_to_be<uint32_t>((uint32_t)msg.type());
memcpy(buf + 2, &msg_type_be, 4); memcpy(buf + 2, &msg_type_be, 4);
buffer.push_front(buf, len); buffer.push_front(buf, len);
return ret; return ret;
} }
......
...@@ -46,6 +46,7 @@ Channel* Channel::get() ...@@ -46,6 +46,7 @@ Channel* Channel::get()
void Channel::put() void Channel::put()
{ {
if (1 == _refcount.fetch_sub(1)) { if (1 == _refcount.fetch_sub(1)) {
OMS_DEBUG << "delete Channel";
delete this; delete this;
} }
} }
......
...@@ -186,7 +186,6 @@ int Communicator::add_channel(const PeerInfo& peer, Channel* ch /* = nullptr */) ...@@ -186,7 +186,6 @@ int Communicator::add_channel(const PeerInfo& peer, Channel* ch /* = nullptr */)
} }
std::lock_guard<std::mutex> lock_guard(_lock); std::lock_guard<std::mutex> lock_guard(_lock);
auto iter = _channels.find(peer); auto iter = _channels.find(peer);
if (iter != _channels.end()) { if (iter != _channels.end()) {
OMS_WARN << "Add channel twice: " << peer.file_desc; OMS_WARN << "Add channel twice: " << peer.file_desc;
...@@ -263,7 +262,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */) ...@@ -263,7 +262,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */)
Channel* ch = nullptr; Channel* ch = nullptr;
{ {
std::lock_guard<std::mutex> lock_guard(_lock); std::lock_guard<std::mutex> lock_guard(_lock);
auto iter = _channels.find(peer); auto iter = _channels.find(peer);
if (iter == _channels.end()) { if (iter == _channels.end()) {
OMS_WARN << "No channel found of peer: " << peer.to_string(); OMS_WARN << "No channel found of peer: " << peer.to_string();
...@@ -284,7 +282,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */) ...@@ -284,7 +282,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */)
int Communicator::clear_channels() int Communicator::clear_channels()
{ {
std::lock_guard<std::mutex> lock_guard(_lock); std::lock_guard<std::mutex> lock_guard(_lock);
for (auto& channel : _channels) { for (auto& channel : _channels) {
Channel* ch = channel.second; Channel* ch = channel.second;
release_channel_event(*ch, false); release_channel_event(*ch, false);
...@@ -296,7 +293,6 @@ int Communicator::clear_channels() ...@@ -296,7 +293,6 @@ int Communicator::clear_channels()
Channel* Communicator::get_channel(const PeerInfo& peer) Channel* Communicator::get_channel(const PeerInfo& peer)
{ {
const std::lock_guard<std::mutex> lock_guard(_lock); const std::lock_guard<std::mutex> lock_guard(_lock);
return get_channel_locked(peer); return get_channel_locked(peer);
} }
...@@ -468,8 +464,7 @@ void Communicator::on_event(int fd, short event, void* arg) ...@@ -468,8 +464,7 @@ void Communicator::on_event(int fd, short event, void* arg)
delete msg; delete msg;
} }
} }
//对于ER_CLOSE_CHANNEL,先处理错误再释放内存
ch->put();
switch (err) { switch (err) {
case EventResult::ER_CLOSE_CHANNEL: case EventResult::ER_CLOSE_CHANNEL:
c.remove_channel(ch->_peer); c.remove_channel(ch->_peer);
...@@ -478,6 +473,7 @@ void Communicator::on_event(int fd, short event, void* arg) ...@@ -478,6 +473,7 @@ void Communicator::on_event(int fd, short event, void* arg)
// do nothing // do nothing
break; break;
} }
ch->put();
} }
void Communicator::close_listen() void Communicator::close_listen()
......
...@@ -29,6 +29,10 @@ int main(int argc, char** argv) ...@@ -29,6 +29,10 @@ int main(int argc, char** argv)
options.usage(); options.usage();
exit(0); exit(0);
})); }));
options.add(OmsOption('v', "version", false, "program version", [&](const std::string&) {
printf("version: " __OMS_VERSION__ "\n");
exit(0);
}));
options.add(OmsOption('f', "file", true, "configuration json file", [&](const std::string& optarg) { options.add(OmsOption('f', "file", true, "configuration json file", [&](const std::string& optarg) {
if (conf.load(optarg) != OMS_OK) { if (conf.load(optarg) != OMS_OK) {
OMS_INFO << "failed to load config: " << optarg; OMS_INFO << "failed to load config: " << optarg;
......
...@@ -46,6 +46,7 @@ void ReaderRoutine::stop() ...@@ -46,6 +46,7 @@ void ReaderRoutine::stop()
void ReaderRoutine::run() void ReaderRoutine::run()
{ {
if (_oblog.start() != OMS_OK) { if (_oblog.start() != OMS_OK) {
OMS_ERROR << "Failed to start ReaderRoutine";
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册