Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
oceanbase
oblogproxy
提交
a0fc3ce6
O
oblogproxy
项目概览
oceanbase
/
oblogproxy
大约 1 年 前同步成功
通知
24
Star
29
Fork
10
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oblogproxy
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
a0fc3ce6
编写于
4月 19, 2022
作者:
F
Fankux
提交者:
GitHub
4月 19, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #27 from fankux/master
fix compile version
上级
c7f46a60
34594d7c
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
31 addition
and
18 deletion
+31
-18
CMakeLists.txt
CMakeLists.txt
+12
-0
src/arranger/arranger.cpp
src/arranger/arranger.cpp
+2
-3
src/arranger/arranger.h
src/arranger/arranger.h
+1
-1
src/arranger/source_invoke.cpp
src/arranger/source_invoke.cpp
+5
-6
src/codec/legacy_encoder.cpp
src/codec/legacy_encoder.cpp
+3
-2
src/communication/channel.cpp
src/communication/channel.cpp
+1
-0
src/communication/communicator.cpp
src/communication/communicator.cpp
+2
-6
src/entry.cpp
src/entry.cpp
+4
-0
src/oblogreader/reader_routine.cpp
src/oblogreader/reader_routine.cpp
+1
-0
未找到文件。
CMakeLists.txt
浏览文件 @
a0fc3ce6
...
@@ -194,6 +194,18 @@ execute_process(
...
@@ -194,6 +194,18 @@ execute_process(
WORKING_DIRECTORY
${
PROJECT_SOURCE_DIR
}
WORKING_DIRECTORY
${
PROJECT_SOURCE_DIR
}
)
)
if
(
NOT GIT_VERSION
)
message
(
WARNING
"oblogproxy fetch git version empty, use current time as program version"
)
STRING
(
TIMESTAMP GIT_VERSION
"%Y-%m-%d %H:%M:%S"
)
endif
()
if
(
NOT GIT_VERSION
)
message
(
WARNING
"oblogproxy fetch current time failed"
)
SET
(
GIT_VERSION
"2.0.0"
)
endif
()
message
(
"oblogproxy version:
${
GIT_VERSION
}
"
)
if
(
WITH_DEBUG
)
if
(
WITH_DEBUG
)
SET
(
DEBUG_SYMBOL
"-ggdb"
)
SET
(
DEBUG_SYMBOL
"-ggdb"
)
else
()
else
()
...
...
src/arranger/arranger.cpp
浏览文件 @
a0fc3ce6
...
@@ -52,7 +52,7 @@ EventResult Arranger::on_msg(const PeerInfo& peer, const Message& msg)
...
@@ -52,7 +52,7 @@ EventResult Arranger::on_msg(const PeerInfo& peer, const Message& msg)
{
{
OMS_INFO
<<
"Arranger on_msg fired: "
<<
peer
.
to_string
();
OMS_INFO
<<
"Arranger on_msg fired: "
<<
peer
.
to_string
();
if
(
msg
.
type
()
==
MessageType
::
HANDSHAKE_REQUEST_CLIENT
)
{
if
(
msg
.
type
()
==
MessageType
::
HANDSHAKE_REQUEST_CLIENT
)
{
ClientHandshakeRequestMessage
&
handshake
=
(
ClientHandshakeRequestMessage
&
)
msg
;
auto
&
handshake
=
(
ClientHandshakeRequestMessage
&
)
msg
;
OMS_INFO
<<
"Handshake request from peer: "
<<
peer
.
to_string
()
<<
", msg: "
<<
handshake
.
to_string
();
OMS_INFO
<<
"Handshake request from peer: "
<<
peer
.
to_string
()
<<
", msg: "
<<
handshake
.
to_string
();
ClientMeta
client
=
ClientMeta
::
from_handshake
(
peer
,
handshake
);
ClientMeta
client
=
ClientMeta
::
from_handshake
(
peer
,
handshake
);
...
@@ -128,7 +128,7 @@ int Arranger::create(const ClientMeta& client)
...
@@ -128,7 +128,7 @@ int Arranger::create(const ClientMeta& client)
int
ret
=
start_source
(
client
,
client
.
configuration
);
int
ret
=
start_source
(
client
,
client
.
configuration
);
if
(
ret
!=
OMS_OK
)
{
if
(
ret
!=
OMS_OK
)
{
close_client_locked
(
client
,
""
);
close_client_locked
(
client
,
"
failed to invoke
"
);
return
ret
;
return
ret
;
}
}
...
@@ -182,7 +182,6 @@ int Arranger::close_client_locked(const ClientMeta& client, const std::string& m
...
@@ -182,7 +182,6 @@ int Arranger::close_client_locked(const ClientMeta& client, const std::string& m
if
(
ret
!=
OMS_OK
)
{
if
(
ret
!=
OMS_OK
)
{
OMS_WARN
<<
"Failed to send error response message. client="
<<
client
.
peer
.
id
();
OMS_WARN
<<
"Failed to send error response message. client="
<<
client
.
peer
.
id
();
}
}
_accepter
.
remove_channel
(
channel_entry
->
second
);
_accepter
.
remove_channel
(
channel_entry
->
second
);
_client_peers
.
erase
(
channel_entry
);
_client_peers
.
erase
(
channel_entry
);
}
}
...
...
src/arranger/arranger.h
浏览文件 @
a0fc3ce6
...
@@ -37,7 +37,7 @@ public:
...
@@ -37,7 +37,7 @@ public:
private:
private:
EventResult
on_msg
(
const
PeerInfo
&
,
const
Message
&
);
EventResult
on_msg
(
const
PeerInfo
&
,
const
Message
&
);
int
auth
(
ClientMeta
&
client
,
std
::
string
&
errmsg
);
static
int
auth
(
ClientMeta
&
client
,
std
::
string
&
errmsg
);
int
start_source
(
const
ClientMeta
&
client
,
const
std
::
string
&
configuration
);
int
start_source
(
const
ClientMeta
&
client
,
const
std
::
string
&
configuration
);
...
...
src/arranger/source_invoke.cpp
浏览文件 @
a0fc3ce6
...
@@ -99,7 +99,6 @@ public:
...
@@ -99,7 +99,6 @@ public:
::
exit
(
-
1
);
::
exit
(
-
1
);
}
else
{
// parent;
}
else
{
// parent;
OMS_INFO
<<
"+++ create oblogreader with pid: "
<<
pid
;
OMS_INFO
<<
"+++ create oblogreader with pid: "
<<
pid
;
SourceWaiter
::
instance
().
add
(
pid
,
_client
);
SourceWaiter
::
instance
().
add
(
pid
,
_client
);
}
}
...
@@ -116,7 +115,6 @@ private:
...
@@ -116,7 +115,6 @@ private:
static
int
start_oblogreader
(
Communicator
&
communicator
,
const
ClientMeta
&
client
,
const
std
::
string
&
configuration
)
static
int
start_oblogreader
(
Communicator
&
communicator
,
const
ClientMeta
&
client
,
const
std
::
string
&
configuration
)
{
{
communicator
.
fork_prepare
();
communicator
.
fork_prepare
();
// we create new thread for fork() acting as children process's main thread
// we create new thread for fork() acting as children process's main thread
ForkThread
fork_thd
(
communicator
,
client
,
configuration
);
ForkThread
fork_thd
(
communicator
,
client
,
configuration
);
fork_thd
.
start
();
fork_thd
.
start
();
...
@@ -168,15 +166,16 @@ void SourceWaiter::WaitThread::run()
...
@@ -168,15 +166,16 @@ void SourceWaiter::WaitThread::run()
{
{
int
retval
=
OMS_OK
;
int
retval
=
OMS_OK
;
waitpid
(
_pid
,
&
retval
,
0
);
waitpid
(
_pid
,
&
retval
,
0
);
OMS_WARN
<<
"--- oblogreader exit with ret: "
<<
retval
<<
", try to close fd: "
<<
_client
.
peer
.
file_desc
;
if
(
WIFEXITED
(
retval
))
{
if
(
retval
!=
OMS_OK
)
{
OMS_INFO
<<
"--- oblogreader exit succeed, try to close fd: "
<<
_client
.
peer
.
file_desc
;
}
else
{
OMS_ERROR
<<
"oblogreader exit failed:"
<<
WEXITSTATUS
(
retval
);
// TODO... response to client with _client.channel
// TODO... response to client with _client.channel
}
}
shutdown
(
_client
.
peer
.
file_desc
,
SHUT_RDWR
);
shutdown
(
_client
.
peer
.
file_desc
,
SHUT_RDWR
);
// use a thread to remove avoid join dead lock
// use a thread to remove avoid join dead lock
Arranger
::
instance
().
close_client
(
_client
);
Arranger
::
instance
().
close_client
(
_client
,
"oblogreader exit"
);
SourceWaiter
::
instance
().
remove
(
_pid
);
SourceWaiter
::
instance
().
remove
(
_pid
);
OMS_WARN
<<
"--- oblogreader WaiterThread("
<<
tid
()
<<
") exit for pid: "
<<
_pid
;
OMS_WARN
<<
"--- oblogreader WaiterThread("
<<
tid
()
<<
") exit for pid: "
<<
_pid
;
...
...
src/codec/legacy_encoder.cpp
浏览文件 @
a0fc3ce6
...
@@ -212,7 +212,9 @@ LegacyEncoder::LegacyEncoder()
...
@@ -212,7 +212,9 @@ LegacyEncoder::LegacyEncoder()
memcpy
(
buf
,
&
code_be
,
4
);
memcpy
(
buf
,
&
code_be
,
4
);
uint32_t
varlen_be
=
cpu_to_be
<
uint32_t
>
(
msg
.
message
.
size
());
uint32_t
varlen_be
=
cpu_to_be
<
uint32_t
>
(
msg
.
message
.
size
());
memcpy
(
buf
+
4
,
&
varlen_be
,
4
);
memcpy
(
buf
+
4
,
&
varlen_be
,
4
);
memcpy
(
buf
+
8
,
msg
.
message
.
c_str
(),
msg
.
message
.
size
());
if
(
msg
.
message
.
size
()
!=
0
)
{
memcpy
(
buf
+
8
,
msg
.
message
.
c_str
(),
msg
.
message
.
size
());
}
// buf's ownership transfered to buffer
// buf's ownership transfered to buffer
buffer
.
push_back
(
buf
,
len
);
buffer
.
push_back
(
buf
,
len
);
...
@@ -238,7 +240,6 @@ int LegacyEncoder::encode(const Message& msg, MsgBuf& buffer)
...
@@ -238,7 +240,6 @@ int LegacyEncoder::encode(const Message& msg, MsgBuf& buffer)
uint32_t
msg_type_be
=
cpu_to_be
<
uint32_t
>
((
uint32_t
)
msg
.
type
());
uint32_t
msg_type_be
=
cpu_to_be
<
uint32_t
>
((
uint32_t
)
msg
.
type
());
memcpy
(
buf
+
2
,
&
msg_type_be
,
4
);
memcpy
(
buf
+
2
,
&
msg_type_be
,
4
);
buffer
.
push_front
(
buf
,
len
);
buffer
.
push_front
(
buf
,
len
);
return
ret
;
return
ret
;
}
}
...
...
src/communication/channel.cpp
浏览文件 @
a0fc3ce6
...
@@ -46,6 +46,7 @@ Channel* Channel::get()
...
@@ -46,6 +46,7 @@ Channel* Channel::get()
void
Channel
::
put
()
void
Channel
::
put
()
{
{
if
(
1
==
_refcount
.
fetch_sub
(
1
))
{
if
(
1
==
_refcount
.
fetch_sub
(
1
))
{
OMS_DEBUG
<<
"delete Channel"
;
delete
this
;
delete
this
;
}
}
}
}
...
...
src/communication/communicator.cpp
浏览文件 @
a0fc3ce6
...
@@ -186,7 +186,6 @@ int Communicator::add_channel(const PeerInfo& peer, Channel* ch /* = nullptr */)
...
@@ -186,7 +186,6 @@ int Communicator::add_channel(const PeerInfo& peer, Channel* ch /* = nullptr */)
}
}
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
auto
iter
=
_channels
.
find
(
peer
);
auto
iter
=
_channels
.
find
(
peer
);
if
(
iter
!=
_channels
.
end
())
{
if
(
iter
!=
_channels
.
end
())
{
OMS_WARN
<<
"Add channel twice: "
<<
peer
.
file_desc
;
OMS_WARN
<<
"Add channel twice: "
<<
peer
.
file_desc
;
...
@@ -263,7 +262,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */)
...
@@ -263,7 +262,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */)
Channel
*
ch
=
nullptr
;
Channel
*
ch
=
nullptr
;
{
{
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
auto
iter
=
_channels
.
find
(
peer
);
auto
iter
=
_channels
.
find
(
peer
);
if
(
iter
==
_channels
.
end
())
{
if
(
iter
==
_channels
.
end
())
{
OMS_WARN
<<
"No channel found of peer: "
<<
peer
.
to_string
();
OMS_WARN
<<
"No channel found of peer: "
<<
peer
.
to_string
();
...
@@ -284,7 +282,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */)
...
@@ -284,7 +282,6 @@ int Communicator::remove_channel(const PeerInfo& peer, bool steal /* = false */)
int
Communicator
::
clear_channels
()
int
Communicator
::
clear_channels
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
for
(
auto
&
channel
:
_channels
)
{
for
(
auto
&
channel
:
_channels
)
{
Channel
*
ch
=
channel
.
second
;
Channel
*
ch
=
channel
.
second
;
release_channel_event
(
*
ch
,
false
);
release_channel_event
(
*
ch
,
false
);
...
@@ -296,7 +293,6 @@ int Communicator::clear_channels()
...
@@ -296,7 +293,6 @@ int Communicator::clear_channels()
Channel
*
Communicator
::
get_channel
(
const
PeerInfo
&
peer
)
Channel
*
Communicator
::
get_channel
(
const
PeerInfo
&
peer
)
{
{
const
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
const
std
::
lock_guard
<
std
::
mutex
>
lock_guard
(
_lock
);
return
get_channel_locked
(
peer
);
return
get_channel_locked
(
peer
);
}
}
...
@@ -468,8 +464,7 @@ void Communicator::on_event(int fd, short event, void* arg)
...
@@ -468,8 +464,7 @@ void Communicator::on_event(int fd, short event, void* arg)
delete
msg
;
delete
msg
;
}
}
}
}
//对于ER_CLOSE_CHANNEL,先处理错误再释放内存
ch
->
put
();
switch
(
err
)
{
switch
(
err
)
{
case
EventResult
::
ER_CLOSE_CHANNEL
:
case
EventResult
::
ER_CLOSE_CHANNEL
:
c
.
remove_channel
(
ch
->
_peer
);
c
.
remove_channel
(
ch
->
_peer
);
...
@@ -478,6 +473,7 @@ void Communicator::on_event(int fd, short event, void* arg)
...
@@ -478,6 +473,7 @@ void Communicator::on_event(int fd, short event, void* arg)
// do nothing
// do nothing
break
;
break
;
}
}
ch
->
put
();
}
}
void
Communicator
::
close_listen
()
void
Communicator
::
close_listen
()
...
...
src/entry.cpp
浏览文件 @
a0fc3ce6
...
@@ -29,6 +29,10 @@ int main(int argc, char** argv)
...
@@ -29,6 +29,10 @@ int main(int argc, char** argv)
options
.
usage
();
options
.
usage
();
exit
(
0
);
exit
(
0
);
}));
}));
options
.
add
(
OmsOption
(
'v'
,
"version"
,
false
,
"program version"
,
[
&
](
const
std
::
string
&
)
{
printf
(
"version: "
__OMS_VERSION__
"
\n
"
);
exit
(
0
);
}));
options
.
add
(
OmsOption
(
'f'
,
"file"
,
true
,
"configuration json file"
,
[
&
](
const
std
::
string
&
optarg
)
{
options
.
add
(
OmsOption
(
'f'
,
"file"
,
true
,
"configuration json file"
,
[
&
](
const
std
::
string
&
optarg
)
{
if
(
conf
.
load
(
optarg
)
!=
OMS_OK
)
{
if
(
conf
.
load
(
optarg
)
!=
OMS_OK
)
{
OMS_INFO
<<
"failed to load config: "
<<
optarg
;
OMS_INFO
<<
"failed to load config: "
<<
optarg
;
...
...
src/oblogreader/reader_routine.cpp
浏览文件 @
a0fc3ce6
...
@@ -46,6 +46,7 @@ void ReaderRoutine::stop()
...
@@ -46,6 +46,7 @@ void ReaderRoutine::stop()
void
ReaderRoutine
::
run
()
void
ReaderRoutine
::
run
()
{
{
if
(
_oblog
.
start
()
!=
OMS_OK
)
{
if
(
_oblog
.
start
()
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to start ReaderRoutine"
;
return
;
return
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录