提交 7e7c9462 编写于 作者: yanghye's avatar yanghye

upgrade-dev v2.3.15

上级 529262f1
......@@ -12,6 +12,7 @@
package ipc
import (
"bytes"
"fmt"
. "github.com/energye/energy/consts"
"github.com/energye/energy/logger"
......@@ -67,9 +68,9 @@ func (m *ipcChannel) NewBrowser(memoryAddresses ...string) *browserChannel {
}
// Channel 返回指定通道链接
func (m *browserChannel) Channel(channelId int64) *channel {
func (m *browserChannel) Channel(channelId int64) *connect {
if value, ok := m.channel.Load(channelId); ok {
return value.(*channel)
return value.(*connect)
}
return nil
}
......@@ -93,23 +94,10 @@ func (m *browserChannel) Close() {
}
}
// putChannel 添加一个通道链接
func (m *browserChannel) putChannel(channelId int64, value *channel) {
m.channel.Store(channelId, value)
}
// onConnect 建立链接
func (m *browserChannel) onConnect(context IIPCContext) {
logger.Info("IPC browser on connect key_channelId:", context.ChannelId())
if chn := m.Channel(context.ChannelId()); chn != nil {
chn.IPCType = m.ipcType
chn.Conn = context.Connect()
} else {
m.putChannel(context.ChannelId(), &channel{
IPCType: m.ipcType,
Conn: context.Connect(),
})
}
func (m *browserChannel) onConnect(conn *connect) {
logger.Info("IPC browser on connect key_channelId:", conn.channelId)
m.channel.Store(conn.channelId, conn)
}
// removeChannel 删除指定通道
......@@ -147,7 +135,7 @@ func (m *browserChannel) sendMessage(messageType mt, channelId int64, data []byt
channelId = id
}
if chn := m.Channel(channelId); chn != nil {
_, _ = ipcWrite(messageType, channelId, data, chn.conn())
_, _ = chn.ipcWrite(messageType, channelId, data)
}
}
......@@ -173,36 +161,40 @@ func (m *browserChannel) accept() {
logger.Info("browser channel accept Error:", err.Error())
continue
}
go m.readHandler(conn)
go m.newConnection(conn)
}
}
// readHandler 读取数据
func (m *browserChannel) readHandler(conn net.Conn) {
// newConnection 新链接
func (m *browserChannel) newConnection(conn net.Conn) {
defer func() {
if err := recover(); err != nil {
logger.Error("IPC Server Accept Recover:", err)
}
}()
var channelId int64
var newConn *connect
defer func() {
m.removeChannel(channelId)
if newConn != nil {
m.removeChannel(newConn.channelId)
}
}()
var readHandler = &ipcReadHandler{
ct: Ct_Server,
ipcType: m.ipcType,
connect: conn,
handler: func(context IIPCContext) {
if context.Message().Type() == mt_connection {
message := json.NewJSONObject(context.Message().Data())
channelId = int64(message.GetIntByKey(key_channelId))
m.onConnect(context)
} else {
if m.handler != nil {
m.handler(context)
}
newConn = &connect{
writeBuf: new(bytes.Buffer),
ct: Ct_Server,
ipcType: m.ipcType,
conn: conn,
}
newConn.handler = func(context IIPCContext) {
if context.Message().Type() == mt_connection {
message := json.NewJSONObject(context.Message().Data())
newConn.channelId = int64(message.GetIntByKey(key_channelId))
message.Free()
m.onConnect(newConn)
} else {
if m.handler != nil {
m.handler(context)
}
},
}
}
ipcRead(readHandler)
newConn.ipcRead()
}
......@@ -12,6 +12,7 @@
package ipc
import (
"bytes"
"fmt"
. "github.com/energye/energy/consts"
"github.com/energye/energy/logger"
......@@ -22,9 +23,7 @@ import (
// renderChannel 渲染进程
type renderChannel struct {
channelId int64
ipcType IPC_TYPE
connect net.Conn
connect *connect
mutex sync.Mutex
isConnect bool
handler IPCCallback
......@@ -41,8 +40,7 @@ func (m *ipcChannel) NewRender(channelId int64, memoryAddresses ...string) *rend
if err != nil {
panic("Client failed to connect to IPC service Error: " + err.Error())
}
m.render.ipcType = IPCT_NET
m.render.connect = conn
m.render.connect = &connect{writeBuf: new(bytes.Buffer), conn: conn, channelId: channelId, ipcType: IPCT_NET, ct: Ct_Client}
} else {
memoryAddr := ipcSock
logger.Debug("new render channel for IPC Sock", memoryAddr)
......@@ -57,10 +55,8 @@ func (m *ipcChannel) NewRender(channelId int64, memoryAddresses ...string) *rend
if err != nil {
panic("Client failed to connect to IPC service Error: " + err.Error())
}
m.render.ipcType = IPCT_UNIX
m.render.connect = unixConn
m.render.connect = &connect{writeBuf: new(bytes.Buffer), conn: unixConn, channelId: channelId, ipcType: IPCT_UNIX, ct: Ct_Client}
}
m.render.channelId = channelId
go m.render.receive()
m.render.onConnection()
return m.render
......@@ -69,7 +65,7 @@ func (m *ipcChannel) NewRender(channelId int64, memoryAddresses ...string) *rend
// onConnection 建立链接
func (m *renderChannel) onConnection() {
message := json.NewJSONObject(nil)
message.Set(key_channelId, m.channelId)
message.Set(key_channelId, m.connect.channelId)
m.sendMessage(mt_connection, message.Bytes())
message.Free()
}
......@@ -83,7 +79,7 @@ func (m *renderChannel) Send(data []byte) {
func (m *renderChannel) sendMessage(messageType mt, data []byte) {
m.mutex.Lock()
defer m.mutex.Unlock()
_, _ = ipcWrite(messageType, m.channelId, data, m.conn())
_, _ = m.connect.ipcWrite(messageType, m.connect.channelId, data)
}
// Handler 设置自定义处理回调函数
......@@ -99,11 +95,6 @@ func (m *renderChannel) Close() {
}
}
// conn 返回通道链接
func (m *renderChannel) conn() net.Conn {
return m.connect
}
// receive 接收数据
func (m *renderChannel) receive() {
defer func() {
......@@ -112,15 +103,10 @@ func (m *renderChannel) receive() {
}
m.Close()
}()
var readHandler = &ipcReadHandler{
ct: Ct_Client,
ipcType: m.ipcType,
connect: m.connect,
handler: func(context IIPCContext) {
if m.handler != nil {
m.handler(context)
}
},
m.connect.handler = func(context IIPCContext) {
if m.handler != nil {
m.handler(context)
}
}
ipcRead(readHandler)
m.connect.ipcRead()
}
......@@ -48,7 +48,6 @@ var (
mutex: sync.Mutex{},
},
}
ipcWriteBuf = new(bytes.Buffer)
)
//消息类型
......@@ -165,12 +164,12 @@ type IMessage interface {
}
// ipcReadHandler ipc 消息读取处理
type ipcReadHandler struct {
ipcType IPC_TYPE
ct ChannelType
connect net.Conn
handler IPCCallback
}
//type ipcReadHandler struct {
// ipcType IPC_TYPE
// ct ChannelType
// connect net.Conn
// handler IPCCallback
//}
// ipcMessage 消息内容
type ipcMessage struct {
......@@ -187,24 +186,6 @@ type IPCContext struct {
message IMessage //
}
// Close 关闭当前ipc通道链接
func (m *ipcReadHandler) Close() {
if m.connect != nil {
m.connect.Close()
m.connect = nil
}
}
// Read 读取内容
func (m *ipcReadHandler) Read(b []byte) (n int, err error) {
if m.ipcType == IPCT_NET {
return m.connect.Read(b)
} else {
n, _, err := m.connect.(*net.UnixConn).ReadFromUnix(b)
return n, err
}
}
// Free 释放消息内存空间
func (m *IPCContext) Free() {
if m.message != nil {
......@@ -255,13 +236,40 @@ func (m *ipcMessage) clear() {
m.s = 0
}
type connect struct {
writeBuf *bytes.Buffer //
channelId int64 //通道ID
conn net.Conn //通道链接
ipcType IPC_TYPE //IPC类型
ct ChannelType //链接通道类型
handler IPCCallback //
}
// Close 关闭当前ipc通道链接
func (m *connect) Close() {
if m.conn != nil {
m.conn.Close()
m.conn = nil
}
}
// Read 读取内容
func (m *connect) Read(b []byte) (n int, err error) {
if m.ipcType == IPCT_NET {
return m.conn.Read(b)
} else {
n, _, err := m.conn.(*net.UnixConn).ReadFromUnix(b)
return n, err
}
}
// ipcWrite 写入消息
func ipcWrite(messageType mt, channelId int64, data []byte, conn net.Conn) (n int, err error) {
func (m *connect) ipcWrite(messageType mt, channelId int64, data []byte) (n int, err error) {
defer func() {
data = nil
}()
if conn == nil {
return 0, errors.New("链接未建立成功")
if m.conn == nil {
return 0, errors.New("通道链接未建立成功")
}
var (
dataByteLen = len(data)
......@@ -269,36 +277,36 @@ func ipcWrite(messageType mt, channelId int64, data []byte, conn net.Conn) (n in
if dataByteLen > math.MaxUint32 {
return 0, errors.New("超出最大消息长度")
}
binary.Write(ipcWriteBuf, binary.BigEndian, protocolHeader) //协议头
binary.Write(ipcWriteBuf, binary.BigEndian, int8(messageType)) //消息类型
binary.Write(ipcWriteBuf, binary.BigEndian, channelId) //通道Id
binary.Write(ipcWriteBuf, binary.BigEndian, uint32(dataByteLen)) //数据长度
binary.Write(ipcWriteBuf, binary.BigEndian, data) //数据
n, err = conn.Write(ipcWriteBuf.Bytes())
ipcWriteBuf.Reset()
binary.Write(m.writeBuf, binary.BigEndian, protocolHeader) //协议头
binary.Write(m.writeBuf, binary.BigEndian, int8(messageType)) //消息类型
binary.Write(m.writeBuf, binary.BigEndian, channelId) //通道Id
binary.Write(m.writeBuf, binary.BigEndian, uint32(dataByteLen)) //数据长度
binary.Write(m.writeBuf, binary.BigEndian, data) //数据
n, err = m.conn.Write(m.writeBuf.Bytes())
m.writeBuf.Reset()
return n, err
}
// ipcRead 读取消息
func ipcRead(handler *ipcReadHandler) {
func (m *connect) ipcRead() {
var ipcType, chnType string
if handler.ipcType == IPCT_NET {
if m.ipcType == IPCT_NET {
ipcType = "[net]"
} else {
ipcType = "[unix]"
}
if handler.ct == Ct_Server {
if m.ct == Ct_Server {
chnType = "[server]"
} else {
chnType = "[client]"
}
defer func() {
logger.Debug("IPC Read Disconnect type:", ipcType, "ChannelType:", chnType, "processType:", common.Args.ProcessType())
handler.Close()
m.Close()
}()
for {
header := make([]byte, headerLength)
size, err := handler.Read(header)
size, err := m.Read(header)
if err != nil {
logger.Debug("IPC Read【Error】IPCType:", ipcType, "ChannelType:", chnType, "Error:", err)
return
......@@ -346,16 +354,16 @@ func ipcRead(handler *ipcReadHandler) {
//数据
dataByte := make([]byte, dataLen)
if dataLen > 0 {
size, err = handler.Read(dataByte)
size, err = m.Read(dataByte)
}
if err != nil {
logger.Debug("binary.Read.data: ", err)
return
}
handler.handler(&IPCContext{
m.handler(&IPCContext{
channelId: channelId,
ipcType: handler.ipcType,
connect: handler.connect,
ipcType: m.ipcType,
connect: m.conn,
message: &ipcMessage{
t: mt(t),
s: dataLen,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册