Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
b5b16935
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b5b16935
编写于
1月 13, 2015
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
dbms: Cluster: cleanups [#METR-2944].
上级
a7a988bd
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
62 addition
and
57 deletion
+62
-57
dbms/include/DB/Interpreters/Cluster.h
dbms/include/DB/Interpreters/Cluster.h
+3
-3
dbms/src/Interpreters/Cluster.cpp
dbms/src/Interpreters/Cluster.cpp
+59
-54
未找到文件。
dbms/include/DB/Interpreters/Cluster.h
浏览文件 @
b5b16935
...
...
@@ -18,8 +18,8 @@ class Cluster : private boost::noncopyable
public:
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
const
String
&
cluster_name
);
/// Построить кластер по именам шардов и реплик
, локальные обрабатываются так же как удаленные
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
std
::
vector
<
std
::
vector
<
String
>
>
names
,
/// Построить кластер по именам шардов и реплик
. Локальные обрабатываются так же как удаленные.
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
std
::
vector
<
std
::
vector
<
String
>
>
names
,
const
String
&
username
,
const
String
&
password
);
/// количество узлов clickhouse сервера, расположенных локально
...
...
@@ -80,7 +80,7 @@ private:
Addresses
addresses
;
AddressesWithFailover
addresses_with_failover
;
size_t
local_nodes_num
;
size_t
local_nodes_num
=
0
;
};
struct
Clusters
...
...
dbms/src/Interpreters/Cluster.cpp
浏览文件 @
b5b16935
...
...
@@ -59,8 +59,7 @@ Clusters::Clusters(const Settings & settings, const DataTypeFactory & data_type_
}
Cluster
::
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
const
String
&
cluster_name
)
:
local_nodes_num
(
0
)
Cluster
::
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
const
String
&
cluster_name
)
{
Poco
::
Util
::
AbstractConfiguration
&
config
=
Poco
::
Util
::
Application
::
instance
().
config
();
Poco
::
Util
::
AbstractConfiguration
::
Keys
config_keys
;
...
...
@@ -81,7 +80,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
slot_to_shard
.
insert
(
std
::
end
(
slot_to_shard
),
weight
,
shard_info_vec
.
size
());
if
(
const
auto
is_local
=
isLocal
(
addresses
.
back
()))
shard_info_vec
.
push_back
({{},
weight
,
is_local
});
shard_info_vec
.
push_back
({{},
weight
,
is_local
});
else
shard_info_vec
.
push_back
({{
addressToDirName
(
addresses
.
back
())},
weight
,
is_local
});
}
...
...
@@ -149,89 +148,93 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
}
if
(
!
addresses_with_failover
.
empty
()
&&
!
addresses
.
empty
())
throw
Exception
(
"There must be either 'node' or 'shard' elements in config"
,
ErrorCodes
::
EXCESSIVE_ELEMENT_IN_CONFIG
);
throw
Exception
(
"There must be either 'node' or 'shard' elements in config"
,
ErrorCodes
::
EXCESSIVE_ELEMENT_IN_CONFIG
);
if
(
addresses_with_failover
.
size
())
if
(
addresses_with_failover
.
size
())
{
for
(
const
auto
&
shard
:
addresses_with_failover
)
{
for
(
AddressesWithFailover
::
const_iterator
it
=
addresses_with_failover
.
begin
();
it
!=
addresses_with_failover
.
end
();
++
it
)
{
ConnectionPools
replicas
;
replicas
.
reserve
(
it
->
size
());
ConnectionPools
replicas
;
replicas
.
reserve
(
shard
.
size
());
bool
has_local_replics
=
false
;
for
(
Addresses
::
const_iterator
jt
=
it
->
begin
();
jt
!=
it
->
end
();
++
jt
)
{
if
(
isLocal
(
*
jt
))
{
has_local_replics
=
true
;
break
;
}
else
{
replicas
.
emplace_back
(
new
ConnectionPool
(
settings
.
distributed_connections_pool_size
,
jt
->
host_port
.
host
().
toString
(),
jt
->
host_port
.
port
(),
""
,
jt
->
user
,
jt
->
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout_with_failover_ms
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
receive_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
send_timeout
,
settings
.
limits
.
max_execution_time
)));
}
}
if
(
has_local_replics
)
++
local_nodes_num
;
else
pools
.
emplace_back
(
new
ConnectionPoolWithFailover
(
replicas
,
settings
.
load_balancing
,
settings
.
connections_with_failover_max_tries
));
}
}
else
if
(
addresses
.
size
())
{
for
(
Addresses
::
const_iterator
it
=
addresses
.
begin
();
it
!=
addresses
.
end
();
++
it
)
bool
has_local_replics
=
false
;
for
(
const
auto
&
replica
:
shard
)
{
if
(
isLocal
(
*
it
))
if
(
isLocal
(
replica
))
{
++
local_nodes_num
;
has_local_replics
=
true
;
break
;
}
else
{
pool
s
.
emplace_back
(
new
ConnectionPool
(
replica
s
.
emplace_back
(
new
ConnectionPool
(
settings
.
distributed_connections_pool_size
,
it
->
host_port
.
host
().
toString
(),
it
->
host_port
.
port
(),
""
,
it
->
user
,
it
->
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout
,
settings
.
limits
.
max_execution_time
),
replica
.
host_port
.
host
().
toString
(),
replica
.
host_port
.
port
(),
""
,
replica
.
user
,
replica
.
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout_with_failover_ms
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
receive_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
send_timeout
,
settings
.
limits
.
max_execution_time
)));
}
}
if
(
has_local_replics
)
++
local_nodes_num
;
else
pools
.
emplace_back
(
new
ConnectionPoolWithFailover
(
replicas
,
settings
.
load_balancing
,
settings
.
connections_with_failover_max_tries
));
}
}
else
if
(
addresses
.
size
())
{
for
(
const
auto
&
address
:
addresses
)
{
if
(
isLocal
(
address
))
{
++
local_nodes_num
;
}
else
{
pools
.
emplace_back
(
new
ConnectionPool
(
settings
.
distributed_connections_pool_size
,
address
.
host_port
.
host
().
toString
(),
address
.
host_port
.
port
(),
""
,
address
.
user
,
address
.
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
receive_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
send_timeout
,
settings
.
limits
.
max_execution_time
)));
}
}
else
throw
Exception
(
"No addresses listed in config"
,
ErrorCodes
::
NO_ELEMENTS_IN_CONFIG
);
}
else
throw
Exception
(
"No addresses listed in config"
,
ErrorCodes
::
NO_ELEMENTS_IN_CONFIG
);
}
Cluster
::
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
std
::
vector
<
std
::
vector
<
String
>
>
names
,
const
String
&
username
,
const
String
&
password
)
:
local_nodes_num
(
0
)
Cluster
::
Cluster
(
const
Settings
&
settings
,
const
DataTypeFactory
&
data_type_factory
,
std
::
vector
<
std
::
vector
<
String
>
>
names
,
const
String
&
username
,
const
String
&
password
)
{
for
(
size_t
i
=
0
;
i
<
names
.
size
();
++
i
)
for
(
const
auto
&
shard
:
names
)
{
Addresses
current
;
for
(
size_t
j
=
0
;
j
<
names
[
i
].
size
();
++
j
)
current
.
emplace_back
(
names
[
i
][
j
]
,
username
,
password
);
for
(
auto
&
replica
:
shard
)
current
.
emplace_back
(
replica
,
username
,
password
);
addresses_with_failover
.
emplace_back
(
current
);
}
for
(
AddressesWithFailover
::
const_iterator
it
=
addresses_with_failover
.
begin
();
it
!=
addresses_with_failover
.
end
();
++
it
)
for
(
const
auto
&
shard
:
addresses_with_failover
)
{
ConnectionPools
replicas
;
replicas
.
reserve
(
it
->
size
());
replicas
.
reserve
(
shard
.
size
());
for
(
Addresses
::
const_iterator
jt
=
it
->
begin
();
jt
!=
it
->
end
();
++
jt
)
for
(
const
auto
&
replica
:
shard
)
{
replicas
.
emplace_back
(
new
ConnectionPool
(
settings
.
distributed_connections_pool_size
,
jt
->
host_port
.
host
().
toString
(),
jt
->
host_port
.
port
(),
""
,
jt
->
user
,
jt
->
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
replica
.
host_port
.
host
().
toString
(),
replica
.
host_port
.
port
(),
""
,
replica
.
user
,
replica
.
password
,
data_type_factory
,
"server"
,
Protocol
::
Compression
::
Enable
,
saturate
(
settings
.
connect_timeout_with_failover_ms
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
receive_timeout
,
settings
.
limits
.
max_execution_time
),
saturate
(
settings
.
send_timeout
,
settings
.
limits
.
max_execution_time
)));
}
pools
.
emplace_back
(
new
ConnectionPoolWithFailover
(
replicas
,
settings
.
load_balancing
,
settings
.
connections_with_failover_max_tries
));
}
}
...
...
@@ -259,9 +262,11 @@ bool Cluster::isLocal(const Address & address)
interfaces
.
end
()
!=
std
::
find_if
(
interfaces
.
begin
(),
interfaces
.
end
(),
[
&
](
const
Poco
::
Net
::
NetworkInterface
&
interface
)
{
return
interface
.
address
()
==
address
.
host_port
.
host
();
}))
{
LOG_INFO
(
&
Poco
::
Util
::
Application
::
instance
().
logger
(),
"Replica with address "
<<
address
.
host_port
.
toString
()
<<
" will be processed as local."
);
LOG_INFO
(
&
Poco
::
Util
::
Application
::
instance
().
logger
(),
"Replica with address "
<<
address
.
host_port
.
toString
()
<<
" will be processed as local."
);
return
true
;
}
return
false
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录