send_msg.go 11.3 KB
Newer Older
programor_guo's avatar
programor_guo 已提交
1
package msg
H
hailong 已提交
2 3

import (
4 5
	"Open_IM/pkg/common/config"
	"Open_IM/pkg/common/constant"
6
	"Open_IM/pkg/common/db"
7 8 9
	http2 "Open_IM/pkg/common/http"
	"Open_IM/pkg/common/log"
	"Open_IM/pkg/grpc-etcdv3/getcdv3"
10 11
	pbChat "Open_IM/pkg/proto/chat"
	pbGroup "Open_IM/pkg/proto/group"
programor_guo's avatar
programor_guo 已提交
12
	sdk_ws "Open_IM/pkg/proto/sdk_ws"
13
	"Open_IM/pkg/utils"
H
hailong 已提交
14
	"context"
落凡尘.'s avatar
落凡尘. 已提交
15
	"encoding/json"
programor_guo's avatar
programor_guo 已提交
16
	"github.com/garyburd/redigo/redis"
H
hailong 已提交
17
	"math/rand"
落凡尘.'s avatar
落凡尘. 已提交
18
	"net/http"
H
hailong 已提交
19
	"strconv"
落凡尘.'s avatar
落凡尘. 已提交
20
	"strings"
H
hailong 已提交
21 22 23
	"time"
)

落凡尘.'s avatar
落凡尘. 已提交
24
type MsgCallBackReq struct {
25 26 27 28 29 30 31 32 33 34
	SendID       string `json:"sendID"`
	RecvID       string `json:"recvID"`
	Content      string `json:"content"`
	SendTime     int64  `json:"sendTime"`
	MsgFrom      int32  `json:"msgFrom"`
	ContentType  int32  `json:"contentType"`
	SessionType  int32  `json:"sessionType"`
	PlatformID   int32  `json:"senderPlatformID"`
	MsgID        string `json:"msgID"`
	IsOnlineOnly bool   `json:"isOnlineOnly"`
落凡尘.'s avatar
落凡尘. 已提交
35 36 37 38 39 40 41 42 43 44
}
type MsgCallBackResp struct {
	ErrCode         int32  `json:"errCode"`
	ErrMsg          string `json:"errMsg"`
	ResponseErrCode int32  `json:"responseErrCode"`
	ResponseResult  struct {
		ModifiedMsg string `json:"modifiedMsg"`
		Ext         string `json:"ext"`
	}
}
H
hailong 已提交
45

programor_guo's avatar
programor_guo 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
	msg.ServerMsgID = GetMsgID(msg.SendID)
	if msg.SendTime == 0 {
		msg.SendTime = utils.GetCurrentTimestampByNano()
	}
	switch msg.ContentType {
	case constant.Text:
		fallthrough
	case constant.Picture:
		fallthrough
	case constant.Voice:
		fallthrough
	case constant.Video:
		fallthrough
	case constant.File:
		fallthrough
	case constant.AtText:
		fallthrough
	case constant.Merger:
		fallthrough
	case constant.Card:
		fallthrough
	case constant.Location:
		fallthrough
	case constant.Custom:
		fallthrough
	case constant.Quote:
		utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, true)
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, true)
		utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, true)
	case constant.Revoke:
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
	case constant.HasReadReceipt:
		utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
	case constant.Typing:
		utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)

	}
}
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
	replay := pbChat.SendMsgResp{}
95 96 97
	log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
	//if !utils.VerifyToken(pb.Token, pb.SendID) {
	//	return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
programor_guo's avatar
programor_guo 已提交
98 99 100 101
	rpc.encapsulateMsgData(pb.MsgData)
	msgToMQ := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID}
	//options := utils.JsonStringToMap(pbData.Options)
	isHistory := utils.GetSwitchFromOptions(pb.MsgData.Options, constant.IsHistory)
102
	mReq := MsgCallBackReq{
programor_guo's avatar
programor_guo 已提交
103 104 105 106 107 108 109 110 111
		SendID:      pb.MsgData.SendID,
		RecvID:      pb.MsgData.RecvID,
		Content:     string(pb.MsgData.Content),
		SendTime:    pb.MsgData.SendTime,
		MsgFrom:     pb.MsgData.MsgFrom,
		ContentType: pb.MsgData.ContentType,
		SessionType: pb.MsgData.SessionType,
		PlatformID:  pb.MsgData.SenderPlatformID,
		MsgID:       pb.MsgData.ClientMsgID,
112 113 114 115 116
	}
	if !isHistory {
		mReq.IsOnlineOnly = true
	}
	mResp := MsgCallBackResp{}
落凡尘.'s avatar
落凡尘. 已提交
117
	if config.Config.MessageCallBack.CallbackSwitch {
118
		bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, mReq, config.Config.MessageCallBack.CallBackTimeOut)
落凡尘.'s avatar
落凡尘. 已提交
119 120 121
		if err != nil {
			log.ErrorByKv("callback to Business server err", pb.OperationID, "args", pb.String(), "err", err.Error())
			return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "", 0)
122
		} else if err = json.Unmarshal(bMsg, &mResp); err != nil {
落凡尘.'s avatar
落凡尘. 已提交
123 124 125
			log.ErrorByKv("ws json Unmarshal err", pb.OperationID, "args", pb.String(), "err", err.Error())
			return returnMsg(&replay, pb, 200, err.Error(), "", 0)
		} else {
126 127
			if mResp.ErrCode != 0 {
				return returnMsg(&replay, pb, mResp.ResponseErrCode, mResp.ErrMsg, "", 0)
落凡尘.'s avatar
落凡尘. 已提交
128
			} else {
programor_guo's avatar
programor_guo 已提交
129
				pb.MsgData.Content = []byte(mResp.ResponseResult.ModifiedMsg)
落凡尘.'s avatar
落凡尘. 已提交
130 131
			}
		}
Z
Zzr 已提交
132
	}
programor_guo's avatar
programor_guo 已提交
133
	switch pb.MsgData.SessionType {
Z
Zzr 已提交
134
	case constant.SingleChatType:
programor_guo's avatar
programor_guo 已提交
135
		isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
136
		if isSend {
programor_guo's avatar
programor_guo 已提交
137 138
			msgToMQ.MsgData = pb.MsgData
			err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID)
139
			if err1 != nil {
programor_guo's avatar
programor_guo 已提交
140
				log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String())
141 142 143
				return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
			}
		}
programor_guo's avatar
programor_guo 已提交
144 145 146 147 148 149
		if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself
			err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID)
			if err2 != nil {
				log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String())
				return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
			}
Z
Zzr 已提交
150
		}
programor_guo's avatar
programor_guo 已提交
151
		return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
Z
Zzr 已提交
152 153 154 155
	case constant.GroupChatType:
		etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
		client := pbGroup.NewGroupClient(etcdConn)
		req := &pbGroup.GetGroupAllMemberReq{
programor_guo's avatar
programor_guo 已提交
156 157
			GroupID:     pb.MsgData.GroupID,
			OperationID: pb.OperationID,
Z
Zzr 已提交
158 159 160
		}
		reply, err := client.GetGroupAllMember(context.Background(), req)
		if err != nil {
programor_guo's avatar
programor_guo 已提交
161
			log.Error(pb.Token, pb.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error())
Z
Zzr 已提交
162 163
			return returnMsg(&replay, pb, 201, err.Error(), "", 0)
		}
programor_guo's avatar
programor_guo 已提交
164 165 166
		if reply.ErrCode != 0 {
			log.Error(pb.Token, pb.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrMsg)
			return returnMsg(&replay, pb, reply.ErrCode, reply.ErrMsg, "", 0)
Z
Zzr 已提交
167
		}
programor_guo's avatar
programor_guo 已提交
168 169
		groupID := pb.MsgData.GroupID
		for _, v := range reply.MemberList {
programor_guo's avatar
programor_guo 已提交
170 171
			pb.MsgData.RecvID = v.UserID
			isSend := modifyMessageByUserMessageReceiveOpt(v.UserID, groupID, constant.GroupChatType, pb)
172
			if isSend {
programor_guo's avatar
programor_guo 已提交
173
				msgToMQ.MsgData = pb.MsgData
programor_guo's avatar
programor_guo 已提交
174
				err := rpc.sendMsgToKafka(&msgToMQ, v.UserID)
175
				if err != nil {
programor_guo's avatar
programor_guo 已提交
176
					log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v.UserID, msgToMQ.String())
177 178
					return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
				}
Z
Zzr 已提交
179
			}
180

Z
Zzr 已提交
181
		}
programor_guo's avatar
programor_guo 已提交
182
		return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
Z
Zzr 已提交
183
	default:
184
		return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
W
wenxu12345 已提交
185
	}
H
hailong 已提交
186
}
W
pb  
wenxu12345 已提交
187

programor_guo's avatar
programor_guo 已提交
188
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error {
H
hailong 已提交
189 190
	pid, offset, err := rpc.producer.SendMessage(m, key)
	if err != nil {
落凡尘.'s avatar
落凡尘. 已提交
191
		log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key)
H
hailong 已提交
192
	}
programor_guo's avatar
programor_guo 已提交
193
	return err
H
hailong 已提交
194 195 196 197 198
}
func GetMsgID(sendID string) string {
	t := time.Now().Format("2006-01-02 15:04:05")
	return t + "-" + sendID + "-" + strconv.Itoa(rand.Int())
}
W
wenxu12345 已提交
199

programor_guo's avatar
programor_guo 已提交
200
func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) {
落凡尘.'s avatar
落凡尘. 已提交
201 202 203
	replay.ErrCode = errCode
	replay.ErrMsg = errMsg
	replay.ServerMsgID = serverMsgID
programor_guo's avatar
programor_guo 已提交
204
	replay.ClientMsgID = pb.MsgData.ClientMsgID
落凡尘.'s avatar
落凡尘. 已提交
205 206 207
	replay.SendTime = sendTime
	return replay, nil
}
W
wenxu12345 已提交
208

programor_guo's avatar
programor_guo 已提交
209
func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) bool {
210
	conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
211
	opt, err := db.DB.GetSingleConversationMsgOpt(userID, conversationID)
programor_guo's avatar
programor_guo 已提交
212 213
	if err != nil || err != redis.ErrNil {
		log.NewError(pb.OperationID, "GetSingleConversationMsgOpt from redis err", pb.String())
214 215 216 217 218 219 220 221
		return true
	}
	switch opt {
	case constant.ReceiveMessage:
		return true
	case constant.NotReceiveMessage:
		return false
	case constant.ReceiveNotNotifyMessage:
programor_guo's avatar
programor_guo 已提交
222 223
		if pb.MsgData.Options == nil {
			pb.MsgData.Options = make(map[string]bool, 10)
programor_guo's avatar
programor_guo 已提交
224
		}
programor_guo's avatar
programor_guo 已提交
225
		utils.SetSwitchFromOptions(pb.MsgData.Options, constant.IsOfflinePush, false)
226 227 228 229 230
		return true
	}

	return true
}
W
wenxu12345 已提交
231 232 233 234 235 236 237 238 239 240 241 242

type NotificationMsg struct {
	SendID      string
	RecvID      string
	Content     []byte
	MsgFrom     int32
	ContentType int32
	SessionType int32
	OperationID string
}

func Notification(n *NotificationMsg, onlineUserOnly bool) {
programor_guo's avatar
programor_guo 已提交
243 244 245
	var req pbChat.SendMsgReq
	var msg sdk_ws.MsgData
	var offlineInfo sdk_ws.OfflinePushInfo
programor_guo's avatar
programor_guo 已提交
246
	var title, desc, ex string
programor_guo's avatar
programor_guo 已提交
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
	var pushSwitch bool
	req.OperationID = n.OperationID
	msg.SendID = n.SendID
	msg.RecvID = n.RecvID
	msg.Content = n.Content
	msg.MsgFrom = n.MsgFrom
	msg.ContentType = n.ContentType
	msg.SessionType = n.SessionType
	msg.CreateTime = utils.GetCurrentTimestampByNano()
	msg.ClientMsgID = utils.GetMsgID(n.SendID)
	switch n.SessionType {
	case constant.GroupChatType:
		msg.RecvID = ""
		msg.GroupID = n.RecvID
	}
	if onlineUserOnly {
		msg.Options = make(map[string]bool, 10)
programor_guo's avatar
programor_guo 已提交
264
		//utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
programor_guo's avatar
programor_guo 已提交
265 266 267 268 269 270
		utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false)
	}
	offlineInfo.IOSBadgeCount = config.Config.IOSPush.BadgeCount
	offlineInfo.IOSPushSound = config.Config.IOSPush.PushSound
	switch msg.ContentType {
programor_guo's avatar
programor_guo 已提交
271
	case constant.GroupCreatedNotification:
programor_guo's avatar
programor_guo 已提交
272 273 274
		pushSwitch = config.Config.Notification.GroupCreated.OfflinePush.PushSwitch
		title = config.Config.Notification.GroupCreated.OfflinePush.Title
		desc = config.Config.Notification.GroupCreated.OfflinePush.Desc
programor_guo's avatar
programor_guo 已提交
275 276
		ex = config.Config.Notification.GroupCreated.OfflinePush.Ext
	case constant.GroupInfoChangedNotification:
programor_guo's avatar
programor_guo 已提交
277 278 279
		pushSwitch = config.Config.Notification.GroupInfoChanged.OfflinePush.PushSwitch
		title = config.Config.Notification.GroupInfoChanged.OfflinePush.Title
		desc = config.Config.Notification.GroupInfoChanged.OfflinePush.Desc
programor_guo's avatar
programor_guo 已提交
280 281
		ex = config.Config.Notification.GroupInfoChanged.OfflinePush.Ext
	case constant.JoinApplicationNotification:
programor_guo's avatar
programor_guo 已提交
282 283 284
		pushSwitch = config.Config.Notification.ApplyJoinGroup.OfflinePush.PushSwitch
		title = config.Config.Notification.ApplyJoinGroup.OfflinePush.Title
		desc = config.Config.Notification.ApplyJoinGroup.OfflinePush.Desc
programor_guo's avatar
programor_guo 已提交
285
		ex = config.Config.Notification.ApplyJoinGroup.OfflinePush.Ext
programor_guo's avatar
programor_guo 已提交
286 287 288 289
	}
	utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, pushSwitch)
	offlineInfo.Title = title
	offlineInfo.Desc = desc
programor_guo's avatar
programor_guo 已提交
290
	offlineInfo.Ex = ex
programor_guo's avatar
programor_guo 已提交
291 292 293 294 295 296 297 298 299 300
	msg.OfflinePushInfo = &offlineInfo
	req.MsgData = &msg
	etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
	client := pbChat.NewChatClient(etcdConn)
	reply, err := client.SendMsg(context.Background(), &req)
	if err != nil {
		log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), err.Error())
	} else if reply.ErrCode != 0 {
		log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String())
	}
W
wenxu12345 已提交
301
}