提交 5f5eeb0e 编写于 作者: J Jason

封装 packConn为net.Conn,方便操作,链接复用

上级 b8ea1cf3
......@@ -14,7 +14,8 @@ import (
type Channel struct {
model.Channel
Error string
Rx int
Tx int
listener net.Listener
......@@ -45,6 +46,7 @@ func (c *Channel) Open() error {
func (c *Channel) Dial() error {
conn, err := net.Dial(c.Net, c.Addr)
if err != nil {
c.Error = err.Error()
return err
}
......@@ -61,6 +63,7 @@ func (c *Channel) Listen() error {
case "tcp", "tcp4", "tcp6", "unix":
c.listener, err = net.Listen(c.Net, c.Addr)
if err != nil {
c.Error = err.Error()
return err
}
go c.accept()
......@@ -69,6 +72,7 @@ func (c *Channel) Listen() error {
c.packetConn, err = net.ListenPacket(c.Net, c.Addr)
if err != nil {
c.Error = err.Error()
return err
}
go c.receivePacket()
......@@ -98,6 +102,14 @@ func (c *Channel) Close() error {
return nil
}
func (c *Channel) GetLink(id int64) (*Link, error) {
v, ok := c.links.Load(id)
if !ok {
return nil, errors.New("连接不存在")
}
return v.(*Link), nil
}
func (c *Channel) accept() {
for c.listener != nil {
conn, err := c.listener.Accept()
......@@ -105,52 +117,59 @@ func (c *Channel) accept() {
log.Println("accept fail:", err)
continue
}
go c.receive(conn)
}
}
func (c *Channel) receive(conn net.Conn) {
client := newConnection(conn)
client.channel = c
link := newLink(c, conn)
//TODO 未开启注册,则直接保存
//未开启注册,则直接保存
if !c.RegisterEnable {
c.storeLink(client)
c.storeLink(link)
}
buf := make([]byte, 1024)
for client.conn != nil {
for link.conn != nil {
n, e := conn.Read(buf)
if e != nil {
log.Println(e)
break
}
client.onData(buf[:n])
link.onData(buf[:n])
}
//TODO 删除connect,或状态置空
}
func (c *Channel) storeLink(conn *Link) {
//删除connect,或状态置空
err := link.Close()
if err != nil {
log.Println(err)
}
lnk := model.Link{
Addr: conn.RemoteAddr.String(),
ChannelId: c.Id,
Online: time.Now(),
Created: time.Now(),
if c.Role == "server" && link.Serial != "" {
c.links.Delete(link.Id)
} else {
//等待5分钟,之后设为离线
time.AfterFunc(time.Minute*5, func() {
c.links.Delete(link.Id)
})
}
}
//storage.DB("link").Save(&lnk)
//TODO 保存链接
_, err := db.Engine.Insert(&lnk)
func (c *Channel) storeLink(l *Link) {
//保存链接
_, err := db.Engine.Insert(&l.Link)
if err != nil {
log.Println(err)
}
//根据ID保存
c.links.Store(c.Id, conn)
c.links.Store(c.Id, l)
}
func (c *Channel) storeError(err error) error {
c.Error = err.Error()
_, err = db.Engine.ID(c.Id).Cols("error").Update(&c.Channel)
return err
}
func (c *Channel) receivePacket() {
......@@ -170,8 +189,7 @@ func (c *Channel) receivePacket() {
if ok {
client = v.(*Link)
} else {
client = newPacketConnection(c.packetConn, addr)
client.channel = c
client = newPacketLink(c, c.packetConn, addr)
//根据ID保存
if !c.RegisterEnable {
......
package dtu
import (
"errors"
"net"
)
type PackConn struct {
net.PacketConn
addr net.Addr
}
func (c *PackConn)Read(b []byte) (n int, err error) {
return 0, errors.New("此接口不支持读")
}
func (c *PackConn)Write(b []byte) (n int, err error){
return c.WriteTo(b, c.addr)
}
func (c *PackConn)RemoteAddr() net.Addr {
return c.addr
}
\ No newline at end of file
......@@ -41,12 +41,11 @@ func Recovery() error {
}
func StartChannel(c *model.Channel) (*Channel, error) {
log.Println("start", c)
//log.Println("Start channel", c)
channel := NewChannel(c)
err := channel.Open()
if err != nil && channel != nil {
channel.Error = err.Error()
if err != nil {
return nil, err
}
channels.Store(c.Id, channel)
return channel, err
......
......@@ -14,16 +14,14 @@ import (
)
type Link struct {
Id int64
model.Link
Error string
Serial string
RemoteAddr net.Addr
//RemoteAddr net.Addr
Rx int
Tx int
conn interface{}
conn net.Conn
lastTime time.Time
......@@ -59,30 +57,40 @@ func (l *Link) checkRegister(buf []byte) error {
return err
}
if has {
//TODO 检查工作状态,如果同序号连接还在正常通讯,则关闭当前连接,回复:Duplicate register
lnk, err := l.channel.GetLink(link.Id)
if lnk != nil {
//如果同序号连接还在正常通讯,则关闭当前连接
if lnk.conn != nil {
return fmt.Errorf("duplicate serial %s", serial)
}
//复制有用的历史数据
l.Rx = lnk.Rx
l.Tx = lnk.Tx
//复制watcher
}
//更新客户端地址,
link.Addr = l.RemoteAddr.String()
link.Online = time.Now()
_, err := db.Engine.ID(link.Id).Cols("addr", "online").Update(link)
link.Addr = l.conn.RemoteAddr().String()
link.Online = true
link.OnlineAt = time.Now()
link.Error = ""
_, err = db.Engine.ID(link.Id).Cols("addr", "error", "online", "online_at").Update(link)
if err != nil {
return err
}
} else {
link = model.Link{
Serial: serial,
Addr: l.RemoteAddr.String(),
ChannelId: l.channel.Id,
Online: time.Now(),
Created: time.Now(),
}
_, err := db.Engine.Insert(&link)
//插入新记录
_, err := db.Engine.Insert(&l.Link)
if err != nil {
return err
}
l.Id = link.Id
}
//保存链接
l.channel.links.Store(link.Id, l)
//处理剩余内容
if l.channel.RegisterMax > 0 && n > l.channel.RegisterMax {
l.onData(buf[l.channel.RegisterMax:])
......@@ -132,29 +140,56 @@ func (l *Link) Send(buf []byte) (int, error) {
l.Tx += len(buf)
l.lastTime = time.Now()
if conn, ok := l.conn.(net.Conn); ok {
return conn.Write(buf)
return l.conn.Write(buf)
}
func (l *Link) Close() error {
if l.conn == nil {
return errors.New("连接已经关闭")
}
if conn, ok := l.conn.(net.PacketConn); ok {
return conn.WriteTo(buf, l.RemoteAddr)
err := l.conn.Close()
l.conn = nil
if err != nil {
return err
}
return 0, errors.New("错误的链接类型")
l.Online = false
_, err = db.Engine.ID(l.Id).Cols("online").Update(&l.Link)
return err
}
func (l *Link) Close() error {
return l.conn.(net.Conn).Close()
func (l *Link) storeError(err error) error {
l.Error = err.Error()
_, err = db.Engine.ID(l.Id).Cols("error").Update(&l.Link)
return err
}
func newConnection(conn net.Conn) *Link {
func newLink(c *Channel, conn net.Conn) *Link {
return &Link{
RemoteAddr: conn.RemoteAddr(),
Link: model.Link{
Addr: conn.RemoteAddr().String(),
ChannelId: c.Id,
PluginId: c.PluginId,
Online: true,
OnlineAt: time.Now(),
},
channel: c,
conn: conn,
}
}
func newPacketConnection(conn net.PacketConn, addr net.Addr) *Link {
func newPacketLink(c *Channel, conn net.PacketConn, addr net.Addr) *Link {
return &Link{
RemoteAddr: addr,
conn: conn,
Link: model.Link{
Addr: addr.String(),
ChannelId: c.Id,
PluginId: c.PluginId,
Online: true,
OnlineAt: time.Now(),
},
channel: c,
conn: &PackConn{
PacketConn: conn,
addr: addr,
},
}
}
......@@ -5,7 +5,7 @@ import "time"
type Channel struct {
Id int64 `json:"id"`
Name string `json:"name" xorm:"varchar(64)"`
//Tags string `json:"tags" xorm:"varchar(256)"`
Error string `json:"error" xorm:"varchar(256)"`
Disabled bool `json:"disabled" xorm:"default 0"` //此处 禁用 直接放到顶级,Update无效
Role string `json:"role" xorm:"varchar(16) notnull"`
......@@ -26,5 +26,5 @@ type Channel struct {
PluginId int64 `json:"plugin_id"`
//Creator int `json:"creator"`
Created time.Time `json:"created" xorm:"created"`
CreatedAt time.Time `json:"created_at" xorm:"created"`
}
......@@ -7,10 +7,12 @@ import (
type Link struct {
Id int64 `json:"id"`
Name string `json:"name" xorm:"varchar(64)"`
Error string `json:"error" xorm:"varchar(256)"`
Serial string `json:"serial" xorm:"varchar(128)"`
Addr string `json:"addr" xorm:"varchar(128) notnull"`
ChannelId int64 `json:"channel_id"`
PluginId int64 `json:"plugin_id"` //插件ID
Online time.Time `json:"online"`
Created time.Time `json:"created" xorm:"created"`
Online bool `json:"online"`
OnlineAt time.Time `json:"online_at"`
CreatedAt time.Time `json:"created_at" xorm:"created"`
}
......@@ -10,6 +10,6 @@ type Plugin struct {
Address string `json:"address" xorm:"varchar(128)"`
Path string `json:"path" xorm:"varchar(256)"`
Entry string `json:"entry" xorm:"varchar(256)"`
Expire time.Time `json:"expire"`
Created time.Time `json:"created" xorm:"created"`
ExpireAt time.Time `json:"expire_at"`
CreatedAt time.Time `json:"created_at" xorm:"created"`
}
......@@ -19,5 +19,5 @@ type User struct {
Disabled bool `json:"disabled,omitempty" xorm:"default 0"`
//创建时间
Created time.Time `json:"created" xorm:"created"`
CreatedAt time.Time `json:"created_at" xorm:"created"`
}
......@@ -30,7 +30,7 @@
<th nzColumnKey="role" nzShowFilter [nzFilterFn]="true" [nzFilters]="roleFilters">角色</th>
<th nzColumnKey="net" nzShowFilter [nzFilterFn]="true" [nzFilters]="netFilters">网络</th>
<th nzColumnKey="status" nzShowFilter [nzFilterFn]="true" [nzFilters]="statusFilters">状态</th>
<th nzColumnKey="created" [nzSortFn]="true">创建时间</th>
<th nzColumnKey="created_at" [nzSortFn]="true">创建时间</th>
<th></th>
</tr>
</thead>
......@@ -44,7 +44,7 @@
{{data.disabled ? '禁用' : ''}}
<a>启动/停止</a>
</td>
<td>{{ data.created | amDateFormat:'YYYY-MM-DD HH:mm:ss' }}</td>
<td>{{ data.created_at | amDateFormat:'YYYY-MM-DD HH:mm:ss' }}</td>
<td>
<a [routerLink]="'/admin/channel-monitor/'+data.id" title="监控">
<i nz-icon nzType="aim"></i>
......
......@@ -25,7 +25,7 @@
<th>序列号</th>
<th>地址</th>
<th nzColumnKey="status" nzShowFilter [nzFilterFn]="true" [nzFilters]="statusFilters">状态</th>
<th nzColumnKey="online" [nzSortFn]="true">上线时间</th>
<th nzColumnKey="online_at" [nzSortFn]="true">上线时间</th>
<th></th>
</tr>
</thead>
......@@ -37,9 +37,9 @@
<td>{{ data.serial }}</td>
<td>{{ data.addr}}</td>
<td>
{{data.online ? '在线' : '离线'}}
</td>
<td>{{ data.online | amDateFormat:'YYYY-MM-DD HH:mm:ss' }}</td>
<td>{{ data.online_at | amDateFormat:'YYYY-MM-DD HH:mm:ss' }}</td>
<td>
<a [routerLink]="'/admin/link-monitor/'+data.id" title="监控">
<i nz-icon nzType="aim"></i>
......
portal/src/favicon.ico

948 字节 | W: | H:

portal/src/favicon.ico

4.2 KB | W: | H:

portal/src/favicon.ico
portal/src/favicon.ico
portal/src/favicon.ico
portal/src/favicon.ico
  • 2-up
  • Swipe
  • Onion skin
......@@ -7,7 +7,6 @@ import (
"github.com/zgwit/dtu-admin/model"
"log"
"net/http"
"time"
)
func channels(ctx *gin.Context) {
......@@ -68,7 +67,6 @@ func channelCreate(ctx *gin.Context) {
}
// channel.Creator = TODO 从session中获取
channel.Created = time.Now()
_, err := db.Engine.Insert(&channel)
if err != nil {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册