diff --git a/src/rpc/auth/open_im_auth.go b/src/rpc/auth/open_im_auth.go new file mode 100644 index 0000000000000000000000000000000000000000..765d83264260d5f21cdde8a56d3cd17130ac89be --- /dev/null +++ b/src/rpc/auth/open_im_auth.go @@ -0,0 +1,13 @@ +package main + +import ( + rpcAuth "Open_IM/src/rpc/auth/auth" + "flag" +) + +func main() { + rpcPort := flag.Int("port", 10600, "RpcToken default listen port 10800") + flag.Parse() + rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort) + rpcServer.Run() +} diff --git a/src/rpc/chat/Makefile b/src/rpc/chat/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..2a2d08918225ec6ad51b83073c728df8ad7865ec --- /dev/null +++ b/src/rpc/chat/Makefile @@ -0,0 +1,25 @@ +.PHONY: all build run gotool install clean help + +BINARY_NAME=open_im_msg +BIN_DIR=../../../bin/ +LAN_FILE=.go +GO_FILE:=${BINARY_NAME}${LAN_FILE} + +all: gotool build + +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE} + +run: + @go run ./ + +gotool: + go fmt ./ + go vet ./ + +install: + make build + mv ${BINARY_NAME} ${BIN_DIR} + +clean: + @if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi diff --git a/src/rpc/chat/chat/pull_message.go b/src/rpc/chat/chat/pull_message.go new file mode 100644 index 0000000000000000000000000000000000000000..bbad5eb5dd0481df09d875ce7b57d6681af4cd21 --- /dev/null +++ b/src/rpc/chat/chat/pull_message.go @@ -0,0 +1,159 @@ +//实现pb定义的rpc服务 +package rpcChat + +import ( + "context" + + commonDB "Open_IM/src/common/db" + "Open_IM/src/common/log" + + "sort" + "strings" + + pbMsg "Open_IM/src/proto/chat" +) + +func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) { + log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String()) + //seq, err := model.GetBiggestSeqFromReceive(in.UserID) + seq, err := commonDB.DB.GetUserSeq(in.UserID) + resp := new(pbMsg.GetNewSeqResp) + if err == nil { + resp.Seq = seq + resp.ErrCode = 0 + resp.ErrMsg = "" + return resp, err + } else { + log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error()) + resp.Seq = 0 + resp.ErrCode = 0 + resp.ErrMsg = "" + return resp, nil + } + +} + +//func (s *MsgServer) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*pbMsg.PullMessageResp, error) { +// log.InfoByArgs(fmt.Sprintf("rpc pullMessage is arriving,args=%s", in.String())) +// resp := new(pbMsg.PullMessageResp) +// var respMsgFormat []*pbMsg.MsgFormat +// var respUserMsgFormat []*pbMsg.UserMsgFormat +// conn := db.NewDbConnection() +// rows, err := conn.Table("receive r").Select("c.sender_id,c.receiver_id,"+ +// "c.msg_type,c.push_msg_type,c.chat_type,c.msg_id,c.send_content,r.seq,c.send_time,c.sender_nickname,c.receiver_nickname,c.sender_head_url,c.receiver_head_url"). +// Joins("INNER JOIN chat_log c ON r.msg_id = c.msg_id AND r.user_id = ? AND seq BETWEEN ? AND ?", +// in.UserID, in.SeqBegin, in.SeqEnd).Rows() +// if err != nil { +// fmt.Printf("pullMsg data error: %v\n", err) +// resp.ErrCode = 1 +// resp.ErrMsg = err.Error() +// return resp, nil +// } +// defer rows.Close() +// for rows.Next() { +// tempResp := new(pbMsg.MsgFormat) +// rows.Scan(&tempResp.SendID, &tempResp.RecvID, &tempResp.MsgType, &tempResp.PushMsgType, &tempResp.ChatType, +// &tempResp.MsgID, &tempResp.Msg, &tempResp.Seq, &tempResp.Time, &tempResp.SendNickName, &tempResp.RecvNickName, +// &tempResp.SendHeadUrl, &tempResp.RecvHeadUrl) +// respMsgFormat = append(respMsgFormat, tempResp) +// } +// respUserMsgFormat = msgHandleByUser(respMsgFormat, in.UserID) +// return &pbMsg.PullMessageResp{ +// ErrCode: 0, +// ErrMsg: "", +// UserMsg: respUserMsgFormat, +// }, nil +//} +func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*pbMsg.PullMessageResp, error) { + log.InfoByKv("rpc pullMessage is arriving", in.OperationID, "args", in.String()) + resp := new(pbMsg.PullMessageResp) + var respSingleMsgFormat []*pbMsg.GatherFormat + var respGroupMsgFormat []*pbMsg.GatherFormat + SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetUserChat(in.UserID, in.SeqBegin, in.SeqEnd) + if err != nil { + log.ErrorByKv("pullMsg data error", in.OperationID, in.String()) + resp.ErrCode = 1 + resp.ErrMsg = err.Error() + return resp, nil + } + respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID) + respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat) + return &pbMsg.PullMessageResp{ + ErrCode: 0, + ErrMsg: "", + MaxSeq: MaxSeq, + MinSeq: MinSeq, + SingleUserMsg: respSingleMsgFormat, + GroupUserMsg: respGroupMsgFormat, + }, nil +} +func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat { + var userid string + var respMsgFormat []*pbMsg.GatherFormat + m := make(map[string]MsgFormats) + //将消息以用户为维度聚集 + for _, v := range allMsg { + if v.RecvID != ownerId { + userid = v.RecvID + } else { + userid = v.SendID + } + if value, ok := m[userid]; !ok { + var t MsgFormats + m[userid] = append(t, v) + } else { + m[userid] = append(value, v) + } + } + //形成pb格式返回 + for user, msg := range m { + tempUserMsg := new(pbMsg.GatherFormat) + tempUserMsg.ID = user + tempUserMsg.List = msg + sort.Sort(msg) + respMsgFormat = append(respMsgFormat, tempUserMsg) + } + return respMsgFormat +} +func groupMsgHandleByUser(allMsg []*pbMsg.MsgFormat) []*pbMsg.GatherFormat { + var respMsgFormat []*pbMsg.GatherFormat + m := make(map[string]MsgFormats) + //将消息以用户为维度聚集 + for _, v := range allMsg { + //获得群ID + groupID := strings.Split(v.RecvID, " ")[1] + if value, ok := m[groupID]; !ok { + var t MsgFormats + m[groupID] = append(t, v) + } else { + m[groupID] = append(value, v) + } + + } + //形成pb格式返回 + for groupID, msg := range m { + tempUserMsg := new(pbMsg.GatherFormat) + tempUserMsg.ID = groupID + tempUserMsg.List = msg + sort.Sort(msg) + respMsgFormat = append(respMsgFormat, tempUserMsg) + } + return respMsgFormat +} + +type MsgFormats []*pbMsg.MsgFormat + +// 实现sort.Interface接口取元素数量方法 +func (s MsgFormats) Len() int { + return len(s) +} + +// 实现sort.Interface接口比较元素方法 +func (s MsgFormats) Less(i, j int) bool { + return s[i].SendTime < s[j].SendTime +} + +// 实现sort.Interface接口交换元素方法 +func (s MsgFormats) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/src/rpc/chat/chat/rpcChat.go b/src/rpc/chat/chat/rpcChat.go new file mode 100644 index 0000000000000000000000000000000000000000..85a800bdb71b6aafc948c2e08fcb6f7ea0f5cd3f --- /dev/null +++ b/src/rpc/chat/chat/rpcChat.go @@ -0,0 +1,65 @@ +package rpcChat + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/kafka" + log2 "Open_IM/src/common/log" + pbChat "Open_IM/src/proto/chat" + "Open_IM/src/utils" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "google.golang.org/grpc" + "net" + "strconv" + "strings" +) + +type rpcChat struct { + rpcPort int + rpcRegisterName string + etcdSchema string + etcdAddr []string + producer *kafka.Producer +} + +func NewRpcChatServer(port int) *rpcChat { + rc := rpcChat{ + rpcPort: port, + rpcRegisterName: config.Config.RpcRegisterName.OpenImOfflineMessageName, + etcdSchema: config.Config.Etcd.EtcdSchema, + etcdAddr: config.Config.Etcd.EtcdAddr, + } + rc.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + return &rc +} + +func (rpc *rpcChat) Run() { + log2.Info("", "", "rpc get_token init...") + + address := utils.ServerIP + ":" + strconv.Itoa(rpc.rpcPort) + listener, err := net.Listen("tcp", address) + if err != nil { + log2.Error("", "", "listen network failed, err = %s, address = %s", err.Error(), address) + return + } + log2.Info("", "", "listen network success, address = %s", address) + + //grpc server + srv := grpc.NewServer() + defer srv.GracefulStop() + + //service registers with etcd + + pbChat.RegisterChatServer(srv, rpc) + err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName, 10) + if err != nil { + log2.Error("", "", "register rpc get_token to etcd failed, err = %s", err.Error()) + return + } + + err = srv.Serve(listener) + if err != nil { + log2.Info("", "", "rpc get_token fail, err = %s", err.Error()) + return + } + log2.Info("", "", "rpc get_token init success") +} diff --git a/src/rpc/chat/chat/send_msg.go b/src/rpc/chat/chat/send_msg.go new file mode 100644 index 0000000000000000000000000000000000000000..cd7e5e18ec0ede53c4cd38eeef308b3c8a8cff60 --- /dev/null +++ b/src/rpc/chat/chat/send_msg.go @@ -0,0 +1,50 @@ +package rpcChat + +import ( + "Open_IM/src/common/log" + pbChat "Open_IM/src/proto/chat" + "Open_IM/src/utils" + "context" + "math/rand" + "strconv" + "time" +) + +func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) { + + serverMsgID := GetMsgID(pb.SendID) + pbData := pbChat.WSToMsgSvrChatMsg{} + pbData.MsgFrom = pb.MsgFrom + pbData.SessionType = pb.SessionType + pbData.ContentType = pb.ContentType + pbData.Content = pb.Content + pbData.RecvID = pb.RecvID + pbData.ForceList = pb.ForceList + pbData.OfflineInfo = pb.OffLineInfo + pbData.Options = pb.Options + pbData.PlatformID = pb.PlatformID + pbData.SendID = pb.SendID + pbData.MsgID = serverMsgID + pbData.OperationID = pb.OperationID + pbData.Token = pb.Token + pbData.SendTime = utils.GetCurrentTimestampBySecond() + rpc.sendMsgToKafka(&pbData, pbData.RecvID) + rpc.sendMsgToKafka(&pbData, pbData.SendID) + replay := pbChat.UserSendMsgResp{} + replay.ReqIdentifier = pb.ReqIdentifier + replay.MsgIncr = pb.MsgIncr + replay.ClientMsgID = pb.ClientMsgID + replay.ServerMsgID = serverMsgID + + return &replay, nil +} +func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) { + pid, offset, err := rpc.producer.SendMessage(m, key) + if err != nil { + log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error()) + } +} +func GetMsgID(sendID string) string { + t := time.Now().Format("2006-01-02 15:04:05") + return t + "-" + sendID + "-" + strconv.Itoa(rand.Int()) +} diff --git a/src/rpc/chat/open_im_msg.go b/src/rpc/chat/open_im_msg.go new file mode 100644 index 0000000000000000000000000000000000000000..e1a0a11a4f4da40b2a20720e031a8f2308de8715 --- /dev/null +++ b/src/rpc/chat/open_im_msg.go @@ -0,0 +1,14 @@ +package main + +import ( + rpcChat "Open_IM/src/rpc/chat/chat" + "Open_IM/src/utils" + "flag" +) + +func main() { + rpcPort := flag.String("port", "", "rpc listening port") + flag.Parse() + rpcServer := rpcChat.NewRpcChatServer(utils.StringToInt(*rpcPort)) + rpcServer.Run() +}