Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Shardingsphere
提交
81c1b891
Shardingsphere
项目概览
apache
/
Shardingsphere
通知
56
Star
3
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Shardingsphere
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
81c1b891
编写于
5月 09, 2018
作者:
ShardingSphere
提交者:
GitHub
5月 09, 2018
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #813 from tristaZero/dev
handle master-rule for sharding-proxy
上级
b9f49bc8
a28aefcb
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
156 addition
and
35 deletion
+156
-35
sharding-core/src/main/java/io/shardingjdbc/core/parsing/parser/clause/InsertValuesClauseParser.java
.../core/parsing/parser/clause/InsertValuesClauseParser.java
+1
-0
sharding-core/src/main/java/io/shardingjdbc/core/yaml/proxy/YamlProxyConfiguration.java
.../shardingjdbc/core/yaml/proxy/YamlProxyConfiguration.java
+28
-8
sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/SQLExecuteBackendHandler.java
...ngjdbc/proxy/backend/common/SQLExecuteBackendHandler.java
+54
-11
sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/StatementExecuteBackendHandler.java
.../proxy/backend/common/StatementExecuteBackendHandler.java
+56
-10
sharding-proxy/src/main/java/io/shardingjdbc/proxy/config/ShardingRuleRegistry.java
...va/io/shardingjdbc/proxy/config/ShardingRuleRegistry.java
+17
-6
未找到文件。
sharding-core/src/main/java/io/shardingjdbc/core/parsing/parser/clause/InsertValuesClauseParser.java
浏览文件 @
81c1b891
...
...
@@ -49,6 +49,7 @@ import java.util.List;
*
* @author zhangliang
* @author maxiaoguang
* @author panjuan
*/
public
class
InsertValuesClauseParser
implements
SQLClauseParser
{
...
...
sharding-core/src/main/java/io/shardingjdbc/core/yaml/
sharding/YamlShardingConfigurationForProxy
.java
→
sharding-core/src/main/java/io/shardingjdbc/core/yaml/
proxy/YamlProxyConfiguration
.java
浏览文件 @
81c1b891
...
...
@@ -15,9 +15,14 @@
* </p>
*/
package
io.shardingjdbc.core.yaml.
sharding
;
package
io.shardingjdbc.core.yaml.
proxy
;
import
io.shardingjdbc.core.api.config.MasterSlaveRuleConfiguration
;
import
io.shardingjdbc.core.rule.MasterSlaveRule
;
import
io.shardingjdbc.core.rule.ShardingRule
;
import
io.shardingjdbc.core.yaml.masterslave.YamlMasterSlaveRuleConfiguration
;
import
io.shardingjdbc.core.yaml.sharding.DataSourceParameter
;
import
io.shardingjdbc.core.yaml.sharding.YamlShardingRuleConfiguration
;
import
lombok.Getter
;
import
lombok.Setter
;
import
org.yaml.snakeyaml.Yaml
;
...
...
@@ -29,6 +34,7 @@ import java.io.FileInputStream;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.io.InputStreamReader
;
import
java.util.Arrays
;
import
java.util.Collection
;
import
java.util.HashMap
;
import
java.util.Map
;
...
...
@@ -37,14 +43,17 @@ import java.util.Map;
* Yaml sharding configuration for proxy.
*
* @author zhangyonglun
* @author panjuan
*/
@Getter
@Setter
public
class
Yaml
ShardingConfigurationForProxy
{
public
class
Yaml
ProxyConfiguration
{
private
Map
<
String
,
DataSourceParameter
>
dataSources
=
new
HashMap
<>();
private
YamlShardingRuleConfiguration
shardingRule
;
private
YamlMasterSlaveRuleConfiguration
masterSlaveRule
=
new
YamlMasterSlaveRuleConfiguration
();
private
YamlShardingRuleConfiguration
shardingRule
=
new
YamlShardingRuleConfiguration
();
/**
* Unmarshal yaml sharding configuration from yaml file.
...
...
@@ -52,13 +61,14 @@ public class YamlShardingConfigurationForProxy {
* @param yamlFile yaml file
* @return yaml sharding configuration
* @throws IOException IO Exception
*
*/
public
static
Yaml
ShardingConfigurationForProxy
unmarshal
(
final
File
yamlFile
)
throws
IOException
{
public
static
Yaml
ProxyConfiguration
unmarshal
(
final
File
yamlFile
)
throws
IOException
{
try
(
FileInputStream
fileInputStream
=
new
FileInputStream
(
yamlFile
);
InputStreamReader
inputStreamReader
=
new
InputStreamReader
(
fileInputStream
,
"UTF-8"
)
)
{
return
new
Yaml
(
new
Constructor
(
Yaml
ShardingConfigurationForProxy
.
class
)).
loadAs
(
inputStreamReader
,
YamlShardingConfigurationForProxy
.
class
);
return
new
Yaml
(
new
Constructor
(
Yaml
ProxyConfiguration
.
class
)).
loadAs
(
inputStreamReader
,
YamlProxyConfiguration
.
class
);
}
}
...
...
@@ -69,9 +79,9 @@ public class YamlShardingConfigurationForProxy {
* @return yaml sharding configuration
* @throws IOException IO Exception
*/
public
static
Yaml
ShardingConfigurationForProxy
unmarshal
(
final
byte
[]
yamlBytes
)
throws
IOException
{
public
static
Yaml
ProxyConfiguration
unmarshal
(
final
byte
[]
yamlBytes
)
throws
IOException
{
try
(
InputStream
inputStream
=
new
ByteArrayInputStream
(
yamlBytes
))
{
return
new
Yaml
(
new
Constructor
(
Yaml
ShardingConfigurationForProxy
.
class
)).
loadAs
(
inputStream
,
YamlShardingConfigurationForProxy
.
class
);
return
new
Yaml
(
new
Constructor
(
Yaml
ProxyConfiguration
.
class
)).
loadAs
(
inputStream
,
YamlProxyConfiguration
.
class
);
}
}
...
...
@@ -81,7 +91,17 @@ public class YamlShardingConfigurationForProxy {
* @param dataSourceNames data source names
* @return sharding rule from yaml
*/
public
ShardingRule
get
ShardingRule
(
final
Collection
<
String
>
dataSourceNames
)
{
public
ShardingRule
obtain
ShardingRule
(
final
Collection
<
String
>
dataSourceNames
)
{
return
new
ShardingRule
(
shardingRule
.
getShardingRuleConfiguration
(),
dataSourceNames
.
isEmpty
()
?
dataSources
.
keySet
()
:
dataSourceNames
);
}
/**
* Get master slave rule from yaml.
*
* @return master slave rule.
*/
public
MasterSlaveRule
obtainMasterSlaveRule
()
{
return
null
==
masterSlaveRule
.
getMasterDataSourceName
()
?
new
MasterSlaveRule
(
new
MasterSlaveRuleConfiguration
(
""
,
""
,
Arrays
.
asList
(
""
),
null
))
:
new
MasterSlaveRule
(
masterSlaveRule
.
getMasterSlaveRuleConfiguration
());
}
}
sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/SQLExecuteBackendHandler.java
浏览文件 @
81c1b891
...
...
@@ -22,20 +22,23 @@ import io.shardingjdbc.core.constant.SQLType;
import
io.shardingjdbc.core.merger.MergeEngineFactory
;
import
io.shardingjdbc.core.merger.MergedResult
;
import
io.shardingjdbc.core.merger.QueryResult
;
import
io.shardingjdbc.core.parsing.SQLJudgeEngine
;
import
io.shardingjdbc.core.parsing.parser.sql.SQLStatement
;
import
io.shardingjdbc.core.parsing.parser.sql.dml.insert.InsertStatement
;
import
io.shardingjdbc.core.routing.SQLExecutionUnit
;
import
io.shardingjdbc.core.routing.SQLRouteResult
;
import
io.shardingjdbc.core.routing.StatementRoutingEngine
;
import
io.shardingjdbc.core.routing.router.masterslave.MasterSlaveRouter
;
import
io.shardingjdbc.core.routing.router.masterslave.MasterVisitedManager
;
import
io.shardingjdbc.proxy.backend.mysql.MySQLPacketQueryResult
;
import
io.shardingjdbc.proxy.config.ShardingRuleRegistry
;
import
io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket
;
import
io.shardingjdbc.proxy.transport.mysql.constant.ColumnType
;
import
io.shardingjdbc.proxy.transport.mysql.constant.StatusFlag
;
import
io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.CommandResponsePackets
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.text.query.ColumnDefinition41Packet
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.text.query.FieldCountPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.text.query.TextResultSetRowPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.text.query.ColumnDefinition41Packet
;
import
io.shardingjdbc.proxy.transport.mysql.packet.generic.EofPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.generic.ErrPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.generic.OKPacket
;
...
...
@@ -62,9 +65,7 @@ public final class SQLExecuteBackendHandler implements BackendHandler {
private
static
final
Integer
FETCH_ONE_ROW_A_TIME
=
Integer
.
MIN_VALUE
;
private
final
String
sql
;
private
final
StatementRoutingEngine
routingEngine
;
private
List
<
Connection
>
connections
;
private
List
<
ResultSet
>
resultSets
;
...
...
@@ -79,17 +80,36 @@ public final class SQLExecuteBackendHandler implements BackendHandler {
private
boolean
hasMoreResultValueFlag
;
private
final
DatabaseType
databaseType
;
private
final
boolean
showSQL
;
public
SQLExecuteBackendHandler
(
final
String
sql
,
final
DatabaseType
databaseType
,
final
boolean
showSQL
)
{
this
.
sql
=
sql
;
routingEngine
=
new
StatementRoutingEngine
(
ShardingRuleRegistry
.
getInstance
().
getShardingRule
(),
ShardingRuleRegistry
.
getInstance
().
getShardingMetaData
(),
databaseType
,
showSQL
);
connections
=
new
ArrayList
<>(
1024
);
resultSets
=
new
ArrayList
<>(
1024
);
isMerged
=
false
;
hasMoreResultValueFlag
=
true
;
this
.
databaseType
=
databaseType
;
this
.
showSQL
=
showSQL
;
}
@Override
public
CommandResponsePackets
execute
()
{
return
ShardingRuleRegistry
.
getInstance
().
isOnlyMasterSlave
()
?
executeForMasterSlave
()
:
executeForSharding
();
}
private
CommandResponsePackets
executeForMasterSlave
()
{
MasterSlaveRouter
masterSlaveRouter
=
new
MasterSlaveRouter
(
ShardingRuleRegistry
.
getInstance
().
getMasterSlaveRule
());
SQLStatement
sqlStatement
=
new
SQLJudgeEngine
(
sql
).
judge
();
String
dataSourceName
=
masterSlaveRouter
.
route
(
sqlStatement
.
getType
()).
iterator
().
next
();
List
<
CommandResponsePackets
>
result
=
new
LinkedList
<>();
result
.
add
(
execute
(
sqlStatement
,
dataSourceName
,
sql
));
return
merge
(
sqlStatement
,
result
);
}
private
CommandResponsePackets
executeForSharding
()
{
StatementRoutingEngine
routingEngine
=
new
StatementRoutingEngine
(
ShardingRuleRegistry
.
getInstance
().
getShardingRule
(),
ShardingRuleRegistry
.
getInstance
().
getShardingMetaData
(),
databaseType
,
showSQL
);
SQLRouteResult
routeResult
=
routingEngine
.
route
(
sql
);
if
(
routeResult
.
getExecutionUnits
().
isEmpty
())
{
return
new
CommandResponsePackets
(
new
OKPacket
(
1
,
0
,
0
,
StatusFlag
.
SERVER_STATUS_AUTOCOMMIT
.
getValue
(),
0
,
""
));
...
...
@@ -97,21 +117,22 @@ public final class SQLExecuteBackendHandler implements BackendHandler {
List
<
CommandResponsePackets
>
result
=
new
LinkedList
<>();
for
(
SQLExecutionUnit
each
:
routeResult
.
getExecutionUnits
())
{
// TODO multiple threads
result
.
add
(
execute
(
routeResult
.
getSqlStatement
(),
each
));
result
.
add
(
execute
(
routeResult
.
getSqlStatement
(),
each
.
getDataSource
(),
each
.
getSqlUnit
().
getSql
()
));
}
return
merge
(
routeResult
.
getSqlStatement
(),
result
);
}
private
CommandResponsePackets
execute
(
final
SQLStatement
sqlStatement
,
final
S
QLExecutionUnit
sqlExecutionUnit
)
{
private
CommandResponsePackets
execute
(
final
SQLStatement
sqlStatement
,
final
S
tring
dataSourceName
,
final
String
sql
)
{
switch
(
sqlStatement
.
getType
())
{
case
DQL:
case
DAL:
return
executeQuery
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
sqlExecutionUnit
.
getDataSource
()),
sqlExecutionUnit
.
getSqlUnit
().
getSql
()
);
return
executeQuery
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
);
case
DML:
case
DDL:
return
executeUpdate
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
sqlExecutionUnit
.
getDataSource
()),
sqlExecutionUnit
.
getSqlUnit
().
getSql
(),
sqlStatement
);
return
ShardingRuleRegistry
.
getInstance
().
isOnlyMasterSlave
()
?
executeUpdate
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
)
:
executeUpdate
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
,
sqlStatement
);
default
:
return
executeCommon
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
sqlExecutionUnit
.
getDataSource
()),
sqlExecutionUnit
.
getSqlUnit
().
getSql
()
);
return
executeCommon
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
);
}
}
...
...
@@ -143,6 +164,25 @@ public final class SQLExecuteBackendHandler implements BackendHandler {
return
new
CommandResponsePackets
(
new
OKPacket
(
1
,
affectedRows
,
lastInsertId
,
StatusFlag
.
SERVER_STATUS_AUTOCOMMIT
.
getValue
(),
0
,
""
));
}
catch
(
final
SQLException
ex
)
{
return
new
CommandResponsePackets
(
new
ErrPacket
(
1
,
ex
.
getErrorCode
(),
""
,
ex
.
getSQLState
(),
ex
.
getMessage
()));
}
finally
{
MasterVisitedManager
.
clear
();
}
}
private
CommandResponsePackets
executeUpdate
(
final
DataSource
dataSource
,
final
String
sql
)
{
try
(
Connection
connection
=
dataSource
.
getConnection
();
Statement
statement
=
connection
.
createStatement
())
{
int
affectedRows
=
statement
.
executeUpdate
(
sql
,
Statement
.
RETURN_GENERATED_KEYS
);
ResultSet
resultSet
=
statement
.
getGeneratedKeys
();
long
lastInsertId
=
0
;
while
(
resultSet
.
next
())
{
lastInsertId
=
resultSet
.
getLong
(
1
);
}
return
new
CommandResponsePackets
(
new
OKPacket
(
1
,
affectedRows
,
lastInsertId
,
StatusFlag
.
SERVER_STATUS_AUTOCOMMIT
.
getValue
(),
0
,
""
));
}
catch
(
final
SQLException
ex
)
{
return
new
CommandResponsePackets
(
new
ErrPacket
(
1
,
ex
.
getErrorCode
(),
""
,
ex
.
getSQLState
(),
ex
.
getMessage
()));
}
finally
{
MasterVisitedManager
.
clear
();
}
}
...
...
@@ -158,6 +198,8 @@ public final class SQLExecuteBackendHandler implements BackendHandler {
}
}
catch
(
final
SQLException
ex
)
{
return
new
CommandResponsePackets
(
new
ErrPacket
(
1
,
ex
.
getErrorCode
(),
""
,
ex
.
getSQLState
(),
ex
.
getMessage
()));
}
finally
{
MasterVisitedManager
.
clear
();
}
}
...
...
@@ -326,6 +368,7 @@ public final class SQLExecuteBackendHandler implements BackendHandler {
if
(
null
!=
each
)
{
try
{
each
.
close
();
MasterVisitedManager
.
clear
();
}
catch
(
final
SQLException
ignore
)
{
}
}
...
...
sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/StatementExecuteBackendHandler.java
浏览文件 @
81c1b891
...
...
@@ -22,17 +22,21 @@ import io.shardingjdbc.core.constant.SQLType;
import
io.shardingjdbc.core.merger.MergeEngineFactory
;
import
io.shardingjdbc.core.merger.MergedResult
;
import
io.shardingjdbc.core.merger.QueryResult
;
import
io.shardingjdbc.core.parsing.SQLJudgeEngine
;
import
io.shardingjdbc.core.parsing.parser.sql.SQLStatement
;
import
io.shardingjdbc.core.parsing.parser.sql.dml.insert.InsertStatement
;
import
io.shardingjdbc.core.routing.PreparedStatementRoutingEngine
;
import
io.shardingjdbc.core.routing.SQLExecutionUnit
;
import
io.shardingjdbc.core.routing.SQLRouteResult
;
import
io.shardingjdbc.core.routing.router.masterslave.MasterSlaveRouter
;
import
io.shardingjdbc.core.routing.router.masterslave.MasterVisitedManager
;
import
io.shardingjdbc.proxy.backend.mysql.MySQLPacketStatementExecuteQueryResult
;
import
io.shardingjdbc.proxy.config.ShardingRuleRegistry
;
import
io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket
;
import
io.shardingjdbc.proxy.transport.mysql.constant.ColumnType
;
import
io.shardingjdbc.proxy.transport.mysql.constant.StatusFlag
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.CommandResponsePackets
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.statement.PreparedStatementRegistry
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.statement.execute.BinaryResultSetRowPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.statement.execute.PreparedStatementParameter
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.text.query.ColumnDefinition41Packet
;
...
...
@@ -40,7 +44,6 @@ import io.shardingjdbc.proxy.transport.mysql.packet.command.text.query.FieldCoun
import
io.shardingjdbc.proxy.transport.mysql.packet.generic.EofPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.generic.ErrPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.generic.OKPacket
;
import
io.shardingjdbc.proxy.transport.mysql.packet.command.statement.PreparedStatementRegistry
;
import
javax.sql.DataSource
;
import
java.sql.Connection
;
...
...
@@ -65,8 +68,6 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
private
final
List
<
PreparedStatementParameter
>
preparedStatementParameters
;
private
final
PreparedStatementRoutingEngine
routingEngine
;
private
List
<
Connection
>
connections
;
private
List
<
ResultSet
>
resultSets
;
...
...
@@ -83,19 +84,41 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
private
boolean
hasMoreResultValueFlag
;
private
final
DatabaseType
databaseType
;
private
final
boolean
showSQL
;
private
final
String
sql
;
public
StatementExecuteBackendHandler
(
final
List
<
PreparedStatementParameter
>
preparedStatementParameters
,
final
int
statementId
,
final
DatabaseType
databaseType
,
final
boolean
showSQL
)
{
this
.
preparedStatementParameters
=
preparedStatementParameters
;
routingEngine
=
new
PreparedStatementRoutingEngine
(
PreparedStatementRegistry
.
getInstance
().
getSQL
(
statementId
),
ShardingRuleRegistry
.
getInstance
().
getShardingRule
(),
ShardingRuleRegistry
.
getInstance
().
getShardingMetaData
(),
databaseType
,
showSQL
);
connections
=
new
ArrayList
<>(
1024
);
resultSets
=
new
ArrayList
<>(
1024
);
columnTypes
=
new
ArrayList
<>(
32
);
isMerged
=
false
;
hasMoreResultValueFlag
=
true
;
this
.
databaseType
=
databaseType
;
this
.
showSQL
=
showSQL
;
sql
=
PreparedStatementRegistry
.
getInstance
().
getSQL
(
statementId
);
}
@Override
public
CommandResponsePackets
execute
()
{
return
ShardingRuleRegistry
.
getInstance
().
isOnlyMasterSlave
()
?
executeForMasterSlave
()
:
executeForSharding
();
}
private
CommandResponsePackets
executeForMasterSlave
()
{
MasterSlaveRouter
masterSlaveRouter
=
new
MasterSlaveRouter
(
ShardingRuleRegistry
.
getInstance
().
getMasterSlaveRule
());
SQLStatement
sqlStatement
=
new
SQLJudgeEngine
(
sql
).
judge
();
String
dataSourceName
=
masterSlaveRouter
.
route
(
sqlStatement
.
getType
()).
iterator
().
next
();
List
<
CommandResponsePackets
>
result
=
new
LinkedList
<>();
result
.
add
(
execute
(
sqlStatement
,
dataSourceName
,
sql
));
return
merge
(
sqlStatement
,
result
);
}
private
CommandResponsePackets
executeForSharding
()
{
PreparedStatementRoutingEngine
routingEngine
=
new
PreparedStatementRoutingEngine
(
sql
,
ShardingRuleRegistry
.
getInstance
().
getShardingRule
(),
ShardingRuleRegistry
.
getInstance
().
getShardingMetaData
(),
databaseType
,
showSQL
);
// TODO support null value parameter
SQLRouteResult
routeResult
=
routingEngine
.
route
(
getComStmtExecuteParameters
());
if
(
routeResult
.
getExecutionUnits
().
isEmpty
())
{
...
...
@@ -104,21 +127,22 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
List
<
CommandResponsePackets
>
result
=
new
LinkedList
<>();
for
(
SQLExecutionUnit
each
:
routeResult
.
getExecutionUnits
())
{
// TODO multiple threads
result
.
add
(
execute
(
routeResult
.
getSqlStatement
(),
each
));
result
.
add
(
execute
(
routeResult
.
getSqlStatement
(),
each
.
getDataSource
(),
each
.
getSqlUnit
().
getSql
()
));
}
return
merge
(
routeResult
.
getSqlStatement
(),
result
);
}
private
CommandResponsePackets
execute
(
final
SQLStatement
sqlStatement
,
final
S
QLExecutionUnit
sqlExecutionUnit
)
{
private
CommandResponsePackets
execute
(
final
SQLStatement
sqlStatement
,
final
S
tring
dataSourceName
,
final
String
sql
)
{
switch
(
sqlStatement
.
getType
())
{
case
DQL:
case
DAL:
return
executeQuery
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
sqlExecutionUnit
.
getDataSource
()),
sqlExecutionUnit
.
getSqlUnit
().
getSql
()
);
return
executeQuery
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
);
case
DML:
case
DDL:
return
executeUpdate
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
sqlExecutionUnit
.
getDataSource
()),
sqlExecutionUnit
.
getSqlUnit
().
getSql
(),
sqlStatement
);
return
ShardingRuleRegistry
.
getInstance
().
isOnlyMasterSlave
()
?
executeUpdate
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
)
:
executeUpdate
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
,
sqlStatement
);
default
:
return
executeCommon
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
sqlExecutionUnit
.
getDataSource
()),
sqlExecutionUnit
.
getSqlUnit
().
getSql
()
);
return
executeCommon
(
ShardingRuleRegistry
.
getInstance
().
getDataSourceMap
().
get
(
dataSourceName
),
sql
);
}
}
...
...
@@ -170,6 +194,7 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
}
catch
(
final
SQLException
ex
)
{
return
new
CommandResponsePackets
(
new
ErrPacket
(
1
,
ex
.
getErrorCode
(),
""
,
ex
.
getSQLState
(),
ex
.
getMessage
()));
}
finally
{
MasterVisitedManager
.
clear
();
if
(
null
!=
preparedStatement
)
{
try
{
preparedStatement
.
close
();
...
...
@@ -179,6 +204,24 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
}
}
private
CommandResponsePackets
executeUpdate
(
final
DataSource
dataSource
,
final
String
sql
)
{
try
(
Connection
connection
=
dataSource
.
getConnection
();
PreparedStatement
preparedStatement
=
connection
.
prepareStatement
(
sql
,
Statement
.
RETURN_GENERATED_KEYS
))
{
setJDBCPreparedStatementParameters
(
preparedStatement
);
int
affectedRows
=
preparedStatement
.
executeUpdate
();
ResultSet
resultSet
=
preparedStatement
.
getGeneratedKeys
();
long
lastInsertId
=
0
;
while
(
resultSet
.
next
())
{
lastInsertId
=
resultSet
.
getLong
(
1
);
}
return
new
CommandResponsePackets
(
new
OKPacket
(
1
,
affectedRows
,
lastInsertId
,
StatusFlag
.
SERVER_STATUS_AUTOCOMMIT
.
getValue
(),
0
,
""
));
}
catch
(
final
SQLException
ex
)
{
return
new
CommandResponsePackets
(
new
ErrPacket
(
1
,
ex
.
getErrorCode
(),
""
,
ex
.
getSQLState
(),
ex
.
getMessage
()));
}
finally
{
MasterVisitedManager
.
clear
();
}
}
private
CommandResponsePackets
executeCommon
(
final
DataSource
dataSource
,
final
String
sql
)
{
try
(
Connection
connection
=
dataSource
.
getConnection
();
...
...
@@ -192,6 +235,8 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
}
}
catch
(
final
SQLException
ex
)
{
return
new
CommandResponsePackets
(
new
ErrPacket
(
1
,
ex
.
getErrorCode
(),
""
,
ex
.
getSQLState
(),
ex
.
getMessage
()));
}
finally
{
MasterVisitedManager
.
clear
();
}
}
...
...
@@ -364,6 +409,7 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
if
(
null
!=
each
)
{
try
{
each
.
close
();
MasterVisitedManager
.
clear
();
}
catch
(
final
SQLException
ignore
)
{
}
}
...
...
sharding-proxy/src/main/java/io/shardingjdbc/proxy/config/ShardingRuleRegistry.java
浏览文件 @
81c1b891
...
...
@@ -21,9 +21,10 @@ import com.zaxxer.hikari.HikariConfig;
import
com.zaxxer.hikari.HikariDataSource
;
import
io.shardingjdbc.core.exception.ShardingJdbcException
;
import
io.shardingjdbc.core.metadata.ShardingMetaData
;
import
io.shardingjdbc.core.rule.MasterSlaveRule
;
import
io.shardingjdbc.core.rule.ShardingRule
;
import
io.shardingjdbc.core.yaml.proxy.YamlProxyConfiguration
;
import
io.shardingjdbc.core.yaml.sharding.DataSourceParameter
;
import
io.shardingjdbc.core.yaml.sharding.YamlShardingConfigurationForProxy
;
import
io.shardingjdbc.proxy.metadata.ProxyShardingMetaData
;
import
lombok.Getter
;
...
...
@@ -40,6 +41,8 @@ import java.util.Map;
* Sharding rule registry.
*
* @author zhangliang
* @author zhangyonglun
* @author panjuan
*/
@Getter
public
final
class
ShardingRuleRegistry
{
...
...
@@ -50,24 +53,32 @@ public final class ShardingRuleRegistry {
private
final
ShardingRule
shardingRule
;
private
final
MasterSlaveRule
masterSlaveRule
;
private
final
ShardingMetaData
shardingMetaData
;
private
final
boolean
isOnlyMasterSlave
;
private
ShardingRuleRegistry
()
{
Yaml
ShardingConfigurationForProxy
yamlShardingConfigurationForProxy
;
Yaml
ProxyConfiguration
yamlProxyConfiguration
;
try
{
yaml
ShardingConfigurationForProxy
=
YamlShardingConfigurationForProxy
.
unmarshal
(
new
File
(
getClass
().
getResource
(
"/conf/sharding-config.yaml"
).
toURI
().
getPath
()));
yaml
ProxyConfiguration
=
YamlProxyConfiguration
.
unmarshal
(
new
File
(
getClass
().
getResource
(
"/conf/sharding-config.yaml"
).
toURI
().
getPath
()));
}
catch
(
final
IOException
|
URISyntaxException
ex
)
{
throw
new
ShardingJdbcException
(
ex
);
}
dataSourceMap
=
new
HashMap
<>(
128
,
1
);
Map
<
String
,
DataSourceParameter
>
dataSourceParameters
=
yaml
ShardingConfigurationForProxy
.
getDataSources
();
Map
<
String
,
DataSourceParameter
>
dataSourceParameters
=
yaml
ProxyConfiguration
.
getDataSources
();
for
(
String
each
:
dataSourceParameters
.
keySet
())
{
dataSourceMap
.
put
(
each
,
getDataSource
(
dataSourceParameters
.
get
(
each
)));
}
shardingRule
=
yamlShardingConfigurationForProxy
.
getShardingRule
(
Collections
.<
String
>
emptyList
());
shardingRule
=
yamlProxyConfiguration
.
obtainShardingRule
(
Collections
.<
String
>
emptyList
());
masterSlaveRule
=
yamlProxyConfiguration
.
obtainMasterSlaveRule
();
isOnlyMasterSlave
=
shardingRule
.
getTableRules
().
isEmpty
()
&&
!
masterSlaveRule
.
getMasterDataSourceName
().
isEmpty
();
try
{
shardingMetaData
=
new
ProxyShardingMetaData
(
dataSourceMap
);
shardingMetaData
.
init
(
shardingRule
);
if
(!
isOnlyMasterSlave
)
{
shardingMetaData
.
init
(
shardingRule
);
}
}
catch
(
final
SQLException
ex
)
{
throw
new
ShardingJdbcException
(
ex
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录