Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
lijianghuflute
canal
提交
6b7109c6
canal
项目概览
lijianghuflute
/
canal
与 Fork 源项目一致
从无法访问的项目Fork
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
canal
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
6b7109c6
编写于
3月 12, 2018
作者:
A
agapple
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fixed issue #539 , support bio&netty
上级
b90c9fac
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
531 addition
and
258 deletion
+531
-258
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
...java/com/alibaba/otter/canal/deployer/CanalConstants.java
+2
-0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
...ava/com/alibaba/otter/canal/deployer/CanalController.java
+6
-0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java
...ter/canal/parse/driver/mysql/socket/BioSocketChannel.java
+136
-0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java
...canal/parse/driver/mysql/socket/BioSocketChannelPool.java
+24
-0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java
...r/canal/parse/driver/mysql/socket/NettySocketChannel.java
+143
-0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannelPool.java
...nal/parse/driver/mysql/socket/NettySocketChannelPool.java
+111
-0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java
.../otter/canal/parse/driver/mysql/socket/SocketChannel.java
+9
-129
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java
...er/canal/parse/driver/mysql/socket/SocketChannelPool.java
+35
-111
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java
...a/otter/canal/parse/driver/mysql/utils/PacketManager.java
+2
-2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
...baba/otter/canal/parse/inbound/mysql/MysqlConnection.java
+8
-7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java
...er/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java
+2
-2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
...tter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
+52
-6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
...ter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
+1
-1
未找到文件。
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
浏览文件 @
6b7109c6
...
...
@@ -32,6 +32,8 @@ public class CanalConstants {
public
static
final
String
CANAL_DESTINATION_PROPERTY
=
ROOT
+
".instance.destination"
;
public
static
final
String
CANAL_SOCKETCHANNEL
=
ROOT
+
"."
+
"socketChannel"
;
public
static
String
getInstanceModeKey
(
String
destination
)
{
return
MessageFormat
.
format
(
INSTANCE_MODE_TEMPLATE
,
destination
);
}
...
...
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
浏览文件 @
6b7109c6
...
...
@@ -85,6 +85,12 @@ public class CanalController {
// 初始化instance config
initInstanceConfig
(
properties
);
// init socketChannel
String
socketChannel
=
getProperty
(
properties
,
CanalConstants
.
CANAL_SOCKETCHANNEL
);
if
(
StringUtils
.
isNotEmpty
(
socketChannel
))
{
System
.
setProperty
(
CanalConstants
.
CANAL_SOCKETCHANNEL
,
socketChannel
);
}
// 准备canal server
cid
=
Long
.
valueOf
(
getProperty
(
properties
,
CanalConstants
.
CANAL_ID
));
ip
=
getProperty
(
properties
,
CanalConstants
.
CANAL_IP
);
...
...
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java
0 → 100644
浏览文件 @
6b7109c6
package
com.alibaba.otter.canal.parse.driver.mysql.socket
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.io.InterruptedIOException
;
import
java.io.OutputStream
;
import
java.net.Socket
;
import
java.net.SocketAddress
;
import
java.net.SocketException
;
import
java.net.SocketTimeoutException
;
/**
* 使用BIO进行dump
*
* @author chuanyi
*/
public
class
BioSocketChannel
implements
SocketChannel
{
static
final
int
DEFAULT_CONNECT_TIMEOUT
=
10
*
1000
;
static
final
int
SO_TIMEOUT
=
1000
;
private
Socket
socket
;
private
InputStream
input
;
private
OutputStream
output
;
BioSocketChannel
(
Socket
socket
)
throws
IOException
{
this
.
socket
=
socket
;
this
.
input
=
socket
.
getInputStream
();
this
.
output
=
socket
.
getOutputStream
();
}
public
void
write
(
byte
[]...
buf
)
throws
IOException
{
OutputStream
output
=
this
.
output
;
if
(
output
!=
null
)
{
for
(
byte
[]
bs
:
buf
)
{
output
.
write
(
bs
);
}
}
else
{
throw
new
SocketException
(
"Socket already closed."
);
}
}
public
byte
[]
read
(
int
readSize
)
throws
IOException
{
InputStream
input
=
this
.
input
;
byte
[]
data
=
new
byte
[
readSize
];
int
remain
=
readSize
;
if
(
input
==
null
)
{
throw
new
SocketException
(
"Socket already closed."
);
}
while
(
remain
>
0
)
{
try
{
int
read
=
input
.
read
(
data
,
readSize
-
remain
,
remain
);
if
(
read
>
-
1
)
{
remain
-=
read
;
}
else
{
throw
new
IOException
(
"EOF encountered."
);
}
}
catch
(
SocketTimeoutException
te
)
{
if
(
Thread
.
interrupted
())
{
throw
new
InterruptedIOException
(
"Interrupted while reading."
);
}
}
}
return
data
;
}
public
byte
[]
read
(
int
readSize
,
int
timeout
)
throws
IOException
{
InputStream
input
=
this
.
input
;
byte
[]
data
=
new
byte
[
readSize
];
int
remain
=
readSize
;
int
accTimeout
=
0
;
if
(
input
==
null
)
{
throw
new
SocketException
(
"Socket already closed."
);
}
while
(
remain
>
0
&&
accTimeout
<
timeout
)
{
try
{
int
read
=
input
.
read
(
data
,
readSize
-
remain
,
remain
);
if
(
read
>
-
1
)
{
remain
-=
read
;
}
else
{
throw
new
IOException
(
"EOF encountered."
);
}
}
catch
(
SocketTimeoutException
te
)
{
if
(
Thread
.
interrupted
())
{
throw
new
InterruptedIOException
(
"Interrupted while reading."
);
}
accTimeout
+=
SO_TIMEOUT
;
}
}
if
(
remain
>
0
&&
accTimeout
>=
timeout
)
{
throw
new
SocketTimeoutException
(
"Timeout occurred, failed to read "
+
readSize
+
" bytes in "
+
timeout
+
" milliseconds."
);
}
return
data
;
}
public
boolean
isConnected
()
{
Socket
socket
=
this
.
socket
;
if
(
socket
!=
null
)
{
return
socket
.
isConnected
();
}
return
false
;
}
public
SocketAddress
getRemoteSocketAddress
()
{
Socket
socket
=
this
.
socket
;
if
(
socket
!=
null
)
{
return
socket
.
getRemoteSocketAddress
();
}
return
null
;
}
public
void
close
()
{
Socket
socket
=
this
.
socket
;
if
(
socket
!=
null
)
{
try
{
socket
.
shutdownInput
();
}
catch
(
IOException
e
)
{
// Ignore, could not do anymore
}
try
{
socket
.
shutdownOutput
();
}
catch
(
IOException
e
)
{
// Ignore, could not do anymore
}
try
{
socket
.
close
();
}
catch
(
IOException
e
)
{
// Ignore, could not do anymore
}
}
this
.
input
=
null
;
this
.
output
=
null
;
this
.
socket
=
null
;
}
}
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java
0 → 100644
浏览文件 @
6b7109c6
package
com.alibaba.otter.canal.parse.driver.mysql.socket
;
import
java.net.Socket
;
import
java.net.SocketAddress
;
/**
* @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
* @author chuanyi 2018-3-3 保留<code>open</code>减少文件变更数量
*/
public
abstract
class
BioSocketChannelPool
{
public
static
BioSocketChannel
open
(
SocketAddress
address
)
throws
Exception
{
Socket
socket
=
new
Socket
();
socket
.
setReceiveBufferSize
(
32
*
1024
);
socket
.
setSendBufferSize
(
32
*
1024
);
socket
.
setSoTimeout
(
BioSocketChannel
.
SO_TIMEOUT
);
socket
.
setTcpNoDelay
(
true
);
socket
.
setKeepAlive
(
true
);
socket
.
setReuseAddress
(
true
);
socket
.
connect
(
address
,
BioSocketChannel
.
DEFAULT_CONNECT_TIMEOUT
);
return
new
BioSocketChannel
(
socket
);
}
}
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java
0 → 100644
浏览文件 @
6b7109c6
package
com.alibaba.otter.canal.parse.driver.mysql.socket
;
import
io.netty.buffer.ByteBuf
;
import
io.netty.buffer.PooledByteBufAllocator
;
import
io.netty.buffer.Unpooled
;
import
io.netty.channel.Channel
;
import
java.io.IOException
;
import
java.net.SocketAddress
;
/**
* 封装netty的通信channel和数据接收缓存,实现读、写、连接校验的功能。 2016-12-28
*
* @author luoyaogui
*/
public
class
NettySocketChannel
implements
SocketChannel
{
private
static
final
int
WAIT_PERIOD
=
10
;
// milliseconds
private
static
final
int
DEFAULT_INIT_BUFFER_SIZE
=
1024
*
1024
;
// 1MB,默认初始缓存大小
private
static
final
int
DEFAULT_MAX_BUFFER_SIZE
=
4
*
DEFAULT_INIT_BUFFER_SIZE
;
// 4MB,默认最大缓存大小
private
Channel
channel
=
null
;
private
Object
lock
=
new
Object
();
private
ByteBuf
cache
=
PooledByteBufAllocator
.
DEFAULT
.
directBuffer
(
DEFAULT_INIT_BUFFER_SIZE
);
// 缓存大小
public
Channel
getChannel
()
{
return
channel
;
}
public
void
setChannel
(
Channel
channel
)
{
this
.
channel
=
channel
;
}
public
void
writeCache
(
ByteBuf
buf
)
throws
InterruptedException
,
IOException
{
synchronized
(
lock
)
{
while
(
true
)
{
if
(
null
==
cache
)
{
throw
new
IOException
(
"socket is closed !"
);
}
cache
.
discardReadBytes
();
// 回收内存
// source buffer is empty.
if
(!
buf
.
isReadable
())
{
break
;
}
if
(
cache
.
isWritable
())
{
cache
.
writeBytes
(
buf
,
Math
.
min
(
cache
.
writableBytes
(),
buf
.
readableBytes
()));
}
else
{
// dest buffer is full.
lock
.
wait
(
WAIT_PERIOD
);
}
}
}
}
public
void
write
(
byte
[]...
buf
)
throws
IOException
{
if
(
channel
!=
null
&&
channel
.
isWritable
())
{
channel
.
writeAndFlush
(
Unpooled
.
copiedBuffer
(
buf
));
}
else
{
throw
new
IOException
(
"write failed ! please checking !"
);
}
}
public
byte
[]
read
(
int
readSize
)
throws
IOException
{
return
read
(
readSize
,
0
);
}
public
byte
[]
read
(
int
readSize
,
int
timeout
)
throws
IOException
{
int
accumulatedWaitTime
=
0
;
// 若读取内容较长,则自动扩充超时时间,以初始缓存大小为基准计算倍数
if
(
timeout
>
0
&&
readSize
>
DEFAULT_INIT_BUFFER_SIZE
)
{
timeout
*=
(
readSize
/
DEFAULT_INIT_BUFFER_SIZE
+
1
);
}
do
{
if
(
readSize
>
cache
.
readableBytes
())
{
if
(
null
==
channel
)
{
throw
new
IOException
(
"socket has Interrupted !"
);
}
// 默认缓存大小不够用时需自动扩充,否则将因缓存空间不足而造成I/O超时假象
if
(!
cache
.
isWritable
(
readSize
-
cache
.
readableBytes
()))
{
synchronized
(
lock
)
{
int
deltaSize
=
readSize
-
cache
.
readableBytes
();
// 同步锁后重新读取
deltaSize
=
deltaSize
-
cache
.
writableBytes
();
if
(
deltaSize
>
0
)
{
deltaSize
=
(
deltaSize
/
32
+
1
)
*
32
;
cache
.
capacity
(
cache
.
capacity
()
+
deltaSize
);
}
}
}
else
if
(
timeout
>
0
)
{
accumulatedWaitTime
+=
WAIT_PERIOD
;
if
(
accumulatedWaitTime
>
timeout
)
{
StringBuilder
sb
=
new
StringBuilder
(
"socket read timeout occured !"
);
sb
.
append
(
" readSize = "
).
append
(
readSize
);
sb
.
append
(
", readableBytes = "
).
append
(
cache
.
readableBytes
());
sb
.
append
(
", timeout = "
).
append
(
timeout
);
throw
new
IOException
(
sb
.
toString
());
}
}
synchronized
(
this
)
{
try
{
wait
(
WAIT_PERIOD
);
}
catch
(
InterruptedException
e
)
{
throw
new
IOException
(
"socket has Interrupted !"
);
}
}
}
else
{
byte
[]
back
=
new
byte
[
readSize
];
synchronized
(
lock
)
{
cache
.
readBytes
(
back
);
// 恢复自动扩充的过大缓存到默认初始大小,释放空间
if
(
cache
.
capacity
()
>
DEFAULT_MAX_BUFFER_SIZE
)
{
cache
.
discardReadBytes
();
// 回收已读空间,重置读写指针
if
(
cache
.
readableBytes
()
<
DEFAULT_INIT_BUFFER_SIZE
)
{
cache
.
capacity
(
DEFAULT_INIT_BUFFER_SIZE
);
}
}
}
return
back
;
}
}
while
(
true
);
}
public
boolean
isConnected
()
{
return
channel
!=
null
?
true
:
false
;
}
public
SocketAddress
getRemoteSocketAddress
()
{
return
channel
!=
null
?
channel
.
remoteAddress
()
:
null
;
}
public
void
close
()
{
if
(
channel
!=
null
)
{
channel
.
close
();
}
channel
=
null
;
cache
.
discardReadBytes
();
// 回收已占用的内存
cache
.
release
();
// 释放整个内存
cache
=
null
;
}
}
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannelPool.java
0 → 100644
浏览文件 @
6b7109c6
package
com.alibaba.otter.canal.parse.driver.mysql.socket
;
import
io.netty.bootstrap.Bootstrap
;
import
io.netty.buffer.ByteBuf
;
import
io.netty.buffer.PooledByteBufAllocator
;
import
io.netty.channel.AdaptiveRecvByteBufAllocator
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelFuture
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.ChannelInitializer
;
import
io.netty.channel.ChannelOption
;
import
io.netty.channel.EventLoopGroup
;
import
io.netty.channel.SimpleChannelInboundHandler
;
import
io.netty.channel.nio.NioEventLoopGroup
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
java.io.IOException
;
import
java.net.SocketAddress
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.CountDownLatch
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
*/
@SuppressWarnings
({
"rawtypes"
,
"deprecation"
})
public
abstract
class
NettySocketChannelPool
{
private
static
EventLoopGroup
group
=
new
NioEventLoopGroup
();
// 非阻塞IO线程组
private
static
Bootstrap
boot
=
new
Bootstrap
();
// 主
private
static
Map
<
Channel
,
SocketChannel
>
chManager
=
new
ConcurrentHashMap
<
Channel
,
SocketChannel
>();
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NettySocketChannelPool
.
class
);
static
{
boot
.
group
(
group
)
.
channel
(
NioSocketChannel
.
class
)
.
option
(
ChannelOption
.
SO_RCVBUF
,
32
*
1024
)
.
option
(
ChannelOption
.
SO_SNDBUF
,
32
*
1024
)
.
option
(
ChannelOption
.
TCP_NODELAY
,
true
)
// 如果是延时敏感型应用,建议关闭Nagle算法
.
option
(
ChannelOption
.
SO_KEEPALIVE
,
true
)
.
option
(
ChannelOption
.
RCVBUF_ALLOCATOR
,
AdaptiveRecvByteBufAllocator
.
DEFAULT
)
.
option
(
ChannelOption
.
ALLOCATOR
,
PooledByteBufAllocator
.
DEFAULT
)
//
.
handler
(
new
ChannelInitializer
()
{
@Override
protected
void
initChannel
(
Channel
ch
)
throws
Exception
{
ch
.
pipeline
().
addLast
(
new
BusinessHandler
());
// 命令过滤和handler添加管理
}
});
}
public
static
SocketChannel
open
(
SocketAddress
address
)
throws
Exception
{
SocketChannel
socket
=
null
;
ChannelFuture
future
=
boot
.
connect
(
address
).
sync
();
if
(
future
.
isSuccess
())
{
future
.
channel
().
pipeline
().
get
(
BusinessHandler
.
class
).
latch
.
await
();
socket
=
chManager
.
get
(
future
.
channel
());
}
if
(
null
==
socket
)
{
throw
new
IOException
(
"can't create socket!"
);
}
return
socket
;
}
public
static
class
BusinessHandler
extends
SimpleChannelInboundHandler
<
ByteBuf
>
{
private
NettySocketChannel
socket
=
null
;
private
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
@Override
public
void
channelInactive
(
ChannelHandlerContext
ctx
)
throws
Exception
{
socket
.
setChannel
(
null
);
chManager
.
remove
(
ctx
.
channel
());
// 移除
super
.
channelInactive
(
ctx
);
}
@Override
public
void
channelActive
(
ChannelHandlerContext
ctx
)
throws
Exception
{
socket
=
new
NettySocketChannel
();
socket
.
setChannel
(
ctx
.
channel
());
chManager
.
put
(
ctx
.
channel
(),
socket
);
latch
.
countDown
();
super
.
channelActive
(
ctx
);
}
@Override
protected
void
channelRead0
(
ChannelHandlerContext
ctx
,
ByteBuf
msg
)
throws
Exception
{
if
(
socket
!=
null
)
{
socket
.
writeCache
(
msg
);
}
else
{
// TODO: need graceful error handler.
logger
.
error
(
"no socket available."
);
}
}
@Override
public
void
exceptionCaught
(
ChannelHandlerContext
ctx
,
Throwable
cause
)
throws
Exception
{
// need output error for troubeshooting.
logger
.
error
(
"business error."
,
cause
);
ctx
.
close
();
}
}
}
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java
浏览文件 @
6b7109c6
package
com.alibaba.otter.canal.parse.driver.mysql.socket
;
import
io.netty.buffer.ByteBuf
;
import
io.netty.buffer.PooledByteBufAllocator
;
import
io.netty.buffer.Unpooled
;
import
io.netty.channel.Channel
;
import
java.io.IOException
;
import
java.net.SocketAddress
;
/**
* 封装netty的通信channel和数据接收缓存,实现读、写、连接校验的功能。 2016-12-28
*
* @author luoyaogui
* @author agapple 2018年3月12日 下午10:36:44
* @since 1.0.26
*/
public
class
SocketChannel
{
private
static
final
int
WAIT_PERIOD
=
10
;
// milliseconds
private
static
final
int
DEFAULT_INIT_BUFFER_SIZE
=
1024
*
1024
;
// 1MB,默认初始缓存大小
private
static
final
int
DEFAULT_MAX_BUFFER_SIZE
=
4
*
DEFAULT_INIT_BUFFER_SIZE
;
// 4MB,默认最大缓存大小
private
Channel
channel
=
null
;
private
Object
lock
=
new
Object
();
private
ByteBuf
cache
=
PooledByteBufAllocator
.
DEFAULT
.
directBuffer
(
DEFAULT_INIT_BUFFER_SIZE
);
// 缓存大小
public
Channel
getChannel
()
{
return
channel
;
}
public
void
setChannel
(
Channel
channel
)
{
this
.
channel
=
channel
;
}
public
void
writeCache
(
ByteBuf
buf
)
throws
InterruptedException
,
IOException
{
synchronized
(
lock
)
{
while
(
true
)
{
if
(
null
==
cache
)
{
throw
new
IOException
(
"socket is closed !"
);
}
cache
.
discardReadBytes
();
// 回收内存
// source buffer is empty.
if
(!
buf
.
isReadable
())
{
break
;
}
if
(
cache
.
isWritable
())
{
cache
.
writeBytes
(
buf
,
Math
.
min
(
cache
.
writableBytes
(),
buf
.
readableBytes
()));
}
else
{
// dest buffer is full.
lock
.
wait
(
WAIT_PERIOD
);
}
}
}
}
public
void
writeChannel
(
byte
[]...
buf
)
throws
IOException
{
if
(
channel
!=
null
&&
channel
.
isWritable
())
{
channel
.
writeAndFlush
(
Unpooled
.
copiedBuffer
(
buf
));
}
else
{
throw
new
IOException
(
"write failed ! please checking !"
);
}
}
public
byte
[]
read
(
int
readSize
)
throws
IOException
{
return
read
(
readSize
,
0
);
}
public
interface
SocketChannel
{
public
byte
[]
read
(
int
readSize
,
int
timeout
)
throws
IOException
{
int
accumulatedWaitTime
=
0
;
// 若读取内容较长,则自动扩充超时时间,以初始缓存大小为基准计算倍数
if
(
timeout
>
0
&&
readSize
>
DEFAULT_INIT_BUFFER_SIZE
)
{
timeout
*=
(
readSize
/
DEFAULT_INIT_BUFFER_SIZE
+
1
);
}
do
{
if
(
readSize
>
cache
.
readableBytes
())
{
if
(
null
==
channel
)
{
throw
new
IOException
(
"socket has Interrupted !"
);
}
public
void
write
(
byte
[]...
buf
)
throws
IOException
;
// 默认缓存大小不够用时需自动扩充,否则将因缓存空间不足而造成I/O超时假象
if
(!
cache
.
isWritable
(
readSize
-
cache
.
readableBytes
()))
{
synchronized
(
lock
)
{
int
deltaSize
=
readSize
-
cache
.
readableBytes
();
// 同步锁后重新读取
deltaSize
=
deltaSize
-
cache
.
writableBytes
();
if
(
deltaSize
>
0
)
{
deltaSize
=
(
deltaSize
/
32
+
1
)
*
32
;
cache
.
capacity
(
cache
.
capacity
()
+
deltaSize
);
}
}
}
else
if
(
timeout
>
0
)
{
accumulatedWaitTime
+=
WAIT_PERIOD
;
if
(
accumulatedWaitTime
>
timeout
)
{
StringBuilder
sb
=
new
StringBuilder
(
"socket read timeout occured !"
);
sb
.
append
(
" readSize = "
).
append
(
readSize
);
sb
.
append
(
", readableBytes = "
).
append
(
cache
.
readableBytes
());
sb
.
append
(
", timeout = "
).
append
(
timeout
);
throw
new
IOException
(
sb
.
toString
());
}
}
public
byte
[]
read
(
int
readSize
)
throws
IOException
;
synchronized
(
this
)
{
try
{
wait
(
WAIT_PERIOD
);
}
catch
(
InterruptedException
e
)
{
throw
new
IOException
(
"socket has Interrupted !"
);
}
}
}
else
{
byte
[]
back
=
new
byte
[
readSize
];
synchronized
(
lock
)
{
cache
.
readBytes
(
back
);
// 恢复自动扩充的过大缓存到默认初始大小,释放空间
if
(
cache
.
capacity
()
>
DEFAULT_MAX_BUFFER_SIZE
)
{
cache
.
discardReadBytes
();
// 回收已读空间,重置读写指针
if
(
cache
.
readableBytes
()
<
DEFAULT_INIT_BUFFER_SIZE
)
{
cache
.
capacity
(
DEFAULT_INIT_BUFFER_SIZE
);
}
}
}
return
back
;
}
}
while
(
true
);
}
public
byte
[]
read
(
int
readSize
,
int
timeout
)
throws
IOException
;
public
boolean
isConnected
()
{
return
channel
!=
null
?
true
:
false
;
}
public
boolean
isConnected
();
public
SocketAddress
getRemoteSocketAddress
()
{
return
channel
!=
null
?
channel
.
remoteAddress
()
:
null
;
}
public
SocketAddress
getRemoteSocketAddress
();
public
void
close
()
{
if
(
channel
!=
null
)
{
channel
.
close
();
}
channel
=
null
;
cache
.
discardReadBytes
();
// 回收已占用的内存
cache
.
release
();
// 释放整个内存
cache
=
null
;
}
public
void
close
();
}
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java
浏览文件 @
6b7109c6
package
com.alibaba.otter.canal.parse.driver.mysql.socket
;
import
io.netty.bootstrap.Bootstrap
;
import
io.netty.buffer.ByteBuf
;
import
io.netty.buffer.PooledByteBufAllocator
;
import
io.netty.channel.AdaptiveRecvByteBufAllocator
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelFuture
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.ChannelInitializer
;
import
io.netty.channel.ChannelOption
;
import
io.netty.channel.EventLoopGroup
;
import
io.netty.channel.SimpleChannelInboundHandler
;
import
io.netty.channel.nio.NioEventLoopGroup
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
java.io.IOException
;
import
java.net.SocketAddress
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.CountDownLatch
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
*/
@SuppressWarnings
({
"rawtypes"
,
"deprecation"
})
public
abstract
class
SocketChannelPool
{
private
static
EventLoopGroup
group
=
new
NioEventLoopGroup
();
// 非阻塞IO线程组
private
static
Bootstrap
boot
=
new
Bootstrap
();
// 主
private
static
Map
<
Channel
,
SocketChannel
>
chManager
=
new
ConcurrentHashMap
<
Channel
,
SocketChannel
>();
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
SocketChannelPool
.
class
);
static
{
boot
.
group
(
group
)
.
channel
(
NioSocketChannel
.
class
)
.
option
(
ChannelOption
.
SO_RCVBUF
,
32
*
1024
)
.
option
(
ChannelOption
.
SO_SNDBUF
,
32
*
1024
)
.
option
(
ChannelOption
.
TCP_NODELAY
,
true
)
// 如果是延时敏感型应用,建议关闭Nagle算法
.
option
(
ChannelOption
.
SO_KEEPALIVE
,
true
)
.
option
(
ChannelOption
.
RCVBUF_ALLOCATOR
,
AdaptiveRecvByteBufAllocator
.
DEFAULT
)
.
option
(
ChannelOption
.
ALLOCATOR
,
PooledByteBufAllocator
.
DEFAULT
)
//
.
handler
(
new
ChannelInitializer
()
{
@Override
protected
void
initChannel
(
Channel
ch
)
throws
Exception
{
ch
.
pipeline
().
addLast
(
new
BusinessHandler
());
// 命令过滤和handler添加管理
}
});
}
public
static
SocketChannel
open
(
SocketAddress
address
)
throws
Exception
{
SocketChannel
socket
=
null
;
ChannelFuture
future
=
boot
.
connect
(
address
).
sync
();
if
(
future
.
isSuccess
())
{
future
.
channel
().
pipeline
().
get
(
BusinessHandler
.
class
).
latch
.
await
();
socket
=
chManager
.
get
(
future
.
channel
());
}
if
(
null
==
socket
)
{
throw
new
IOException
(
"can't create socket!"
);
}
return
socket
;
}
public
static
class
BusinessHandler
extends
SimpleChannelInboundHandler
<
ByteBuf
>
{
private
SocketChannel
socket
=
null
;
private
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
@Override
public
void
channelInactive
(
ChannelHandlerContext
ctx
)
throws
Exception
{
socket
.
setChannel
(
null
);
chManager
.
remove
(
ctx
.
channel
());
// 移除
super
.
channelInactive
(
ctx
);
}
@Override
public
void
channelActive
(
ChannelHandlerContext
ctx
)
throws
Exception
{
socket
=
new
SocketChannel
();
socket
.
setChannel
(
ctx
.
channel
());
chManager
.
put
(
ctx
.
channel
(),
socket
);
latch
.
countDown
();
super
.
channelActive
(
ctx
);
}
@Override
protected
void
channelRead0
(
ChannelHandlerContext
ctx
,
ByteBuf
msg
)
throws
Exception
{
if
(
socket
!=
null
)
{
socket
.
writeCache
(
msg
);
}
else
{
// TODO: need graceful error handler.
logger
.
error
(
"no socket available."
);
}
}
@Override
public
void
exceptionCaught
(
ChannelHandlerContext
ctx
,
Throwable
cause
)
throws
Exception
{
// need output error for troubeshooting.
logger
.
error
(
"business error."
,
cause
);
ctx
.
close
();
}
}
}
package
com.alibaba.otter.canal.parse.driver.mysql.socket
;
import
java.net.SocketAddress
;
import
org.apache.commons.lang.StringUtils
;
/**
* @author agapple 2018年3月12日 下午10:46:22
* @since 1.0.26
*/
public
abstract
class
SocketChannelPool
{
public
static
SocketChannel
open
(
SocketAddress
address
)
throws
Exception
{
String
type
=
chooseSocketChannel
();
if
(
"netty"
.
equalsIgnoreCase
(
type
))
{
return
NettySocketChannelPool
.
open
(
address
);
}
else
{
return
BioSocketChannelPool
.
open
(
address
);
}
}
private
static
String
chooseSocketChannel
()
{
String
socketChannel
=
System
.
getenv
(
"canal.socketChannel"
);
if
(
StringUtils
.
isEmpty
(
socketChannel
))
{
socketChannel
=
System
.
getProperty
(
"canal.socketChannel"
);
}
if
(
StringUtils
.
isEmpty
(
socketChannel
))
{
socketChannel
=
"bio"
;
// bio or netty
}
return
socketChannel
;
}
}
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java
浏览文件 @
6b7109c6
...
...
@@ -28,7 +28,7 @@ public abstract class PacketManager {
}
public
static
void
writePkg
(
SocketChannel
ch
,
byte
[]...
srcs
)
throws
IOException
{
ch
.
write
Channel
(
srcs
);
ch
.
write
(
srcs
);
}
public
static
void
writeBody
(
SocketChannel
ch
,
byte
[]
body
)
throws
IOException
{
...
...
@@ -39,6 +39,6 @@ public abstract class PacketManager {
HeaderPacket
header
=
new
HeaderPacket
();
header
.
setPacketBodyLength
(
body
.
length
);
header
.
setPacketSequenceNumber
(
packetSeqNumber
);
ch
.
write
Channel
(
header
.
toBytes
(),
body
);
ch
.
write
(
header
.
toBytes
(),
body
);
}
}
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
浏览文件 @
6b7109c6
package
com.alibaba.otter.canal.parse.inbound.mysql
;
import
static
com
.
alibaba
.
otter
.
canal
.
parse
.
inbound
.
mysql
.
dbsync
.
DirectLogFetcher
.
MASTER_HEARTBEAT_PERIOD_SECONDS
;
import
java.io.IOException
;
import
java.net.InetSocketAddress
;
import
java.nio.charset.Charset
;
import
java.util.List
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.commons.lang.StringUtils
;
import
org.slf4j.Logger
;
...
...
@@ -29,8 +30,6 @@ import com.taobao.tddl.dbsync.binlog.LogContext;
import
com.taobao.tddl.dbsync.binlog.LogDecoder
;
import
com.taobao.tddl.dbsync.binlog.LogEvent
;
import
static
com
.
alibaba
.
otter
.
canal
.
parse
.
inbound
.
mysql
.
dbsync
.
DirectLogFetcher
.
MASTER_HEARTBEAT_PERIOD_SECONDS
;
public
class
MysqlConnection
implements
ErosaConnection
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
MysqlConnection
.
class
);
...
...
@@ -298,10 +297,12 @@ public class MysqlConnection implements ErosaConnection {
}
/**
* MASTER_HEARTBEAT_PERIOD sets the interval in seconds between replication heartbeats.
* Whenever the master's binary log is updated with an event, the waiting period for the next heartbeat is reset.
* interval is a decimal value having the range 0 to 4294967 seconds and a resolution in milliseconds;
* the smallest nonzero value is 0.001. Heartbeats are sent by the master only if there are no unsent events
* MASTER_HEARTBEAT_PERIOD sets the interval in seconds between
* replication heartbeats. Whenever the master's binary log is updated
* with an event, the waiting period for the next heartbeat is reset.
* interval is a decimal value having the range 0 to 4294967 seconds and
* a resolution in milliseconds; the smallest nonzero value is 0.001.
* Heartbeats are sent by the master only if there are no unsent events
* in the binary log file for a period longer than interval.
*/
try
{
...
...
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java
浏览文件 @
6b7109c6
...
...
@@ -23,8 +23,8 @@ public class DirectLogFetcher extends LogFetcher {
// Master heartbeat interval
public
static
final
int
MASTER_HEARTBEAT_PERIOD_SECONDS
=
15
;
// +1s 确保 timeout > heartbeat interval
private
static
final
int
READ_TIMEOUT_MILLISECONDS
=
(
MASTER_HEARTBEAT_PERIOD_SECONDS
+
1
)
*
1000
;
// +1
0
s 确保 timeout > heartbeat interval
private
static
final
int
READ_TIMEOUT_MILLISECONDS
=
(
MASTER_HEARTBEAT_PERIOD_SECONDS
+
1
0
)
*
1000
;
/** Command to dump binlog */
public
static
final
byte
COM_BINLOG_DUMP
=
18
;
...
...
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
浏览文件 @
6b7109c6
...
...
@@ -2,15 +2,19 @@ package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.apache.commons.lang.StringUtils
;
import
com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket
;
import
com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket
;
import
com.alibaba.otter.canal.parse.exception.CanalParseException
;
import
com.alibaba.otter.canal.parse.inbound.TableMeta
;
import
com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta
;
import
com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection
;
import
com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser
;
import
com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta
;
import
com.alibaba.otter.canal.parse.inbound.mysql.tsdb.MemoryTableMeta
;
import
com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB
;
...
...
@@ -75,14 +79,22 @@ public class TableMetaCache {
}
private
TableMeta
getTableMetaByDB
(
String
fullname
)
throws
IOException
{
ResultSetPacket
packet
=
connection
.
query
(
"show create table "
+
fullname
);
String
[]
names
=
StringUtils
.
split
(
fullname
,
"`.`"
);
String
schema
=
names
[
0
];
String
table
=
names
[
1
].
substring
(
0
,
names
[
1
].
length
());
return
new
TableMeta
(
schema
,
table
,
parserTableMeta
(
schema
,
table
,
packet
));
try
{
ResultSetPacket
packet
=
connection
.
query
(
"show create table "
+
fullname
);
String
[]
names
=
StringUtils
.
split
(
fullname
,
"`.`"
);
String
schema
=
names
[
0
];
String
table
=
names
[
1
].
substring
(
0
,
names
[
1
].
length
());
return
new
TableMeta
(
schema
,
table
,
parseTableMeta
(
schema
,
table
,
packet
));
}
catch
(
Throwable
e
)
{
// fallback to desc table
ResultSetPacket
packet
=
connection
.
query
(
"desc "
+
fullname
);
String
[]
names
=
StringUtils
.
split
(
fullname
,
"`.`"
);
String
schema
=
names
[
0
];
String
table
=
names
[
1
].
substring
(
0
,
names
[
1
].
length
());
return
new
TableMeta
(
schema
,
table
,
parseTableMetaByDesc
(
packet
));
}
}
public
static
List
<
FieldMeta
>
parse
r
TableMeta
(
String
schema
,
String
table
,
ResultSetPacket
packet
)
{
public
static
List
<
FieldMeta
>
parseTableMeta
(
String
schema
,
String
table
,
ResultSetPacket
packet
)
{
if
(
packet
.
getFieldValues
().
size
()
>
1
)
{
String
createDDL
=
packet
.
getFieldValues
().
get
(
1
);
MemoryTableMeta
memoryTableMeta
=
new
MemoryTableMeta
();
...
...
@@ -94,6 +106,40 @@ public class TableMetaCache {
}
}
/**
* 处理desc table的结果
*/
public
static
List
<
FieldMeta
>
parseTableMetaByDesc
(
ResultSetPacket
packet
)
{
Map
<
String
,
Integer
>
nameMaps
=
new
HashMap
<
String
,
Integer
>(
6
,
1
f
);
int
index
=
0
;
for
(
FieldPacket
fieldPacket
:
packet
.
getFieldDescriptors
())
{
nameMaps
.
put
(
fieldPacket
.
getOriginalName
(),
index
++);
}
int
size
=
packet
.
getFieldDescriptors
().
size
();
int
count
=
packet
.
getFieldValues
().
size
()
/
packet
.
getFieldDescriptors
().
size
();
List
<
FieldMeta
>
result
=
new
ArrayList
<
FieldMeta
>();
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
FieldMeta
meta
=
new
FieldMeta
();
// 做一个优化,使用String.intern(),共享String对象,减少内存使用
meta
.
setColumnName
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
COLUMN_NAME
)
+
i
*
size
).
intern
());
meta
.
setColumnType
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
COLUMN_TYPE
)
+
i
*
size
));
meta
.
setNullable
(
StringUtils
.
equalsIgnoreCase
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
IS_NULLABLE
)
+
i
*
size
),
"YES"
));
meta
.
setKey
(
"PRI"
.
equalsIgnoreCase
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
COLUMN_KEY
)
+
i
*
size
)));
meta
.
setUnique
(
"UNI"
.
equalsIgnoreCase
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
COLUMN_KEY
)
+
i
*
size
)));
// 特殊处理引号
meta
.
setDefaultValue
(
DruidDdlParser
.
unescapeQuotaName
(
packet
.
getFieldValues
()
.
get
(
nameMaps
.
get
(
COLUMN_DEFAULT
)
+
i
*
size
)));
meta
.
setExtra
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
EXTRA
)
+
i
*
size
));
result
.
add
(
meta
);
}
return
result
;
}
public
TableMeta
getTableMeta
(
String
schema
,
String
table
)
{
return
getTableMeta
(
schema
,
table
,
true
);
}
...
...
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
浏览文件 @
6b7109c6
...
...
@@ -296,7 +296,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
ResultSetPacket
packet
=
connection
.
query
(
"show create table "
+
getFullName
(
schema
,
table
));
if
(
packet
.
getFieldValues
().
size
()
>
1
)
{
createDDL
=
packet
.
getFieldValues
().
get
(
1
);
tableMetaFromDB
.
setFields
(
TableMetaCache
.
parse
r
TableMeta
(
schema
,
table
,
packet
));
tableMetaFromDB
.
setFields
(
TableMetaCache
.
parseTableMeta
(
schema
,
table
,
packet
));
}
}
catch
(
IOException
e
)
{
if
(
e
.
getMessage
().
contains
(
"errorNumber=1146"
))
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录