From 5f5eeb0e71b4f603c6ff684d0adf7c1f06337d89 Mon Sep 17 00:00:00 2001 From: Jason Date: Fri, 4 Sep 2020 16:12:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=81=E8=A3=85=20packConn=E4=B8=BAnet.Conn?= =?UTF-8?q?=EF=BC=8C=E6=96=B9=E4=BE=BF=E6=93=8D=E4=BD=9C=EF=BC=8C=E9=93=BE?= =?UTF-8?q?=E6=8E=A5=E5=A4=8D=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dtu/channel.go | 66 +++++++----- dtu/conn.go | 23 ++++ dtu/{misc.go => dtu.go} | 7 +- dtu/link.go | 99 ++++++++++++------ model/channel.go | 4 +- model/link.go | 6 +- model/plugin.go | 4 +- model/user.go | 8 +- .../app/main/channel/channel.component.html | 4 +- portal/src/app/main/link/link.component.html | 6 +- portal/src/favicon.ico | Bin 948 -> 4286 bytes web/api/channel.go | 2 - 12 files changed, 152 insertions(+), 77 deletions(-) create mode 100644 dtu/conn.go rename dtu/{misc.go => dtu.go} (92%) diff --git a/dtu/channel.go b/dtu/channel.go index 56c9a8e..186878d 100644 --- a/dtu/channel.go +++ b/dtu/channel.go @@ -14,7 +14,8 @@ import ( type Channel struct { model.Channel - Error string + Rx int + Tx int listener net.Listener @@ -45,6 +46,7 @@ func (c *Channel) Open() error { func (c *Channel) Dial() error { conn, err := net.Dial(c.Net, c.Addr) if err != nil { + c.Error = err.Error() return err } @@ -61,6 +63,7 @@ func (c *Channel) Listen() error { case "tcp", "tcp4", "tcp6", "unix": c.listener, err = net.Listen(c.Net, c.Addr) if err != nil { + c.Error = err.Error() return err } go c.accept() @@ -69,6 +72,7 @@ func (c *Channel) Listen() error { c.packetConn, err = net.ListenPacket(c.Net, c.Addr) if err != nil { + c.Error = err.Error() return err } go c.receivePacket() @@ -98,6 +102,14 @@ func (c *Channel) Close() error { return nil } +func (c *Channel) GetLink(id int64) (*Link, error) { + v, ok := c.links.Load(id) + if !ok { + return nil, errors.New("连接不存在") + } + return v.(*Link), nil +} + func (c *Channel) accept() { for c.listener != nil { conn, err := c.listener.Accept() @@ -105,52 +117,59 @@ func (c *Channel) accept() { log.Println("accept fail:", err) continue } - go c.receive(conn) } } func (c *Channel) receive(conn net.Conn) { - client := newConnection(conn) - client.channel = c + link := newLink(c, conn) - //TODO 未开启注册,则直接保存 + //未开启注册,则直接保存 if !c.RegisterEnable { - c.storeLink(client) + c.storeLink(link) } buf := make([]byte, 1024) - for client.conn != nil { + for link.conn != nil { n, e := conn.Read(buf) if e != nil { log.Println(e) break } - client.onData(buf[:n]) + link.onData(buf[:n]) } - //TODO 删除connect,或状态置空 - -} - -func (c *Channel) storeLink(conn *Link) { + //删除connect,或状态置空 + err := link.Close() + if err != nil { + log.Println(err) + } - lnk := model.Link{ - Addr: conn.RemoteAddr.String(), - ChannelId: c.Id, - Online: time.Now(), - Created: time.Now(), + if c.Role == "server" && link.Serial != "" { + c.links.Delete(link.Id) + } else { + //等待5分钟,之后设为离线 + time.AfterFunc(time.Minute*5, func() { + c.links.Delete(link.Id) + }) } +} - //storage.DB("link").Save(&lnk) - //TODO 保存链接 - _, err := db.Engine.Insert(&lnk) +func (c *Channel) storeLink(l *Link) { + //保存链接 + _, err := db.Engine.Insert(&l.Link) if err != nil { log.Println(err) } //根据ID保存 - c.links.Store(c.Id, conn) + c.links.Store(c.Id, l) +} + +func (c *Channel) storeError(err error) error { + c.Error = err.Error() + _, err = db.Engine.ID(c.Id).Cols("error").Update(&c.Channel) + return err } func (c *Channel) receivePacket() { @@ -170,8 +189,7 @@ func (c *Channel) receivePacket() { if ok { client = v.(*Link) } else { - client = newPacketConnection(c.packetConn, addr) - client.channel = c + client = newPacketLink(c, c.packetConn, addr) //根据ID保存 if !c.RegisterEnable { diff --git a/dtu/conn.go b/dtu/conn.go new file mode 100644 index 0000000..1ab4986 --- /dev/null +++ b/dtu/conn.go @@ -0,0 +1,23 @@ +package dtu + +import ( + "errors" + "net" +) + +type PackConn struct { + net.PacketConn + addr net.Addr +} + +func (c *PackConn)Read(b []byte) (n int, err error) { + return 0, errors.New("此接口不支持读") +} + +func (c *PackConn)Write(b []byte) (n int, err error){ + return c.WriteTo(b, c.addr) +} + +func (c *PackConn)RemoteAddr() net.Addr { + return c.addr +} \ No newline at end of file diff --git a/dtu/misc.go b/dtu/dtu.go similarity index 92% rename from dtu/misc.go rename to dtu/dtu.go index 1f7109d..9611758 100644 --- a/dtu/misc.go +++ b/dtu/dtu.go @@ -41,12 +41,11 @@ func Recovery() error { } func StartChannel(c *model.Channel) (*Channel, error) { - log.Println("start", c) - + //log.Println("Start channel", c) channel := NewChannel(c) err := channel.Open() - if err != nil && channel != nil { - channel.Error = err.Error() + if err != nil { + return nil, err } channels.Store(c.Id, channel) return channel, err diff --git a/dtu/link.go b/dtu/link.go index 204a096..6bc38ce 100644 --- a/dtu/link.go +++ b/dtu/link.go @@ -14,16 +14,14 @@ import ( ) type Link struct { - Id int64 + model.Link - Error string - Serial string - RemoteAddr net.Addr + //RemoteAddr net.Addr Rx int Tx int - conn interface{} + conn net.Conn lastTime time.Time @@ -59,30 +57,40 @@ func (l *Link) checkRegister(buf []byte) error { return err } if has { - //TODO 检查工作状态,如果同序号连接还在正常通讯,则关闭当前连接,回复:Duplicate register + lnk, err := l.channel.GetLink(link.Id) + if lnk != nil { + //如果同序号连接还在正常通讯,则关闭当前连接 + if lnk.conn != nil { + return fmt.Errorf("duplicate serial %s", serial) + } + + //复制有用的历史数据 + l.Rx = lnk.Rx + l.Tx = lnk.Tx + + //复制watcher + } - //更新客户端地址, - link.Addr = l.RemoteAddr.String() - link.Online = time.Now() - _, err := db.Engine.ID(link.Id).Cols("addr", "online").Update(link) + link.Addr = l.conn.RemoteAddr().String() + link.Online = true + link.OnlineAt = time.Now() + link.Error = "" + + _, err = db.Engine.ID(link.Id).Cols("addr", "error", "online", "online_at").Update(link) if err != nil { return err } } else { - link = model.Link{ - Serial: serial, - Addr: l.RemoteAddr.String(), - ChannelId: l.channel.Id, - Online: time.Now(), - Created: time.Now(), - } - _, err := db.Engine.Insert(&link) + //插入新记录 + _, err := db.Engine.Insert(&l.Link) if err != nil { return err } - l.Id = link.Id } + //保存链接 + l.channel.links.Store(link.Id, l) + //处理剩余内容 if l.channel.RegisterMax > 0 && n > l.channel.RegisterMax { l.onData(buf[l.channel.RegisterMax:]) @@ -132,29 +140,56 @@ func (l *Link) Send(buf []byte) (int, error) { l.Tx += len(buf) l.lastTime = time.Now() - if conn, ok := l.conn.(net.Conn); ok { - return conn.Write(buf) + return l.conn.Write(buf) +} + +func (l *Link) Close() error { + if l.conn == nil { + return errors.New("连接已经关闭") } - if conn, ok := l.conn.(net.PacketConn); ok { - return conn.WriteTo(buf, l.RemoteAddr) + err := l.conn.Close() + l.conn = nil + if err != nil { + return err } - return 0, errors.New("错误的链接类型") + l.Online = false + _, err = db.Engine.ID(l.Id).Cols("online").Update(&l.Link) + return err } -func (l *Link) Close() error { - return l.conn.(net.Conn).Close() +func (l *Link) storeError(err error) error { + l.Error = err.Error() + _, err = db.Engine.ID(l.Id).Cols("error").Update(&l.Link) + return err } -func newConnection(conn net.Conn) *Link { +func newLink(c *Channel, conn net.Conn) *Link { return &Link{ - RemoteAddr: conn.RemoteAddr(), - conn: conn, + Link: model.Link{ + Addr: conn.RemoteAddr().String(), + ChannelId: c.Id, + PluginId: c.PluginId, + Online: true, + OnlineAt: time.Now(), + }, + channel: c, + conn: conn, } } -func newPacketConnection(conn net.PacketConn, addr net.Addr) *Link { +func newPacketLink(c *Channel, conn net.PacketConn, addr net.Addr) *Link { return &Link{ - RemoteAddr: addr, - conn: conn, + Link: model.Link{ + Addr: addr.String(), + ChannelId: c.Id, + PluginId: c.PluginId, + Online: true, + OnlineAt: time.Now(), + }, + channel: c, + conn: &PackConn{ + PacketConn: conn, + addr: addr, + }, } } diff --git a/model/channel.go b/model/channel.go index 3894e6d..8f9a801 100644 --- a/model/channel.go +++ b/model/channel.go @@ -5,7 +5,7 @@ import "time" type Channel struct { Id int64 `json:"id"` Name string `json:"name" xorm:"varchar(64)"` - //Tags string `json:"tags" xorm:"varchar(256)"` + Error string `json:"error" xorm:"varchar(256)"` Disabled bool `json:"disabled" xorm:"default 0"` //此处 禁用 直接放到顶级,Update无效 Role string `json:"role" xorm:"varchar(16) notnull"` @@ -26,5 +26,5 @@ type Channel struct { PluginId int64 `json:"plugin_id"` //Creator int `json:"creator"` - Created time.Time `json:"created" xorm:"created"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/model/link.go b/model/link.go index 9fcedad..4aa7542 100644 --- a/model/link.go +++ b/model/link.go @@ -7,10 +7,12 @@ import ( type Link struct { Id int64 `json:"id"` Name string `json:"name" xorm:"varchar(64)"` + Error string `json:"error" xorm:"varchar(256)"` Serial string `json:"serial" xorm:"varchar(128)"` Addr string `json:"addr" xorm:"varchar(128) notnull"` ChannelId int64 `json:"channel_id"` PluginId int64 `json:"plugin_id"` //插件ID - Online time.Time `json:"online"` - Created time.Time `json:"created" xorm:"created"` + Online bool `json:"online"` + OnlineAt time.Time `json:"online_at"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/model/plugin.go b/model/plugin.go index 647454e..3edef24 100644 --- a/model/plugin.go +++ b/model/plugin.go @@ -10,6 +10,6 @@ type Plugin struct { Address string `json:"address" xorm:"varchar(128)"` Path string `json:"path" xorm:"varchar(256)"` Entry string `json:"entry" xorm:"varchar(256)"` - Expire time.Time `json:"expire"` - Created time.Time `json:"created" xorm:"created"` + ExpireAt time.Time `json:"expire_at"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/model/user.go b/model/user.go index ab16a65..d2a845c 100644 --- a/model/user.go +++ b/model/user.go @@ -4,13 +4,13 @@ import "time" type User struct { //Id,自增 - Id int64 `json:"id"` + Id int64 `json:"id"` //用户名 - Username string `json:"username" xorm:"varchar(64) notnull unique"` + Username string `json:"username" xorm:"varchar(64) notnull unique"` //密码 MD5加密 - Password string `json:"password" xorm:"varchar(64) notnull"` + Password string `json:"password" xorm:"varchar(64) notnull"` //姓名 Name string `json:"name" xorm:"varchar(64)"` @@ -19,5 +19,5 @@ type User struct { Disabled bool `json:"disabled,omitempty" xorm:"default 0"` //创建时间 - Created time.Time `json:"created" xorm:"created"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/portal/src/app/main/channel/channel.component.html b/portal/src/app/main/channel/channel.component.html index bbddd2a..e341780 100644 --- a/portal/src/app/main/channel/channel.component.html +++ b/portal/src/app/main/channel/channel.component.html @@ -30,7 +30,7 @@ 角色 网络 状态 - 创建时间 + 创建时间 @@ -44,7 +44,7 @@ {{data.disabled ? '禁用' : ''}} 启动/停止 - {{ data.created | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} + {{ data.created_at | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} diff --git a/portal/src/app/main/link/link.component.html b/portal/src/app/main/link/link.component.html index 626cc1c..70d80bf 100644 --- a/portal/src/app/main/link/link.component.html +++ b/portal/src/app/main/link/link.component.html @@ -25,7 +25,7 @@ 序列号 地址 状态 - 上线时间 + 上线时间 @@ -37,9 +37,9 @@ {{ data.serial }} {{ data.addr}} - + {{data.online ? '在线' : '离线'}} - {{ data.online | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} + {{ data.online_at | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} diff --git a/portal/src/favicon.ico b/portal/src/favicon.ico index 997406ad22c29aae95893fb3d666c30258a09537..38637538a1b04b6fe2306f107479ae4a6d72418c 100644 GIT binary patch literal 4286 zcmc(jO^+2t6ozZ~kOc(j3ju-XjBHT$A|w!IG5-K}u;dRY!+K_i?!5v|0=kmmny=je zF%Srei7V&M7*>*?CL)^1hm6lteG7}8?!I>>L3{Gvsy=njc~93lRb4$HjPN%$7W@qt zHixh+gzzpv6vlz_crn&lU!oKY{_TC<31BVQ117;ma2Nanp7dc~nPcb_=oZ=xCTP&W z9^U9Qmos zS5NM8z7NoT3toUG-^0rFp)Y+#|8+5x^S#CK3Gf3L*8i}2`oy;!ujZrI-JG8LgV)5J zVl)Oh!jGS^82Xp@Ua}f76=eTNTzW0{T-MLvA zzl?kon75V2Am<)%5!ZvKL%DaW(>>Cg`9G`XIFiD@4Ox+KdWfJ zYS6iw#W00V-E-9WPBOXgHTYI%hFp_ppR)tk%Q%0ce+@hZml)ql>wf(`NWO<@<^HwB z=1;~ZsS{(^P5(Ra3>;@(Hqm|!+#8OR`&#D<`tm9phF|VMbLKi{trgaS&vyqD;B9al z$aA#M-=%Nf%-16@CTk<@eY00K|Y@PUo!669S7zu$@A9rcM=R!1G(K} zY}j+$dzP%tr01UH`@*_i07;%&IpuZ_?A44Xy{6V@G6vsAqx8*1u6qOddSrXSgUXnC za>5<=@U|S~KCAr{182;*9?#)(4qOKl{W?8CziQ__dt2lV^qbzlb#_&mxnQluce3xJ z?O?G&5GI173Y{2dyjKxgSSh6t>o5lb%3R*g*KV-{cc@u_b)CBYU7IjCFmI-B}4sMJt3^s9NVg!P0 z6hDQy(L`XWMkB@zOLgN$4KYz;j0zZxq9KKdpZE#5@k0crP^5f9KO};h)ZDQ%ybhht z%t9#h|nu0K(bJ ztIkhEr!*UyrZWQ1k2+YkGqDi8Z<|mIN&$kzpKl{cNP=OQzXHz>vn+c)F)zO|Bou>E z2|-d_=qY#Y+yOu1a}XI?cU}%04)zz%anD(XZC{#~WreV!a$7k2Ug`?&CUEc0EtrkZ zL49MB)h!_K{H(*l_93D5tO0;BUnvYlo+;yss%n^&qjt6fZOa+}+FDO(~2>G z2dx@=JZ?DHP^;b7*Y1as5^uphBsh*s*z&MBd?e@I>-9kU>63PjP&^#5YTOb&x^6Cf z?674rmSHB5Fk!{Gv7rv!?qX#ei_L(XtwVqLX3L}$MI|kJ*w(rhx~tc&L&xP#?cQow zX_|gx$wMr3pRZIIr_;;O|8fAjd;1`nOeu5K(pCu7>^3E&D2OBBq?sYa(%S?GwG&_0-s%_v$L@R!5H_fc)lOb9ZoOO#p`Nn`KU z3LTTBtjwo`7(HA6 z7gmO$yTR!5L>Bsg!X8616{JUngg_@&85%>W=mChTR;x4`P=?PJ~oPuy5 zU-L`C@_!34D21{fD~Y8NVnR3t;aqZI3fIhmgmx}$oc-dKDC6Ap$Gy>a!`A*x2L1v0 WcZ@i?LyX}70000