diff --git a/dtu/channel.go b/dtu/channel.go index 56c9a8e0ff1e7bb0d953d1a1ba301e84f774734f..186878df37f1b175291a760446326f85667884f3 100644 --- a/dtu/channel.go +++ b/dtu/channel.go @@ -14,7 +14,8 @@ import ( type Channel struct { model.Channel - Error string + Rx int + Tx int listener net.Listener @@ -45,6 +46,7 @@ func (c *Channel) Open() error { func (c *Channel) Dial() error { conn, err := net.Dial(c.Net, c.Addr) if err != nil { + c.Error = err.Error() return err } @@ -61,6 +63,7 @@ func (c *Channel) Listen() error { case "tcp", "tcp4", "tcp6", "unix": c.listener, err = net.Listen(c.Net, c.Addr) if err != nil { + c.Error = err.Error() return err } go c.accept() @@ -69,6 +72,7 @@ func (c *Channel) Listen() error { c.packetConn, err = net.ListenPacket(c.Net, c.Addr) if err != nil { + c.Error = err.Error() return err } go c.receivePacket() @@ -98,6 +102,14 @@ func (c *Channel) Close() error { return nil } +func (c *Channel) GetLink(id int64) (*Link, error) { + v, ok := c.links.Load(id) + if !ok { + return nil, errors.New("连接不存在") + } + return v.(*Link), nil +} + func (c *Channel) accept() { for c.listener != nil { conn, err := c.listener.Accept() @@ -105,52 +117,59 @@ func (c *Channel) accept() { log.Println("accept fail:", err) continue } - go c.receive(conn) } } func (c *Channel) receive(conn net.Conn) { - client := newConnection(conn) - client.channel = c + link := newLink(c, conn) - //TODO 未开启注册,则直接保存 + //未开启注册,则直接保存 if !c.RegisterEnable { - c.storeLink(client) + c.storeLink(link) } buf := make([]byte, 1024) - for client.conn != nil { + for link.conn != nil { n, e := conn.Read(buf) if e != nil { log.Println(e) break } - client.onData(buf[:n]) + link.onData(buf[:n]) } - //TODO 删除connect,或状态置空 - -} - -func (c *Channel) storeLink(conn *Link) { + //删除connect,或状态置空 + err := link.Close() + if err != nil { + log.Println(err) + } - lnk := model.Link{ - Addr: conn.RemoteAddr.String(), - ChannelId: c.Id, - Online: time.Now(), - Created: time.Now(), + if c.Role == "server" && link.Serial != "" { + c.links.Delete(link.Id) + } else { + //等待5分钟,之后设为离线 + time.AfterFunc(time.Minute*5, func() { + c.links.Delete(link.Id) + }) } +} - //storage.DB("link").Save(&lnk) - //TODO 保存链接 - _, err := db.Engine.Insert(&lnk) +func (c *Channel) storeLink(l *Link) { + //保存链接 + _, err := db.Engine.Insert(&l.Link) if err != nil { log.Println(err) } //根据ID保存 - c.links.Store(c.Id, conn) + c.links.Store(c.Id, l) +} + +func (c *Channel) storeError(err error) error { + c.Error = err.Error() + _, err = db.Engine.ID(c.Id).Cols("error").Update(&c.Channel) + return err } func (c *Channel) receivePacket() { @@ -170,8 +189,7 @@ func (c *Channel) receivePacket() { if ok { client = v.(*Link) } else { - client = newPacketConnection(c.packetConn, addr) - client.channel = c + client = newPacketLink(c, c.packetConn, addr) //根据ID保存 if !c.RegisterEnable { diff --git a/dtu/conn.go b/dtu/conn.go new file mode 100644 index 0000000000000000000000000000000000000000..1ab498630ab08436077fd5878f50f57c65a433be --- /dev/null +++ b/dtu/conn.go @@ -0,0 +1,23 @@ +package dtu + +import ( + "errors" + "net" +) + +type PackConn struct { + net.PacketConn + addr net.Addr +} + +func (c *PackConn)Read(b []byte) (n int, err error) { + return 0, errors.New("此接口不支持读") +} + +func (c *PackConn)Write(b []byte) (n int, err error){ + return c.WriteTo(b, c.addr) +} + +func (c *PackConn)RemoteAddr() net.Addr { + return c.addr +} \ No newline at end of file diff --git a/dtu/misc.go b/dtu/dtu.go similarity index 92% rename from dtu/misc.go rename to dtu/dtu.go index 1f7109d1853fd95d75f7b26a00b73e4cd288fc37..9611758a606647de75f4486db98fd6d8d86754a6 100644 --- a/dtu/misc.go +++ b/dtu/dtu.go @@ -41,12 +41,11 @@ func Recovery() error { } func StartChannel(c *model.Channel) (*Channel, error) { - log.Println("start", c) - + //log.Println("Start channel", c) channel := NewChannel(c) err := channel.Open() - if err != nil && channel != nil { - channel.Error = err.Error() + if err != nil { + return nil, err } channels.Store(c.Id, channel) return channel, err diff --git a/dtu/link.go b/dtu/link.go index 204a096f41179db92bf710ea45cf1cd56389a866..6bc38ce8c432265cac9ace4a3409bfaa9c8cf113 100644 --- a/dtu/link.go +++ b/dtu/link.go @@ -14,16 +14,14 @@ import ( ) type Link struct { - Id int64 + model.Link - Error string - Serial string - RemoteAddr net.Addr + //RemoteAddr net.Addr Rx int Tx int - conn interface{} + conn net.Conn lastTime time.Time @@ -59,30 +57,40 @@ func (l *Link) checkRegister(buf []byte) error { return err } if has { - //TODO 检查工作状态,如果同序号连接还在正常通讯,则关闭当前连接,回复:Duplicate register + lnk, err := l.channel.GetLink(link.Id) + if lnk != nil { + //如果同序号连接还在正常通讯,则关闭当前连接 + if lnk.conn != nil { + return fmt.Errorf("duplicate serial %s", serial) + } + + //复制有用的历史数据 + l.Rx = lnk.Rx + l.Tx = lnk.Tx + + //复制watcher + } - //更新客户端地址, - link.Addr = l.RemoteAddr.String() - link.Online = time.Now() - _, err := db.Engine.ID(link.Id).Cols("addr", "online").Update(link) + link.Addr = l.conn.RemoteAddr().String() + link.Online = true + link.OnlineAt = time.Now() + link.Error = "" + + _, err = db.Engine.ID(link.Id).Cols("addr", "error", "online", "online_at").Update(link) if err != nil { return err } } else { - link = model.Link{ - Serial: serial, - Addr: l.RemoteAddr.String(), - ChannelId: l.channel.Id, - Online: time.Now(), - Created: time.Now(), - } - _, err := db.Engine.Insert(&link) + //插入新记录 + _, err := db.Engine.Insert(&l.Link) if err != nil { return err } - l.Id = link.Id } + //保存链接 + l.channel.links.Store(link.Id, l) + //处理剩余内容 if l.channel.RegisterMax > 0 && n > l.channel.RegisterMax { l.onData(buf[l.channel.RegisterMax:]) @@ -132,29 +140,56 @@ func (l *Link) Send(buf []byte) (int, error) { l.Tx += len(buf) l.lastTime = time.Now() - if conn, ok := l.conn.(net.Conn); ok { - return conn.Write(buf) + return l.conn.Write(buf) +} + +func (l *Link) Close() error { + if l.conn == nil { + return errors.New("连接已经关闭") } - if conn, ok := l.conn.(net.PacketConn); ok { - return conn.WriteTo(buf, l.RemoteAddr) + err := l.conn.Close() + l.conn = nil + if err != nil { + return err } - return 0, errors.New("错误的链接类型") + l.Online = false + _, err = db.Engine.ID(l.Id).Cols("online").Update(&l.Link) + return err } -func (l *Link) Close() error { - return l.conn.(net.Conn).Close() +func (l *Link) storeError(err error) error { + l.Error = err.Error() + _, err = db.Engine.ID(l.Id).Cols("error").Update(&l.Link) + return err } -func newConnection(conn net.Conn) *Link { +func newLink(c *Channel, conn net.Conn) *Link { return &Link{ - RemoteAddr: conn.RemoteAddr(), - conn: conn, + Link: model.Link{ + Addr: conn.RemoteAddr().String(), + ChannelId: c.Id, + PluginId: c.PluginId, + Online: true, + OnlineAt: time.Now(), + }, + channel: c, + conn: conn, } } -func newPacketConnection(conn net.PacketConn, addr net.Addr) *Link { +func newPacketLink(c *Channel, conn net.PacketConn, addr net.Addr) *Link { return &Link{ - RemoteAddr: addr, - conn: conn, + Link: model.Link{ + Addr: addr.String(), + ChannelId: c.Id, + PluginId: c.PluginId, + Online: true, + OnlineAt: time.Now(), + }, + channel: c, + conn: &PackConn{ + PacketConn: conn, + addr: addr, + }, } } diff --git a/model/channel.go b/model/channel.go index 3894e6d8308ad5edffc9644009174730cf0bfa88..8f9a801cf9490609fc60575a811404a4ed610fd3 100644 --- a/model/channel.go +++ b/model/channel.go @@ -5,7 +5,7 @@ import "time" type Channel struct { Id int64 `json:"id"` Name string `json:"name" xorm:"varchar(64)"` - //Tags string `json:"tags" xorm:"varchar(256)"` + Error string `json:"error" xorm:"varchar(256)"` Disabled bool `json:"disabled" xorm:"default 0"` //此处 禁用 直接放到顶级,Update无效 Role string `json:"role" xorm:"varchar(16) notnull"` @@ -26,5 +26,5 @@ type Channel struct { PluginId int64 `json:"plugin_id"` //Creator int `json:"creator"` - Created time.Time `json:"created" xorm:"created"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/model/link.go b/model/link.go index 9fcedadbdef553628c6c49362eb1fbf01a518633..4aa75420f40b4c666c99f9a46d7157ba855818fe 100644 --- a/model/link.go +++ b/model/link.go @@ -7,10 +7,12 @@ import ( type Link struct { Id int64 `json:"id"` Name string `json:"name" xorm:"varchar(64)"` + Error string `json:"error" xorm:"varchar(256)"` Serial string `json:"serial" xorm:"varchar(128)"` Addr string `json:"addr" xorm:"varchar(128) notnull"` ChannelId int64 `json:"channel_id"` PluginId int64 `json:"plugin_id"` //插件ID - Online time.Time `json:"online"` - Created time.Time `json:"created" xorm:"created"` + Online bool `json:"online"` + OnlineAt time.Time `json:"online_at"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/model/plugin.go b/model/plugin.go index 647454e26f01dfb70f632540a8980b2e409a55e5..3edef24ff8848b77ac8eb9ec555d65a3f34acbba 100644 --- a/model/plugin.go +++ b/model/plugin.go @@ -10,6 +10,6 @@ type Plugin struct { Address string `json:"address" xorm:"varchar(128)"` Path string `json:"path" xorm:"varchar(256)"` Entry string `json:"entry" xorm:"varchar(256)"` - Expire time.Time `json:"expire"` - Created time.Time `json:"created" xorm:"created"` + ExpireAt time.Time `json:"expire_at"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/model/user.go b/model/user.go index ab16a653074cf8970fc54e00bff528444b8fa8cb..d2a845cb43e43e5811efa1bd75d56312bfa0a363 100644 --- a/model/user.go +++ b/model/user.go @@ -4,13 +4,13 @@ import "time" type User struct { //Id,自增 - Id int64 `json:"id"` + Id int64 `json:"id"` //用户名 - Username string `json:"username" xorm:"varchar(64) notnull unique"` + Username string `json:"username" xorm:"varchar(64) notnull unique"` //密码 MD5加密 - Password string `json:"password" xorm:"varchar(64) notnull"` + Password string `json:"password" xorm:"varchar(64) notnull"` //姓名 Name string `json:"name" xorm:"varchar(64)"` @@ -19,5 +19,5 @@ type User struct { Disabled bool `json:"disabled,omitempty" xorm:"default 0"` //创建时间 - Created time.Time `json:"created" xorm:"created"` + CreatedAt time.Time `json:"created_at" xorm:"created"` } diff --git a/portal/src/app/main/channel/channel.component.html b/portal/src/app/main/channel/channel.component.html index bbddd2a16d63a5bb4f6b9498675bf1d5376322e4..e3417805e5ac1a8c2415f83432b9a878ebebfdc8 100644 --- a/portal/src/app/main/channel/channel.component.html +++ b/portal/src/app/main/channel/channel.component.html @@ -30,7 +30,7 @@ 角色 网络 状态 - 创建时间 + 创建时间 @@ -44,7 +44,7 @@ {{data.disabled ? '禁用' : ''}} 启动/停止 - {{ data.created | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} + {{ data.created_at | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} diff --git a/portal/src/app/main/link/link.component.html b/portal/src/app/main/link/link.component.html index 626cc1cee004980db06584f5e22ea502d4a2b1f6..70d80bf95a66abe67dd5cc9aafbeb2d1d2fd115c 100644 --- a/portal/src/app/main/link/link.component.html +++ b/portal/src/app/main/link/link.component.html @@ -25,7 +25,7 @@ 序列号 地址 状态 - 上线时间 + 上线时间 @@ -37,9 +37,9 @@ {{ data.serial }} {{ data.addr}} - + {{data.online ? '在线' : '离线'}} - {{ data.online | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} + {{ data.online_at | amDateFormat:'YYYY-MM-DD HH:mm:ss' }} diff --git a/portal/src/favicon.ico b/portal/src/favicon.ico index 997406ad22c29aae95893fb3d666c30258a09537..38637538a1b04b6fe2306f107479ae4a6d72418c 100644 Binary files a/portal/src/favicon.ico and b/portal/src/favicon.ico differ diff --git a/web/api/channel.go b/web/api/channel.go index 3c5971d5c92eb2903e312717907d8d85c24dfee1..9dd2727f57e7b6719d4dbf28758c5b370dcd3d69 100644 --- a/web/api/channel.go +++ b/web/api/channel.go @@ -7,7 +7,6 @@ import ( "github.com/zgwit/dtu-admin/model" "log" "net/http" - "time" ) func channels(ctx *gin.Context) { @@ -68,7 +67,6 @@ func channelCreate(ctx *gin.Context) { } // channel.Creator = TODO 从session中获取 - channel.Created = time.Now() _, err := db.Engine.Insert(&channel) if err != nil {