Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
镜像
OpenAtomFoundation
pika
提交
8d83b996
pika
项目概览
镜像
/
OpenAtomFoundation
/
pika
10 个月 前同步成功
通知
0
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
pika
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
8d83b996
编写于
11月 25, 2019
作者:
Z
Zhao Minghuan
提交者:
GitHub
11月 25, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Support identify-binlog-type option for lower version pika upgrade usage (#812)
上级
8adee081
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
188 addition
and
3 deletion
+188
-3
conf/pika.conf
conf/pika.conf
+5
-0
include/pika_binlog_receiver_thread.h
include/pika_binlog_receiver_thread.h
+8
-2
include/pika_conf.h
include/pika_conf.h
+7
-0
include/pika_master_conn.h
include/pika_master_conn.h
+23
-0
src/pika_admin.cc
src/pika_admin.cc
+20
-1
src/pika_conf.cc
src/pika_conf.cc
+5
-0
src/pika_master_conn.cc
src/pika_master_conn.cc
+120
-0
未找到文件。
conf/pika.conf
浏览文件 @
8d83b996
...
...
@@ -94,6 +94,11 @@ double-master-server-id :
write
-
binlog
:
yes
# binlog file size: default is 100M, limited in [1K, 2G]
binlog
-
file
-
size
:
104857600
# When it becomes slave, the type of binlog it receives from the master
# if this option is set to 'new', that means I will be a slave to Pika who's version 3.0
# if this opsion is set to 'old', that means I will be a slave to Pika who's version 2.3.3 ~ 2.3.8
# identify-binlog-type [new | old]
identify
-
binlog
-
type
:
new
# Automatically triggers a small compaction according statistics
# Use the cache to store up to 'max-cache-statistic-keys' keys
# if 'max-cache-statistic-keys' set to '0', that means turn off the statistics function
...
...
include/pika_binlog_receiver_thread.h
浏览文件 @
8d83b996
...
...
@@ -14,6 +14,7 @@
#include "slash/include/env.h"
#include "include/pika_define.h"
#include "include/pika_binlog_receiver_conn.h"
#include "include/pika_master_conn.h"
#include "include/pika_command.h"
#include "include/pika_conf.h"
...
...
@@ -49,8 +50,13 @@ class PikaBinlogReceiverThread {
pink
::
Thread
*
thread
,
void
*
worker_specific_data
,
pink
::
PinkEpoll
*
pink_epoll
)
const
override
{
LOG
(
INFO
)
<<
"Master conn factory creat pika binlog conn ip_port"
<<
ip_port
;
return
std
::
make_shared
<
PikaBinlogReceiverConn
>
(
connfd
,
ip_port
,
binlog_receiver_
);
if
(
g_pika_conf
->
identify_binlog_type
()
==
"old"
)
{
LOG
(
INFO
)
<<
"Master conn factory create pika master conn"
;
return
std
::
make_shared
<
PikaMasterConn
>
(
connfd
,
ip_port
,
binlog_receiver_
);
}
else
{
LOG
(
INFO
)
<<
"Master conn factory creat pika binlog conn ip_port"
<<
ip_port
;
return
std
::
make_shared
<
PikaBinlogReceiverConn
>
(
connfd
,
ip_port
,
binlog_receiver_
);
}
}
private:
...
...
include/pika_conf.h
浏览文件 @
8d83b996
...
...
@@ -32,6 +32,7 @@ class PikaConf : public slash::BaseConf {
std
::
string
slaveof
()
{
RWLock
l
(
&
rwlock_
,
false
);
return
slaveof_
;}
int
slave_priority
()
{
RWLock
l
(
&
rwlock_
,
false
);
return
slave_priority_
;}
bool
write_binlog
()
{
RWLock
l
(
&
rwlock_
,
false
);
return
write_binlog_
;}
std
::
string
identify_binlog_type
()
{
RWLock
l
(
&
rwlock_
,
false
);
return
identify_binlog_type_
;}
int
thread_num
()
{
RWLock
l
(
&
rwlock_
,
false
);
return
thread_num_
;
}
int
thread_pool_size
()
{
RWLock
l
(
&
rwlock_
,
false
);
return
thread_pool_size_
;
}
int
sync_thread_num
()
{
RWLock
l
(
&
rwlock_
,
false
);
return
sync_thread_num_
;
}
...
...
@@ -120,6 +121,11 @@ class PikaConf : public slash::BaseConf {
TryPushDiffCommands
(
"write-binlog"
,
value
);
write_binlog_
=
(
value
==
"yes"
)
?
true
:
false
;
}
void
SetIdentifyBinlogType
(
const
std
::
string
&
value
)
{
RWLock
l
(
&
rwlock_
,
true
);
TryPushDiffCommands
(
"identify-binlog-type"
,
value
);
identify_binlog_type_
=
value
;
}
void
SetMaxCacheStatisticKeys
(
const
int
value
)
{
RWLock
l
(
&
rwlock_
,
true
);
TryPushDiffCommands
(
"max-cache-statistic-keys"
,
std
::
to_string
(
value
));
...
...
@@ -264,6 +270,7 @@ private:
std
::
string
bgsave_path_
;
std
::
string
bgsave_prefix_
;
std
::
string
pidfile_
;
std
::
string
identify_binlog_type_
;
//char pidfile_[PIKA_WORD_SIZE];
std
::
string
compression_
;
...
...
include/pika_master_conn.h
0 → 100644
浏览文件 @
8d83b996
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef PIKA_MASTER_CONN_H_
#define PIKA_MASTER_CONN_H_
#include "pink/include/redis_conn.h"
#include "include/pika_command.h"
class
PikaBinlogReceiverThread
;
class
PikaMasterConn
:
public
pink
::
RedisConn
{
public:
PikaMasterConn
(
int
fd
,
std
::
string
ip_port
,
void
*
worker_specific_data
);
virtual
int
DealMessage
(
const
PikaCmdArgsType
&
argv
,
std
::
string
*
response
);
private:
bool
is_first_send_
;
PikaBinlogReceiverThread
*
binlog_receiver_
;
};
#endif
src/pika_admin.cc
浏览文件 @
8d83b996
...
...
@@ -1253,6 +1253,12 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32
(
&
config_body
,
g_pika_conf
->
binlog_file_size
());
}
if
(
slash
::
stringmatch
(
pattern
.
data
(),
"identify-binlog-type"
,
1
))
{
elements
+=
2
;
EncodeString
(
&
config_body
,
"identify-binlog-type"
);
EncodeString
(
&
config_body
,
g_pika_conf
->
identify_binlog_type
());
}
if
(
slash
::
stringmatch
(
pattern
.
data
(),
"compression"
,
1
))
{
elements
+=
2
;
EncodeString
(
&
config_body
,
"compression"
);
...
...
@@ -1309,7 +1315,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
void
ConfigCmd
::
ConfigSet
(
std
::
string
&
ret
)
{
std
::
string
set_item
=
config_args_v_
[
1
];
if
(
set_item
==
"*"
)
{
ret
=
"*2
3
\r\n
"
;
ret
=
"*2
4
\r\n
"
;
EncodeString
(
&
ret
,
"loglevel"
);
EncodeString
(
&
ret
,
"timeout"
);
EncodeString
(
&
ret
,
"requirepass"
);
...
...
@@ -1327,6 +1333,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString
(
&
ret
,
"slowlog-max-len"
);
EncodeString
(
&
ret
,
"slave-read-only"
);
EncodeString
(
&
ret
,
"write-binlog"
);
EncodeString
(
&
ret
,
"identify-binlog-type"
);
EncodeString
(
&
ret
,
"max-cache-statistic-keys"
);
EncodeString
(
&
ret
,
"small-compaction-threshold"
);
EncodeString
(
&
ret
,
"db-sync-speed"
);
...
...
@@ -1455,6 +1462,18 @@ void ConfigCmd::ConfigSet(std::string& ret) {
}
g_pika_conf
->
SetSlaveReadOnly
(
is_readonly
);
ret
=
"+OK
\r\n
"
;
}
else
if
(
set_item
==
"identify-binlog-type"
)
{
int
role
=
g_pika_server
->
role
();
if
(
role
==
PIKA_ROLE_SLAVE
||
role
==
PIKA_ROLE_DOUBLE_MASTER
)
{
ret
=
"-ERR need to close master-slave or double-master mode first
\r\n
"
;
return
;
}
else
if
(
value
!=
"new"
&&
value
!=
"old"
)
{
ret
=
"-ERR invalid identify-binlog-type (new or old)
\r\n
"
;
return
;
}
else
{
g_pika_conf
->
SetIdentifyBinlogType
(
value
);
ret
=
"+OK
\r\n
"
;
}
}
else
if
(
set_item
==
"max-cache-statistic-keys"
)
{
if
(
!
slash
::
string2l
(
value
.
data
(),
value
.
size
(),
&
ival
)
||
ival
<
0
)
{
ret
=
"-ERR Invalid argument
\'
"
+
value
+
"
\'
for CONFIG SET 'max-cache-statistic-keys'
\r\n
"
;
...
...
src/pika_conf.cc
浏览文件 @
8d83b996
...
...
@@ -279,6 +279,10 @@ int PikaConf::Load()
std
::
string
wb
;
GetConfStr
(
"write-binlog"
,
&
wb
);
write_binlog_
=
(
wb
==
"no"
)
?
false
:
true
;
GetConfStr
(
"identify-binlog-type"
,
&
identify_binlog_type_
);
if
(
identify_binlog_type_
!=
"new"
&&
identify_binlog_type_
!=
"old"
)
{
identify_binlog_type_
=
"new"
;
}
GetConfInt
(
"binlog-file-size"
,
&
binlog_file_size_
);
if
(
binlog_file_size_
<
1024
||
static_cast
<
int64_t
>
(
binlog_file_size_
)
>
(
1024LL
*
1024
*
1024
))
{
...
...
@@ -360,6 +364,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr
(
"write-binlog"
,
write_binlog_
?
"yes"
:
"no"
);
SetConfInt
(
"binlog-file-size"
,
binlog_file_size_
);
SetConfStr
(
"identify-binlog-type"
,
identify_binlog_type_
);
SetConfStr
(
"compression"
,
compression_
);
SetConfInt
(
"max-cache-statistic-keys"
,
max_cache_statistic_keys_
);
SetConfInt
(
"small-compaction-threshold"
,
small_compaction_threshold_
);
...
...
src/pika_master_conn.cc
0 → 100644
浏览文件 @
8d83b996
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "include/pika_master_conn.h"
#include <glog/logging.h>
#include "slash/include/slash_string.h"
#include "slash/include/slash_coding.h"
#include "include/pika_server.h"
#include "include/pika_conf.h"
#include "include/pika_binlog_receiver_thread.h"
extern
PikaServer
*
g_pika_server
;
extern
PikaConf
*
g_pika_conf
;
PikaMasterConn
::
PikaMasterConn
(
int
fd
,
std
::
string
ip_port
,
void
*
worker_specific_data
)
:
RedisConn
(
fd
,
ip_port
,
NULL
)
{
is_first_send_
=
true
;
binlog_receiver_
=
reinterpret_cast
<
PikaBinlogReceiverThread
*>
(
worker_specific_data
);
}
int
PikaMasterConn
::
DealMessage
(
const
PikaCmdArgsType
&
_argv
,
std
::
string
*
response
)
{
PikaCmdArgsType
argv
=
_argv
;
// no reply
// eq set_is_reply(false);
if
(
argv
.
empty
())
{
return
-
2
;
}
g_pika_server
->
UpdateQueryNumAndExecCountTable
(
argv
[
0
]);
// Auth
if
(
is_first_send_
)
{
if
(
argv
.
size
()
==
2
&&
argv
[
0
]
==
"auth"
)
{
if
(
argv
[
1
]
==
std
::
to_string
(
g_pika_server
->
sid
()))
{
is_first_send_
=
false
;
LOG
(
INFO
)
<<
"BinlogReceiverThread AccessHandle succeeded, My server id: "
<<
g_pika_server
->
sid
()
<<
" Master auth server id: "
<<
argv
[
1
];
return
0
;
}
LOG
(
INFO
)
<<
"BinlogReceiverThread AccessHandle failed, My server id: "
<<
g_pika_server
->
sid
()
<<
" Master auth server id: "
<<
argv
[
1
];
}
return
-
2
;
}
// TODO(shq) maybe monitor do not need these infomation
BinlogItem
binlog_item
;
std
::
string
server_id
;
std
::
string
binlog_info
;
if
(
!
g_pika_server
->
DoubleMasterMode
())
{
if
(
argv
.
size
()
>
4
&&
*
(
argv
.
end
()
-
4
)
==
kPikaBinlogMagic
)
{
// Record new binlog format
argv
.
pop_back
();
// send_to_hub flag
binlog_info
=
argv
.
back
();
// binlog_info
argv
.
pop_back
();
uint32_t
exec_time
=
0
;
uint32_t
filenum
=
0
;
uint64_t
offset
=
0
;
slash
::
GetFixed32
(
&
binlog_info
,
&
exec_time
);
slash
::
GetFixed32
(
&
binlog_info
,
&
filenum
);
slash
::
GetFixed64
(
&
binlog_info
,
&
offset
);
binlog_item
.
set_exec_time
(
exec_time
);
binlog_item
.
set_filenum
(
filenum
);
binlog_item
.
set_offset
(
offset
);
server_id
=
argv
.
back
();
// server_id
argv
.
pop_back
();
binlog_item
.
set_server_id
(
std
::
atoi
(
server_id
.
c_str
()));
argv
.
pop_back
();
// kPikaBinlogMagic
}
}
// Monitor related
std
::
string
monitor_message
;
if
(
g_pika_server
->
HasMonitorClients
())
{
std
::
string
monitor_message
=
std
::
to_string
(
1.0
*
slash
::
NowMicros
()
/
1000000
)
+
" ["
+
this
->
ip_port
()
+
"]"
;
for
(
PikaCmdArgsType
::
iterator
iter
=
argv
.
begin
();
iter
!=
argv
.
end
();
iter
++
)
{
monitor_message
+=
" "
+
slash
::
ToRead
(
*
iter
);
}
g_pika_server
->
AddMonitorMessage
(
monitor_message
);
}
bool
is_readonly
=
g_pika_server
->
readonly
();
// Here, the binlog dispatch thread, instead of the binlog bgthread takes on the task to write binlog
// Only when the server is readonly
uint64_t
serial
=
binlog_receiver_
->
GetnPlusSerial
();
if
(
is_readonly
)
{
if
(
!
g_pika_server
->
WaitTillBinlogBGSerial
(
serial
))
{
return
-
2
;
}
std
::
string
opt
=
slash
::
StringToLower
(
argv
[
0
]);
Cmd
*
c_ptr
=
binlog_receiver_
->
GetCmd
(
opt
);
g_pika_server
->
logger_
->
Lock
();
g_pika_server
->
logger_
->
Put
(
c_ptr
->
ToBinlog
(
argv
,
binlog_item
.
exec_time
(),
std
::
to_string
(
binlog_item
.
server_id
()),
binlog_item
.
logic_id
(),
binlog_item
.
filenum
(),
binlog_item
.
offset
()));
g_pika_server
->
logger_
->
Unlock
();
g_pika_server
->
SignalNextBinlogBGSerial
();
}
PikaCmdArgsType
*
v
=
new
PikaCmdArgsType
(
argv
);
BinlogItem
*
b
=
new
BinlogItem
(
binlog_item
);
std
::
string
dispatch_key
=
argv
.
size
()
>=
2
?
argv
[
1
]
:
argv
[
0
];
g_pika_server
->
DispatchBinlogBG
(
dispatch_key
,
v
,
b
,
serial
,
is_readonly
);
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录