Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
oceanbase
oblogproxy
提交
d7614332
O
oblogproxy
项目概览
oceanbase
/
oblogproxy
大约 1 年 前同步成功
通知
24
Star
29
Fork
10
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oblogproxy
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d7614332
编写于
1月 01, 2022
作者:
F
Fankux
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
lz4 compressed by default of clientdata packet && use git rev as server version
上级
461af08f
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
299 addition
and
56 deletion
+299
-56
src/arranger/arranger.cpp
src/arranger/arranger.cpp
+12
-7
src/arranger/arranger.h
src/arranger/arranger.h
+1
-1
src/arranger/source_invoke.cpp
src/arranger/source_invoke.cpp
+10
-2
src/codec/legacy_encoder.cpp
src/codec/legacy_encoder.cpp
+96
-11
src/codec/message.cpp
src/codec/message.cpp
+3
-5
src/codec/message.h
src/codec/message.h
+62
-5
src/codec/protobuf_encoder.cpp
src/codec/protobuf_encoder.cpp
+2
-2
src/common/common.cpp
src/common/common.cpp
+2
-5
src/common/version.h
src/common/version.h
+17
-0
src/communication/communicator.cpp
src/communication/communicator.cpp
+2
-1
src/obaccess/mysql_protocol.cpp
src/obaccess/mysql_protocol.cpp
+3
-0
src/obaccess/ob_access.cpp
src/obaccess/ob_access.cpp
+15
-13
src/obaccess/oblog_config.h
src/obaccess/oblog_config.h
+4
-3
src/oblogreader/sender_routine.cpp
src/oblogreader/sender_routine.cpp
+1
-1
src/test/test_compress.cpp
src/test/test_compress.cpp
+68
-0
src/test/test_entry.cpp
src/test/test_entry.cpp
+1
-0
未找到文件。
src/arranger/arranger.cpp
浏览文件 @
d7614332
...
...
@@ -12,6 +12,7 @@
#include <mutex>
#include <memory>
#include "common/version.h"
#include "common/log.h"
#include "arranger/source_invoke.h"
#include "arranger/arranger.h"
...
...
@@ -25,7 +26,10 @@ static Config& _s_conf = Config::instance();
int
Arranger
::
init
()
{
localhostip
(
_localhost
,
_localip
);
if
(
!
localhostip
(
_localhost
,
_localip
)
||
_localhost
.
empty
()
||
_localip
.
empty
())
{
OMS_ERROR
<<
"Failed to fetch localhost name or localip"
;
return
OMS_FAILED
;
}
int
ret
=
_accepter
.
init
();
if
(
ret
==
OMS_FAILED
)
{
...
...
@@ -56,20 +60,21 @@ EventResult Arranger::on_msg(const PeerInfo& peer, const Message& msg)
std
::
string
errmsg
;
if
(
auth
(
client
,
errmsg
)
!=
OMS_OK
)
{
response_error
(
peer
,
msg
.
version
(),
errmsg
);
response_error
(
peer
,
msg
.
version
(),
ErrorCode
::
NO_AUTH
,
errmsg
);
return
EventResult
::
ER_CLOSE_CHANNEL
;
}
ClientHandshakeResponseMessage
resp
(
0
,
_localip
.
c_str
(),
"1.0.0"
);
ClientHandshakeResponseMessage
resp
(
0
,
_localip
,
__OMS_VERSION__
);
resp
.
set_version
(
msg
.
version
());
int
ret
=
_accepter
.
send_message
(
peer
,
resp
,
true
);
if
(
ret
!=
OMS_OK
)
{
OMS_WARN
<<
"Failed to send handshake response message. peer="
<<
peer
.
to_string
();
return
EventResult
::
ER_CLOSE_CHANNEL
;
}
ret
=
create
(
client
);
if
(
ret
!=
OMS_OK
)
{
response_error
(
peer
,
msg
.
version
(),
"Failed to create oblogreader"
);
response_error
(
peer
,
msg
.
version
(),
E_INNER
,
"Failed to create oblogreader"
);
return
EventResult
::
ER_CLOSE_CHANNEL
;
}
...
...
@@ -149,11 +154,11 @@ int Arranger::start_source(const ClientMeta& client, const std::string& configur
return
OMS_OK
;
}
void
Arranger
::
response_error
(
const
PeerInfo
&
peer
,
MessageVersion
version
,
const
std
::
string
&
errmsg
)
void
Arranger
::
response_error
(
const
PeerInfo
&
peer
,
MessageVersion
version
,
ErrorCode
code
,
const
std
::
string
&
errmsg
)
{
ErrorMessage
error
(
-
1
,
errmsg
);
ErrorMessage
error
(
code
,
errmsg
);
error
.
set_version
(
version
);
int
ret
=
_accepter
.
send_message
(
peer
,
error
);
int
ret
=
_accepter
.
send_message
(
peer
,
error
,
true
);
if
(
ret
!=
OMS_OK
)
{
OMS_WARN
<<
"Failed to send error response message to peer:"
<<
peer
.
to_string
()
<<
" for message:"
<<
errmsg
;
}
...
...
src/arranger/arranger.h
浏览文件 @
d7614332
...
...
@@ -41,7 +41,7 @@ private:
int
start_source
(
const
ClientMeta
&
client
,
const
std
::
string
&
configuration
);
void
response_error
(
const
PeerInfo
&
,
MessageVersion
version
,
const
std
::
string
&
);
void
response_error
(
const
PeerInfo
&
,
MessageVersion
version
,
ErrorCode
code
,
const
std
::
string
&
);
int
close_client_locked
(
const
ClientMeta
&
client
,
const
std
::
string
&
msg
);
...
...
src/arranger/source_invoke.cpp
浏览文件 @
d7614332
...
...
@@ -75,8 +75,16 @@ public:
ObLogReader
&
reader
=
ObLogReader
::
instance
();
OblogConfig
oblog_config
(
_config
);
oblog_config
.
user
.
set
(
Config
::
instance
().
ob_sys_username
.
val
());
oblog_config
.
password
.
set
(
Config
::
instance
().
ob_sys_password
.
val
());
if
(
!
oblog_config
.
sys_user
.
empty
())
{
oblog_config
.
user
.
set
(
oblog_config
.
sys_user
.
val
());
}
else
{
oblog_config
.
user
.
set
(
Config
::
instance
().
ob_sys_username
.
val
());
}
if
(
!
oblog_config
.
sys_password
.
empty
())
{
oblog_config
.
password
.
set
(
oblog_config
.
sys_password
.
val
());
}
else
{
oblog_config
.
password
.
set
(
Config
::
instance
().
ob_sys_password
.
val
());
}
reader
.
init
(
_client
.
id
.
get
(),
_client
.
packet_version
,
ch
,
oblog_config
);
reader
.
start
();
reader
.
join
();
...
...
src/codec/legacy_encoder.cpp
浏览文件 @
d7614332
...
...
@@ -10,9 +10,11 @@
* See the Mulan PubL v2 for more details.
*/
#include "lz4.h"
#include "MsgHeader.h"
#include "common/config.h"
#include "common/guard.hpp"
#include "codec/encoder.h"
namespace
oceanbase
{
...
...
@@ -20,6 +22,80 @@ namespace logproxy {
static
Config
&
_s_config
=
Config
::
instance
();
static
int
compress_data
(
const
RecordDataMessage
&
msg
,
MsgBuf
&
buffer
)
{
std
::
vector
<
std
::
pair
<
const
char
*
,
size_t
>>
ptrs
;
ptrs
.
reserve
(
msg
.
records
.
size
());
uint32_t
total_size
=
0
;
for
(
auto
record
:
msg
.
records
)
{
size_t
size
=
0
;
// got independ address
const
char
*
logmsg_buf
=
record
->
getFormatedString
(
&
size
);
if
(
logmsg_buf
==
nullptr
)
{
OMS_ERROR
<<
"Failed to serialize logmsg"
;
return
OMS_FAILED
;
}
if
(
_s_config
.
verbose_packet
.
val
())
{
const
MsgHeader
*
header
=
(
const
MsgHeader
*
)(
logmsg_buf
);
OMS_DEBUG
<<
"Encode logmsg Header, type: "
<<
header
->
m_msgType
<<
", version: "
<<
header
->
m_version
<<
", size: "
<<
header
->
m_size
;
}
ptrs
.
emplace_back
(
logmsg_buf
,
size
);
total_size
+=
(
size
+
8
);
}
char
*
raw
=
(
char
*
)
malloc
(
total_size
);
FreeGuard
<
char
*>
fg_raw
(
raw
);
if
(
raw
==
nullptr
)
{
OMS_ERROR
<<
"Failed to allocate raw buffer to compress, size:"
<<
total_size
;
return
OMS_FAILED
;
}
int
bound_size
=
LZ4_COMPRESSBOUND
(
total_size
);
char
*
compressed
=
(
char
*
)
malloc
(
bound_size
);
FreeGuard
<
char
*>
fg
(
compressed
);
if
(
compressed
==
nullptr
)
{
OMS_ERROR
<<
"Failed to allocate LZ4 bound buffer, size:"
<<
bound_size
;
return
OMS_FAILED
;
}
size_t
offset
=
0
;
for
(
size_t
i
=
0
;
i
<
ptrs
.
size
();
++
i
)
{
size_t
block_size
=
ptrs
[
i
].
second
;
uint32_t
seq_be
=
cpu_to_be
<
uint32_t
>
(
i
);
uint32_t
size_be
=
cpu_to_be
<
uint32_t
>
(
block_size
);
memcpy
(
raw
+
offset
,
&
seq_be
,
4
);
memcpy
(
raw
+
offset
+
4
,
&
size_be
,
4
);
memcpy
(
raw
+
offset
+
8
,
ptrs
[
i
].
first
,
block_size
);
offset
+=
(
block_size
+
8
);
}
int
compressed_size
=
LZ4_compress_fast
(
raw
,
compressed
,
total_size
,
bound_size
,
1
);
if
(
compressed_size
<=
0
)
{
OMS_ERROR
<<
"Failed to compress logmsg, raw size:"
<<
total_size
<<
", bound size:"
<<
bound_size
;
return
OMS_FAILED
;
}
if
(
_s_config
.
verbose
.
val
())
{
OMS_DEBUG
<<
"compress packet raw from size:"
<<
total_size
<<
" to compressed size:"
<<
compressed_size
;
}
uint32_t
packet_len_be
=
cpu_to_be
<
uint32_t
>
(
compressed_size
+
9
);
uint32_t
orginal_size_be
=
cpu_to_be
<
uint32_t
>
(
total_size
);
uint32_t
compressed_size_be
=
cpu_to_be
<
uint32_t
>
(
compressed_size
);
char
*
buf
=
(
char
*
)
malloc
(
13
);
memcpy
(
buf
,
&
packet_len_be
,
4
);
memset
(
buf
+
4
,
(
uint8_t
)
CompressType
::
LZ4
,
1
);
memcpy
(
buf
+
5
,
&
orginal_size_be
,
4
);
memcpy
(
buf
+
9
,
&
compressed_size_be
,
4
);
buffer
.
push_back
(
buf
,
13
);
// transfer ownership to Msgbuf
fg
.
release
();
buffer
.
push_back
(
compressed
,
compressed_size
);
return
OMS_OK
;
}
LegacyEncoder
::
LegacyEncoder
()
{
/*
...
...
@@ -29,29 +105,31 @@ LegacyEncoder::LegacyEncoder()
*/
_funcs
.
emplace
((
int8_t
)
MessageType
::
HANDSHAKE_RESPONSE_CLIENT
,
[](
const
Message
&
in_msg
,
MsgBuf
&
buffer
)
{
const
ClientHandshakeResponseMessage
&
msg
=
(
const
ClientHandshakeResponseMessage
&
)
in_msg
;
size_t
len
=
4
+
1
+
msg
.
ip
.
size
()
+
1
+
msg
.
version
.
size
();
size_t
len
=
4
+
1
+
msg
.
server_ip
.
size
()
+
1
+
msg
.
server_
version
.
size
();
char
*
buf
=
(
char
*
)
malloc
(
len
);
if
(
buf
==
nullptr
)
{
OMS_ERROR
<<
"Failed to encode handshake request due to failed to alloc memory"
;
return
OMS_FAILED
;
}
OMS_INFO
<<
"Encode handshake response to send:"
<<
msg
.
debug_string
();
// Response code
memset
(
buf
,
0
,
4
);
// Server IP
size_t
offset
=
4
;
uint8_t
varlen
=
msg
.
ip
.
size
();
uint8_t
varlen
=
msg
.
server_
ip
.
size
();
memcpy
(
buf
+
offset
,
&
varlen
,
1
);
offset
+=
1
;
memcpy
(
buf
+
offset
,
msg
.
ip
.
c_str
(),
varlen
);
memcpy
(
buf
+
offset
,
msg
.
server_
ip
.
c_str
(),
varlen
);
offset
+=
varlen
;
// Server version
varlen
=
msg
.
version
.
size
();
varlen
=
msg
.
server_
version
.
size
();
memcpy
(
buf
+
offset
,
&
varlen
,
1
);
offset
+=
1
;
memcpy
(
buf
+
offset
,
msg
.
version
.
c_str
(),
varlen
);
memcpy
(
buf
+
offset
,
msg
.
server_
version
.
c_str
(),
varlen
);
buffer
.
push_back
(
buf
,
len
);
return
OMS_OK
;
...
...
@@ -71,6 +149,10 @@ LegacyEncoder::LegacyEncoder()
_funcs
.
emplace
((
int8_t
)
MessageType
::
DATA_CLIENT
,
[](
const
Message
&
in_msg
,
MsgBuf
&
buffer
)
{
const
RecordDataMessage
&
msg
=
(
const
RecordDataMessage
&
)
in_msg
;
if
(
msg
.
compress_type
==
CompressType
::
LZ4
)
{
return
compress_data
(
msg
,
buffer
);
}
uint32_t
total_size
=
0
;
for
(
size_t
i
=
0
;
i
<
msg
.
records
.
size
();
++
i
)
{
ILogRecord
*
record
=
msg
.
records
[
i
];
...
...
@@ -84,7 +166,7 @@ LegacyEncoder::LegacyEncoder()
}
if
(
_s_config
.
verbose_packet
.
val
())
{
const
MsgHeader
*
header
=
(
const
MsgHeader
*
)(
logmsg_buf
);
OMS_DEBUG
<<
"Encode
LogMessage
Header, type: "
<<
header
->
m_msgType
<<
", version: "
<<
header
->
m_version
OMS_DEBUG
<<
"Encode
logmsg
Header, type: "
<<
header
->
m_msgType
<<
", version: "
<<
header
->
m_version
<<
", size: "
<<
header
->
m_size
;
}
...
...
@@ -102,7 +184,7 @@ LegacyEncoder::LegacyEncoder()
char
*
buf
=
(
char
*
)
malloc
(
4
+
1
+
4
+
4
);
memcpy
(
buf
,
&
packet_len_be
,
4
);
memset
(
buf
+
4
,
0
,
1
);
// CompressType::PLAIN
memset
(
buf
+
4
,
(
uint8_t
)
CompressType
::
PLAIN
,
1
);
memcpy
(
buf
+
5
,
&
total_size
,
4
);
memcpy
(
buf
+
9
,
&
total_size
,
4
);
buffer
.
push_front
(
buf
,
13
);
...
...
@@ -118,7 +200,7 @@ LegacyEncoder::LegacyEncoder()
_funcs
.
emplace
((
int8_t
)
MessageType
::
ERROR_RESPONSE
,
[](
const
Message
&
in_msg
,
MsgBuf
&
buffer
)
{
const
ErrorMessage
&
msg
=
(
const
ErrorMessage
&
)
in_msg
;
size_t
len
=
4
+
msg
.
message
.
size
();
size_t
len
=
4
+
4
+
msg
.
message
.
size
();
char
*
buf
=
(
char
*
)
malloc
(
len
);
if
(
buf
==
nullptr
)
{
OMS_ERROR
<<
"Failed to encode error message due to failed to alloc memory"
;
...
...
@@ -126,10 +208,13 @@ LegacyEncoder::LegacyEncoder()
}
// Error message
uint32_t
varlen
=
cpu_to_be
<
uint32_t
>
(
msg
.
message
.
size
());
memcpy
(
buf
,
&
varlen
,
4
);
memcpy
(
buf
+
4
,
msg
.
message
.
c_str
(),
varlen
);
uint32_t
code_be
=
cpu_to_be
<
uint32_t
>
(
msg
.
code
);
memcpy
(
buf
,
&
code_be
,
4
);
uint32_t
varlen_be
=
cpu_to_be
<
uint32_t
>
(
msg
.
message
.
size
());
memcpy
(
buf
+
4
,
&
varlen_be
,
4
);
memcpy
(
buf
+
8
,
msg
.
message
.
c_str
(),
msg
.
message
.
size
());
// buf's ownership transfered to buffer
buffer
.
push_back
(
buf
,
len
);
return
OMS_OK
;
});
...
...
src/codec/message.cpp
浏览文件 @
d7614332
...
...
@@ -91,11 +91,9 @@ ClientHandshakeRequestMessage::ClientHandshakeRequestMessage(
configuration
(
configuration
)
{}
ClientHandshakeResponseMessage
::
ClientHandshakeResponseMessage
()
:
Message
(
MessageType
::
HANDSHAKE_RESPONSE_CLIENT
)
{}
ClientHandshakeResponseMessage
::
ClientHandshakeResponseMessage
(
int
in_code
,
const
char
*
in_ip
,
const
char
*
in_version
)
:
Message
(
MessageType
::
HANDSHAKE_RESPONSE_CLIENT
),
code
(
in_code
),
ip
(
in_ip
),
version
(
in_version
)
ClientHandshakeResponseMessage
::
ClientHandshakeResponseMessage
(
int
in_code
,
const
std
::
string
&
in_ip
,
const
std
::
string
&
in_version
)
:
Message
(
MessageType
::
HANDSHAKE_RESPONSE_CLIENT
),
code
(
in_code
),
server_ip
(
in_ip
),
server_version
(
in_version
)
{}
RuntimeStatusMessage
::
RuntimeStatusMessage
()
:
Message
(
MessageType
::
STATUS
)
...
...
src/codec/message.h
浏览文件 @
d7614332
...
...
@@ -64,6 +64,65 @@ enum class PacketError {
NETWORK_ERROR
,
};
enum
ErrorCode
{
////////// 0~499: process error ////////////
/**
* general error
*/
NONE
=
0
,
/**
* inner error
*/
E_INNER
=
1
,
/**
* failed to connect
*/
E_CONNECT
=
2
,
/**
* exceed max retry connect count
*/
E_MAX_RECONNECT
=
3
,
/**
* user callback throws exception
*/
E_USER
=
4
,
////////// 500~: recv data error ////////////
/**
* unknown data protocol
*/
E_PROTOCOL
=
500
,
/**
* unknown header type
*/
E_HEADER_TYPE
=
501
,
/**
* failed to auth
*/
NO_AUTH
=
502
,
/**
* unknown compress type
*/
E_COMPRESS_TYPE
=
503
,
/**
* length not match
*/
E_LEN
=
504
,
/**
* failed to parse data
*/
E_PARSE
=
505
};
constexpr
char
PACKET_MAGIC
[]
=
{
'x'
,
'i'
,
'5'
,
'3'
,
'g'
,
']'
,
'q'
};
constexpr
size_t
PACKET_MAGIC_SIZE
=
sizeof
(
PACKET_MAGIC
);
const
size_t
PACKET_VERSION_SIZE
=
2
;
...
...
@@ -126,16 +185,14 @@ public:
class
ClientHandshakeResponseMessage
:
public
Message
,
public
Model
{
public:
ClientHandshakeResponseMessage
();
ClientHandshakeResponseMessage
(
int
code
,
const
char
*
ip
,
const
char
*
version
);
ClientHandshakeResponseMessage
(
int
code
,
const
std
::
string
&
in_ip
,
const
std
::
string
&
in_version
);
~
ClientHandshakeResponseMessage
()
override
=
default
;
private:
OMS_MF_DFT
(
int
,
code
,
-
1
);
OMS_MF
(
std
::
string
,
ip
);
OMS_MF
(
std
::
string
,
version
);
OMS_MF
(
std
::
string
,
server_
ip
);
OMS_MF
(
std
::
string
,
server_
version
);
};
class
RuntimeStatusMessage
:
public
Message
,
public
Model
{
...
...
src/codec/protobuf_encoder.cpp
浏览文件 @
d7614332
...
...
@@ -139,8 +139,8 @@ int ProtobufEncoder::encode_client_handshake_response(const Message& msg, MsgBuf
const
ClientHandshakeResponseMessage
&
response_message
=
(
const
ClientHandshakeResponseMessage
&
)
msg
;
ClientHandshakeResponse
pb_msg
;
pb_msg
.
set_code
(
response_message
.
code
);
pb_msg
.
set_ip
(
response_message
.
ip
);
pb_msg
.
set_version
(
response_message
.
version
);
pb_msg
.
set_ip
(
response_message
.
server_
ip
);
pb_msg
.
set_version
(
response_message
.
server_
version
);
return
encode_message
(
pb_msg
,
response_message
.
type
(),
buffer
,
false
);
}
...
...
src/common/common.cpp
浏览文件 @
d7614332
...
...
@@ -60,23 +60,20 @@ static char* localhost()
bool
localhostip
(
std
::
string
&
hostname
,
std
::
string
&
ip
)
{
struct
hostent
*
hp
;
char
*
hname
=
localhost
();
if
(
hname
==
nullptr
)
{
return
false
;
}
hostname
=
hname
;
if
(
!
(
hp
=
gethostbyname
(
hname
)))
{
struct
hostent
*
hp
=
gethostbyname
(
hname
);
if
(
hp
==
nullptr
)
{
return
false
;
}
while
(
hp
->
h_addr_list
[
0
])
{
ip
=
inet_ntoa
(
*
(
struct
in_addr
*
)
*
hp
->
h_addr_list
++
);
}
return
true
;
}
...
...
src/common/version.h
0 → 100644
浏览文件 @
d7614332
/**
* Copyright (c) 2021 OceanBase
* OceanBase Migration Service LogProxy is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#pragma once
#ifndef __OMS_VERSION__
#define __OMS_VERSION__ "2.0.0"
#endif
src/communication/communicator.cpp
浏览文件 @
d7614332
...
...
@@ -345,7 +345,8 @@ int Communicator::send_message(const PeerInfo& peer, const Message& msg, bool di
int
Communicator
::
write_message
(
Channel
*
ch
,
const
Message
&
msg
)
{
if
(
_s_config
.
verbose_packet
.
val
())
{
OMS_INFO
<<
"About to write mssage, ch: "
<<
ch
->
peer
().
id
()
<<
", msg type: "
<<
(
int
)
msg
.
type
();
OMS_INFO
<<
"About to write mssage: "
<<
msg
.
debug_string
()
<<
", ch: "
<<
ch
->
peer
().
id
()
<<
", msg type: "
<<
(
int
)
msg
.
type
();
}
MsgBuf
buffer
;
...
...
src/obaccess/mysql_protocol.cpp
浏览文件 @
d7614332
...
...
@@ -217,6 +217,9 @@ int MysqlProtocol::login(const std::string& host, int port, const std::string& u
int
MysqlProtocol
::
query
(
const
std
::
string
&
sql
,
MysqlResultSet
&
rs
)
{
OMS_INFO
<<
"query obmysql:"
<<
sql
;
MysqlQueryPacket
packet
(
sql
);
MsgBuf
msgbuf
;
int
ret
=
packet
.
encode_inplace
(
msgbuf
);
...
...
src/obaccess/ob_access.cpp
浏览文件 @
d7614332
...
...
@@ -150,7 +150,7 @@ int ObAccess::init(const OblogConfig& config)
if
(
config
.
sys_password
.
empty
())
{
MysqlProtocol
::
do_sha_password
(
Config
::
instance
().
ob_sys_password
.
val
(),
_sys_password_sha1
);
}
else
{
_sys_password_sha1
=
config
.
sys_password
.
val
();
// already sha1
MysqlProtocol
::
do_sha_password
(
config
.
sys_password
.
val
(),
_sys_password_sha1
);
}
if
(
_sys_user
.
empty
()
||
_sys_password_sha1
.
empty
())
{
OMS_ERROR
<<
"Failed to init ObAccess caused by empty sys_user or sys_password"
;
...
...
@@ -204,8 +204,8 @@ int ObAccess::auth_sys(const ServerInfo& server)
int
ObAccess
::
auth_tenant
(
const
ServerInfo
&
server
)
{
// 1. found tenant server using sys
MysqlProtocol
auther
;
int
ret
=
auther
.
login
(
server
.
host
,
server
.
port
,
_sys_user
,
_sys_password_sha1
);
MysqlProtocol
sys_
auther
;
int
ret
=
sys_
auther
.
login
(
server
.
host
,
server
.
port
,
_sys_user
,
_sys_password_sha1
);
if
(
ret
!=
OMS_OK
)
{
return
ret
;
}
...
...
@@ -215,15 +215,17 @@ int ObAccess::auth_tenant(const ServerInfo& server)
// 2. for each of tenant servers, login it.
MysqlResultSet
rs
;
for
(
auto
&
tenant_entry
:
_table_whites
.
tenants
)
{
OMS_INFO
<<
"About to auth tenant:"
<<
tenant_entry
.
first
<<
" of user:"
<<
_user
;
rs
.
reset
();
ret
=
auther
.
query
(
"SELECT server.svr_ip, server.inner_port, server.zone, tenant.tenant_id, tenant.tenant_name from
"
"oceanbase.__all_resource_pool AS pool, oceanbase.__all_unit AS unit, oceanbase.__all_server AS "
"server, oceanbase.__all_tenant AS tenant WHERE tenant.tenant_id =
pool.tenant_id AND "
"unit.resource_pool_id = pool.resource_pool_id AND unit.svr_ip =
server.svr_ip AND "
"unit.svr_port =
server.svr_port AND tenant.tenant_name='"
+
tenant_entry
.
first
+
"'"
,
rs
);
ret
=
sys_auther
.
query
(
"SELECT server.svr_ip, server.inner_port, server.zone, tenant.tenant_id, tenant.tenant_name FROM
"
"oceanbase.__all_resource_pool AS pool, oceanbase.__all_unit AS unit, oceanbase.__all_server AS "
"server, oceanbase.__all_tenant AS tenant WHERE tenant.tenant_id=
pool.tenant_id AND "
"unit.resource_pool_id=pool.resource_pool_id AND unit.svr_ip=
server.svr_ip AND "
"unit.svr_port=
server.svr_port AND tenant.tenant_name='"
+
tenant_entry
.
first
+
"'"
,
rs
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to auth, failed to query tenant server for:"
<<
tenant_entry
.
first
<<
", ret:"
<<
ret
;
return
OMS_FAILED
;
...
...
@@ -237,11 +239,11 @@ int ObAccess::auth_tenant(const ServerInfo& server)
const
std
::
string
&
host
=
row
.
fields
()[
0
];
const
uint16_t
sql_port
=
atoi
(
row
.
fields
()[
1
].
c_str
());
MysqlProtocol
sys
_auther
;
MysqlProtocol
user
_auther
;
std
::
string
conn_user
=
ob_user
.
username
;
conn_user
.
append
(
"@"
);
conn_user
.
append
(
ob_user
.
tenant
.
empty
()
?
tenant_entry
.
first
:
ob_user
.
tenant
);
ret
=
sys
_auther
.
login
(
host
,
sql_port
,
conn_user
,
_password_sha1
);
ret
=
user
_auther
.
login
(
host
,
sql_port
,
conn_user
,
_password_sha1
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to auth from tenant server: "
<<
host
<<
":"
<<
sql_port
<<
", ret:"
<<
ret
;
return
ret
;
...
...
src/obaccess/oblog_config.h
浏览文件 @
d7614332
...
...
@@ -22,18 +22,19 @@ namespace logproxy {
struct
OblogConfig
:
protected
ConfigBase
{
public:
// client defined params
OMS_CONFIG_STR
(
id
,
""
);
OMS_CONFIG_STR_K
(
sys_user
,
"sys_user"
,
""
);
OMS_CONFIG_STR_K
(
sys_password
,
"sys_password"
,
""
);
// from here to beflow, params use to send to liboblog
OMS_CONFIG_UINT64_K
(
start_timestamp
,
"first_start_timestamp"
,
0
);
OMS_CONFIG_STR_K
(
cluster_url
,
"cluster_url"
,
""
);
// syntax: rs1:rpc_port1:sql_port1;rs2:rpc_port2:sql_port2
OMS_CONFIG_STR_K
(
root_servers
,
"rootserver_list"
,
""
);
OMS_CONFIG_STR_K
(
user
,
"cluster_user"
,
""
);
OMS_CONFIG_STR_K
(
password
,
"cluster_password"
,
""
);
OMS_CONFIG_STR_K
(
sys_user
,
"sys_user"
,
""
);
OMS_CONFIG_STR_K
(
sys_password
,
"sys_password"
,
""
);
OMS_CONFIG_STR_K
(
table_whites
,
"tb_white_list"
,
""
);
public:
...
...
src/oblogreader/sender_routine.cpp
浏览文件 @
d7614332
...
...
@@ -182,7 +182,7 @@ int SenderRoutine::do_send(const std::vector<ILogRecord*>& records, size_t offse
_stage_timer
.
reset
();
RecordDataMessage
msg
(
records
,
offset
,
count
);
msg
.
set_version
(
_packet_version
);
msg
.
compress_type
=
CompressType
::
PLAIN
;
msg
.
compress_type
=
CompressType
::
LZ4
;
int
ret
=
_comm
.
send_message
(
_client_peer
,
msg
,
true
);
Counter
::
instance
().
count_key
(
Counter
::
SENDER_SEND_US
,
_stage_timer
.
elapsed
());
...
...
src/test/test_compress.cpp
0 → 100644
浏览文件 @
d7614332
/**
* Copyright (c) 2021 OceanBase
* OceanBase Migration Service LogProxy is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "gtest/gtest.h"
#include "lz4.h"
#include "common/log.h"
using
namespace
oceanbase
::
logproxy
;
TEST
(
COMPRESS
,
lz4_flow
)
{
// std::vector<std::string> texts = {{"01234567899876543210"}, {"abcdefghijabcdefghij"}};
std
::
vector
<
std
::
string
>
texts
=
{{
"11111111111111111111"
},
{
"abcdefghijabcdefghij"
}};
char
compressed
[
100
]
=
"
\0
"
;
LZ4_stream_t
lz4s
;
LZ4_resetStream
(
&
lz4s
);
uint32_t
compressed_size
=
0
;
for
(
auto
&
text
:
texts
)
{
size_t
block_size
=
text
.
size
();
memcpy
(
compressed
+
compressed_size
,
&
block_size
,
4
);
int
bound_size
=
LZ4_COMPRESSBOUND
(
block_size
);
char
*
block_compressed
=
(
char
*
)
malloc
(
bound_size
);
int
compressed_block_size
=
LZ4_compress_fast_continue
(
&
lz4s
,
text
.
c_str
(),
block_compressed
,
block_size
,
bound_size
,
1
);
ASSERT_TRUE
(
compressed_block_size
>
0
);
memcpy
(
compressed
+
compressed_size
+
4
,
&
compressed_block_size
,
4
);
memcpy
(
compressed
+
compressed_size
+
8
,
block_compressed
,
compressed_block_size
);
compressed_size
+=
(
compressed_block_size
+
8
);
free
(
block_compressed
);
}
std
::
string
compressed_hex
;
dumphex
(
compressed
,
compressed_size
,
compressed_hex
);
OMS_INFO
<<
"compressed buffer:"
<<
compressed_hex
<<
", size: "
<<
compressed_size
;
size_t
offset
=
0
;
while
(
offset
<
compressed_size
)
{
uint32_t
block_size
=
0
;
uint32_t
compressed_block_size
=
0
;
memcpy
(
&
block_size
,
compressed
+
offset
,
4
);
memcpy
(
&
compressed_block_size
,
compressed
+
offset
+
4
,
4
);
char
*
raw_block
=
(
char
*
)
malloc
(
block_size
+
1
);
ASSERT_TRUE
(
raw_block
!=
nullptr
);
int
decompressed_size
=
LZ4_decompress_safe
(
compressed
+
offset
+
8
,
raw_block
,
compressed_block_size
,
block_size
);
ASSERT_EQ
(
decompressed_size
,
block_size
);
raw_block
[
decompressed_size
]
=
'\0'
;
OMS_INFO
<<
"decompress block: "
<<
raw_block
<<
", size:"
<<
block_size
<<
", compressed size:"
<<
compressed_block_size
;
free
(
raw_block
);
offset
+=
(
compressed_block_size
+
8
);
}
}
src/test/test_entry.cpp
浏览文件 @
d7614332
...
...
@@ -22,6 +22,7 @@
#include "test/test_ob_mysql.cpp"
//#include "test/test_codec.cpp"
#include "test/test_http.cpp"
#include "test/test_compress.cpp"
int
main
(
int
argc
,
char
*
argv
[])
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录