提交 a0159eca 编写于 作者: W wuhanqing

ADD chtroom

上级 2fb09665
......@@ -7,6 +7,11 @@ easyim 是一个简单易用,二开友好,方便部署的即时通讯服务
代码源于刘丹冰老师视频教程:[8小时转职Golang工程师](https://www.bilibili.com/video/BV1gf4y1r79E/) - 即时通讯系统
## 客户端
- Flutter [https://github.com/dou23/easy_im](https://github.com/dou23/easy_im)
## 开发环境
下载并安装Go: https://golang.google.cn/doc/install
......
......@@ -2,9 +2,10 @@ package contract
// 定义服务接口
type IServer interface {
Start() //启动服务器方法
Lock()
Unlock()
// Start() //启动服务器方法
HandlerMsg(u IUser, msg []byte) error
// Lock()
// Unlock()
// Stop() //停止服务器方法
// Serve() //开启业务服务方法
// AddRouter(msgID uint32, router IRouter) //路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
......
package model
import (
"github.com/iotames/easyim/contract"
)
type GroupChat struct {
ID int64
msgCount int
Users []contract.IUser
}
package server
import (
"github.com/iotames/easyim/model"
)
// 创建聊天的基本单位: 聊天室
type ChatRoom struct {
ID int64
server *Server
msgCount int
msg chan []byte
usersMap map[string]bool
}
func NewChatRoom(addr string, server *Server) *ChatRoom {
usersMap := make(map[string]bool)
usersMap[addr] = true
cr := &ChatRoom{server: server, usersMap: usersMap}
go cr.ListenMessage()
return cr
}
func (c *ChatRoom) Join(addr string) {
c.usersMap[addr] = true
}
func (c *ChatRoom) Remove(addr string) {
delete(c.usersMap, addr)
}
func (c *ChatRoom) ReceiveDataToSend(msg *model.Msg) error {
// TODO 如果接收消息的用户离线,则保存离线消息。下次用户上线再发送
c.msgCount += 1
dp := model.GetDataPack()
data, err := dp.Pack(msg)
if err != nil {
return err
}
c.msg <- data
return nil
}
// 监听本房间是否有消息进来
func (c *ChatRoom) ListenMessage() {
s := c.server
for {
msg := <-c.msg
for k, _ := range c.usersMap {
u, ok := s.onLineMap[k]
if ok {
u.ReceiveDataToSend(msg)
}
}
}
}
......@@ -11,17 +11,21 @@ import (
// Handler 当前链接的业务
func Handler(s *Server, conn net.Conn) {
u := user.NewUser(conn, s)
u := user.NewUser(conn)
u.SetOnConnectStart(func(u user.User) {
fmt.Println("TCP连接建立成功:", conn.RemoteAddr().String())
})
u.SetOnConnectLost(func(u user.User) { fmt.Println("TCP连接断开") })
u.SetOnConnectLost(func(u user.User) {
addr := u.GetConn().RemoteAddr().String()
s.UserOffline(addr)
fmt.Println("TCP连接断开")
})
u.ConnectStart()
//接受客户端发送的消息
go func() {
for {
err := handler.MainHandler(u)
err := handler.MainHandler(s, u)
if err != nil {
if err.Error() == user.ERR_CONNECT_LOST {
return
......
......@@ -9,7 +9,7 @@ import (
)
// 用户处理消息的业务 Request
func MainHandler(u contract.IUser) error {
func MainHandler(s contract.IServer, u contract.IUser) error {
// 通过命令行读取的消息data, 有换行符,转为字符串值为: string(data[:len(data)-1])
logger := miniutils.GetLogger("")
data, err := u.GetConnData()
......@@ -33,7 +33,7 @@ func MainHandler(u contract.IUser) error {
req := model.NewRequest(data, u.GetConn())
err = req.ParseHttp()
if err != nil {
logger.Error(fmt.Sprintf("---ParseHttpError(%v)---", err))
logger.Error(fmt.Sprintf("---ParseHttpError(%v)--RequestRAW(%v)---", err, string(data)))
return err
}
if req.IsWebSocket() {
......@@ -53,33 +53,19 @@ func MainHandler(u contract.IUser) error {
logger.Debug("---TCP---ReceivedMessage--SUCCESS-----u.MsgCount=", u.MsgCount())
msg := model.Msg{}
err = dp.Unpack(data, &msg)
if err != nil {
return fmt.Errorf("unpack msg fail:%v", err)
}
logger.Debug("-----ReceivedMsg(%v)--msg.ChatType(%d)--", msg.String(), msg.ChatType)
// 在线调试 http://www.websocket-test.com/, https://websocketking.com/
if (u.IsWebSocket() && msgCount == 2) || (!u.IsWebSocket() && msgCount == 1) {
// 接收到的第一条消息
// TODO 用户身份鉴权, 添加到聊天室
}
if msg.ChatType == model.Msg_SINGLE {
// 单聊。发送给TO_USER
data, err = dp.Pack(&msg)
return u.SendData(data)
}
if msg.ChatType == model.Msg_GROUP {
// 群聊。发送给群里的每一个成员。
data, err = dp.Pack(&msg)
return u.SendData(data)
}
return fmt.Errorf("unknown ChatType")
return s.HandlerMsg(u, data)
// msg := model.Msg{}
// err = dp.Unpack(data, &msg)
// if err != nil {
// return fmt.Errorf("unpack msg fail:%v", err)
// }
// logger.Debug(fmt.Sprintf("---msg.ChatType(%d)--msg.MsgType(%d)-msg.Seq(%d)--msg.Status(%d)--ReceivedMsg(%v)-", msg.ChatType, msg.MsgType, msg.Seq, msg.Status, msg.String()))
// // TODO 发送消息到监听组件
// // u.ReceiveDataToSend(data)
// data, _ = dp.Pack(&msg)
// return u.SendData(data)
// return fmt.Errorf("unknown ChatType")
//提取用户的消息(去除'\n')
// msg := string(data[:n-1])
......
......@@ -6,18 +6,21 @@ import (
"sync"
"github.com/iotames/easyim/config"
"github.com/iotames/easyim/contract"
"github.com/iotames/easyim/model"
"github.com/iotames/miniutils"
)
type Server struct {
Ip string
Port, DropAfter int
onLineMap map[string]contract.IUser
chatRoomsMap map[string]*ChatRoom
// //在线用户的列表
// OnlineMap map[string]*User
// 对User字典或字典中的user, 进行操作时,要加锁
lock sync.RWMutex
// //消息广播的channel
// Message chan string
}
// 创建一个server的接口
......@@ -32,6 +35,29 @@ func NewServer(conf config.Server) *Server {
return server
}
func (s *Server) UserOffline(addr string) {
s.Lock()
if _, ok := s.onLineMap[addr]; ok {
delete(s.onLineMap, addr)
// TODO 判断从哪个聊天室移除
fmt.Println("移除onLineMap")
}
s.Unlock()
}
func (s *Server) UserOnline(addr string, u contract.IUser) {
s.Lock()
if s.onLineMap == nil {
s.onLineMap = make(map[string]contract.IUser, 10)
}
_, ok := s.onLineMap[addr]
if !ok {
s.onLineMap[addr] = u
// TODO 判断加入哪个聊天室
}
s.Unlock()
}
func (s *Server) Lock() {
s.lock.Lock()
}
......@@ -88,6 +114,56 @@ func (s *Server) Start() {
}
}
func (s *Server) HandlerMsg(u contract.IUser, data []byte) error {
logger := miniutils.GetLogger("")
dp := model.GetDataPack()
msg := model.Msg{}
err := dp.Unpack(data, &msg)
if err != nil {
return fmt.Errorf("unpack msg fail:%v", err)
}
logger.Debug(fmt.Sprintf("---msg.ChatType(%d)--msg.MsgType(%d)-msg.Seq(%d)--msg.Status(%d)--ReceivedMsg(%v)-", msg.ChatType, msg.MsgType, msg.Seq, msg.Status, msg.String()))
// 在线调试 http://www.websocket-test.com/, https://websocketking.com/
msgCount := u.MsgCount()
if (u.IsWebSocket() && msgCount == 2) || (!u.IsWebSocket() && msgCount == 1) {
// 接收到的第一条消息
err = s.firstMsgComeIn(u, data)
if err != nil {
return err
}
// TODO 发送消息到监听组件
data, err = dp.Pack(&msg)
u.ReceiveDataToSend(data)
return err
}
// TODO 发送消息到监听组件
data, err = dp.Pack(&msg)
u.ReceiveDataToSend(data)
return err
// if msg.ChatType == model.Msg_SINGLE {
// // 单聊。发送给TO_USER
// data, err = dp.Pack(&msg)
// return u.SendData(data)
// }
// if msg.ChatType == model.Msg_GROUP {
// // 群聊。发送给群里的每一个成员。
// data, err = dp.Pack(&msg)
// return u.SendData(data)
// }
return fmt.Errorf("unknown ChatType")
}
func (s *Server) firstMsgComeIn(u contract.IUser, data []byte) error {
// TODO 根据access_token进行用户身份鉴权, 再添加到聊天室
addr := u.GetConn().RemoteAddr().String()
s.UserOnline(addr, u)
// TODO 发送消息到监听组件
return nil
}
// func (s *Server)Stop(){
// fmt.Println("[STOP] EasyIM Server")
// }
......@@ -6,7 +6,6 @@ import (
"net"
"strings"
"github.com/iotames/easyim/contract"
"github.com/iotames/easyim/model"
"github.com/iotames/miniutils"
)
......@@ -15,29 +14,22 @@ const ERR_CONNECT_LOST = "connect lost"
// User. 一个TCP连接。ClentSocket
type User struct {
Name string
Addr string
protocol string
IsClosed bool
message chan []byte
msgCount int
isActive chan bool
conn net.Conn
server contract.IServer
onConnectStart func(u User)
onConnectLost func(u User)
}
// 创建一个用户的API
func NewUser(conn net.Conn, s contract.IServer) *User {
userAddr := conn.RemoteAddr().String()
func NewUser(conn net.Conn) *User {
u := &User{
Name: userAddr,
Addr: userAddr,
message: make(chan []byte),
isActive: make(chan bool),
conn: conn,
server: s,
}
//启动监听当前user channel消息的goroutine
go u.ListenMessage()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册